Source code for tpg_256a_pressure_monitor.Monitor

""" Background monitoring thread based on the :obj:`threading`
library. A :obj:`Monitor` object starts a background
:class:`thread<threading.Thread>` which reads out the TPG-256A
:attr:`~Monitor.device` every second. The data can then be
printed to the terminal and/or saved to a PostgreSQL database
using the :obj:`lab_utils.database` library. The monitoring
thread is intended to be self-sustainable and will try to deal
with unexpected errors (usually issues with communication to
the device), recover, log them and keep running.
"""

# Imports
from datetime import datetime
from time import sleep
from threading import Thread, Event
from typing import List

# Third party
from lab_utils.database import Database, DataType, Constraint
from lab_utils.custom_logging import configure_logging, getLogger
from serial.serialutil import SerialException

# Local
from .TPG_256A import TPG_256A, StateError


[docs]class Monitor(Thread): """ Manages a background :class:`thread<threading.Thread>` which logs data from the TPG-256A :attr:`~Monitor.device`.""" # Thread objects device: TPG_256A = None #: :class:`TPG-256 A<.TPG_256A>` handler. db: Database = None #: :class:`Database<lab_utils.database.Database>` object. # Thread flags run_flag: Event = None #: :class:`Flag<threading.Event>` to signal the thread to stop. database_flag: Event = False #: Database usage :class:`flag<threading.Event>`. terminal_flag: Event = False #: Terminal output :class:`flag<threading.Event>`. # Monitor Variables table_name: str = 'pressure' #: Name of the PostgreSQL table where data will be saved. column_list: List[str] = None #: :class:`~typing.List` of data labels to save.
[docs] def __init__(self, device: TPG_256A, name: str = 'Monitor Thread', database_flag: bool = False, database_config_file: str = None, terminal_flag: bool = False, table_name: str = 'pressure'): """ Initializes the :class:`Monitor` object. The constructor checks that the given :paramref:`device` is initialized. If :paramref:`database_flag` is set to `True`, the :meth:`prepare_database` method is called, which initializes the :attr:`database<db>` object and sets up the connection. A table named :paramref:`table_name` is created, as well as the necessary :attr:`columns<column_list>` to store the pressure data. Finally, the method :meth:`run` starts and detaches a background thread which will run indefinitely, reading the TPG-256A :attr:`device`. The data is saved to the :attr:`database` if :paramref:`database_flag` is set to `True`, and it is printed to the terminal if :paramref:`terminal_flag` is set to `True`. Parameters ---------- device: :class:`.TPG_256A` Device handle, must be already initialized and connected. name: str, optional Thread name for logging purposes, default is 'Monitor Thread' database_flag: bool, optional Save data to a PostgreSQL database, default is 'False' terminal_flag: bool, optional Print data to the logging terminal sink with 'info' level, default is 'False' table_name: str, optional Name of the PostgreSQL table where the data is saved, default is 'pressure'. Raises ------ :class:`StateError` The supplied :attr:`device` was not properly initialized. :class:`configparser.Error` Database configuration file error. :class:`psycopg2.DatabaseError` Database error (connection, access...) """ # Assign arguments self.table_name = table_name # Check device is ON and ready self.device = device if not device.connected: raise StateError('Device not connected') # Call the parent class initializer super().__init__(name=name) # Initialize flags self.run_flag = Event() self.database_flag = Event() self.terminal_flag = Event() # Set flags self.run_flag.set() if database_flag: getLogger().info('Database option active') self.database_flag.set() if terminal_flag: getLogger().info('Terminal option active') self.terminal_flag.set() # Initialize database if database_flag: self.prepare_database(database_config_file) # Run self.start()
[docs] def prepare_database(self, database_config_file: str = None, ): """ Ensures the :attr:`database<db>` is ready to accept data from the TPG-256A :attr:`device`. Initializes the :attr:`database<db>` object and sets up the connection. If the table :attr:`table_name` does not exist, it is created, as well as the necessary :attr:`columns<column_list>` to store the pressure data. The labels of the columns are retrieved from device's :attr:`~.TPG_256A.channel_info`. Parameters ---------- database_config_file: str, optional The configuration file of the database Raises ------ :class:`configparser.Error` Error reading configuration file. :class:`psycopg2.Error` Base exception for all kinds of database errors. """ getLogger().info('Setting up database') self.db = Database(config_file=database_config_file) self.db.connect(print_version=True) # Check table exists, create otherwise if not self.db.check_table(self.table_name): self.db.create_timescale_db(self.table_name) if not self.db.check_table(self.table_name): raise RuntimeError('could not create TimescaleDB object \'%s\'', self.table_name) getLogger().debug('Table \'%s\' ready', self.table_name) # Create column list self.column_list = [] for ch in self.device.channel_info: if ch.logging: self.column_list.append(ch.label) # Check columns exist, create otherwise getLogger().debug('Creating columns: %s', '; '.join(self.column_list)) for label in self.column_list: # Raises an error if the column could not be created self.db.new_column( table_name=self.table_name, column_name=label, data_type=DataType.float, constraints=[Constraint.positive_strict], ) getLogger().debug('Column \'%s\' ready', label)
[docs] def stop(self) -> bool: """ Clears the :attr:`run_flag` to signal the background thread to stop. The thread status is then checked every 0.1 second (up to 5 seconds). Returns `True` if the thread stopped, `False` otherwise. Returns ------- bool: `True` if the thread is not running within 5 seconds, `False` otherwise. """ getLogger().info('Sending stop signal to the logging thread') self.run_flag.clear() # Check thread status every 0.1 seconds, for 5 seconds for _ in range(50): if not self.is_alive(): getLogger().info('Monitor thread successfully stopped') return True sleep(0.1) # Thread should have stopped by now, something went wrong getLogger().error('Monitor thread did not finish within a reasonable time') return False
[docs] def run(self) -> None: """ Monitoring method start upon object creation. The TPG-256A :attr:`device` is read every second in an endless loop. The pressure data may be saved to a PostgreSQL :attr:`database<db>` and/or printed to the terminal, if the respective :attr:`terminal_flag` and :attr:`database_flag` flags were set. In case of unexpected error (which happens often with the RS-232 communication protocol), the method will try to recover, log any information and continue operations. To stop logging and break the loop, the :meth:`stop` method should be used to set the :attr:`run_flag` flag. """ # Let the server reply to the client to produce a cleaner log sleep(0.1) getLogger().info('Starting monitor') # Wait until the next turn of the second seconds = datetime.now().second while datetime.now().second == seconds: sleep(0.001) seconds = datetime.now().second # Endless loop, use the stop() method to break it while self.run_flag.is_set(): try: # Read pressure data from the device self.device.pressure_gauges() # Save to database self.save_to_database() # Print to terminal and/or file self.print_string() except (SerialException, StateError, IOError) as e: getLogger().error("{}: {}".format(type(e).__name__, e)) for i in range(5): sleep(2) getLogger().error('Attempting to reset the device (%d out of 5)', i) try: if self.device.connected: self.device.disconnect() self.device.connect() except (SerialException, StateError, IOError) as e: getLogger().error("{}: {}".format(type(e).__name__, e)) else: # Leave the reconnection "for" loop and return to the main "while" loop break # The reconnection attempts have failed, terminate the thread and notify the alarm manager getLogger().error('Terminating TPG 256A Pressure Monitor!') raise SystemExit( 'Could not re-establish connection to the device after 5 attempts, terminating thread now...' ) # Wait until the next turn of the second while datetime.now().second == seconds: sleep(0.001) seconds = datetime.now().second # We reach here when 'run_flag' has been cleared by the stop() method getLogger().info('Stop signal! Quitting logging thread')
[docs] def print_string(self): """ Prints the retrieved data to the terminal. The log level will be `INFO` if :attr:`terminal_flag` is set, and `DEBUG` otherwise.""" # Build message string msg = '' for ch in self.device.channel_info: if ch.logging: if ch.status_code is None or int(ch.status_code) > 2: msg += '{:20}'.format('{:7}: Invalid'.format(ch.label)) else: msg += '{:20}'.format('{:7}: {}'.format(ch.label, ch.data)) # Print to terminal if self.terminal_flag.is_set(): getLogger().info(msg) else: getLogger().debug(msg)
[docs] def save_to_database(self): """ Saves the latest pressure data to the PostgreSQL :attr:`database<db>`. If :attr:`database_flag` is not set, the method does nothing. Raises ------ :class:`psycopg2.Error` Base exception for all kinds of database errors. """ if not self.database_flag.is_set(): return if self.db is None: raise StateError('database not initialized') data = [] for ch in self.device.channel_info: if ch.logging: if ch.status_code < 3: data.append(ch.data) else: data.append('NaN') self.db.new_entry( table_name=self.table_name, columns=self.column_list, data=data, check_columns=False, )