""" Background monitoring thread based on the :obj:`threading`
library. A :obj:`Monitor` object starts a background
:class:`thread<threading.Thread>` which reads out the TPG-256A
:attr:`~Monitor.device` every second. The data can then be
printed to the terminal and/or saved to a PostgreSQL database
using the :obj:`lab_utils.database` library. The monitoring
thread is intended to be self-sustaining: it will try to handle
unexpected errors (usually communication issues with the
device), log them, recover and keep running.
"""
# Imports
from datetime import datetime
from time import sleep
from threading import Thread, Event
import logging
from pkg_resources import resource_filename
from typing import List
# Third party
from psycopg2 import DatabaseError
from lab_utils.database import Database, DataType, Constraint
from serial.serialutil import SerialException
# Local
from .TPG_256A import TPG_256A, StateError
class Monitor(Thread):
    """ Manages a background
    :class:`thread<threading.Thread>`
    which logs data from the TPG-256A
    :attr:`~Monitor.device`."""

    # Thread objects
    # (project types are annotated as strings so they are only resolved on demand)
    device: 'TPG_256A' = None       #: :class:`TPG-256 A<.TPG_256A>` handler.
    db: 'Database' = None           #: :class:`Database<lab_utils.database.Database>` object.
    logger: logging.Logger = None   #: Single :class:`logger<logging.Logger>` for the whole class.

    # Thread flags
    run_flag: Event = None          #: :class:`Flag<threading.Event>` to signal the thread to stop.
    database_flag: Event = None     #: Database usage :class:`flag<threading.Event>`.
    terminal_flag: Event = None     #: Terminal output :class:`flag<threading.Event>`.

    # Monitor Variables
    table_name: str = 'pressure'    #: Name of the PostgreSQL table where data will be saved.
    column_list: List[str] = None   #: :class:`~typing.List` of data labels to save.

    # Placeholder stored in the database for channels without a valid reading.
    # TODO: implement a 'Nan' option in the lab_utils database module
    _INVALID_PRESSURE: float = 2000.

    def __init__(self,
                 device: 'TPG_256A',
                 name: str = 'Monitor Thread',
                 database_flag: bool = False,
                 terminal_flag: bool = False,
                 table_name: str = 'pressure'):
        """ Initializes the :class:`Monitor` object. The
        constructor checks that the given :paramref:`device`
        is connected. If :paramref:`database_flag` is set
        to `True`, the :meth:`prepare_database` method is
        called, which initializes the :attr:`database<db>`
        object and sets up the connection. A table named
        :paramref:`table_name` is created, as well as the
        necessary :attr:`columns<column_list>` to store the
        pressure data.

        Finally, the thread is started: :meth:`run` executes
        in the background, reading the TPG-256A :attr:`device`
        until :meth:`stop` is called. The data is saved to the
        :attr:`database<db>` if :paramref:`database_flag` is
        set to `True`, and it is printed to the terminal if
        :paramref:`terminal_flag` is set to `True`.

        Parameters
        ----------
        device: :class:`.TPG_256A`
            Device handle, must be already initialized and connected.

        name: str, optional
            Thread name for logging purposes, default is 'Monitor Thread'

        database_flag: bool, optional
            Save data to a PostgreSQL database, default is 'False'

        terminal_flag: bool, optional
            Print data to the logging terminal sink with 'info'
            level, default is 'False'

        table_name: str, optional
            Name of the PostgreSQL table where the data is saved, default is 'pressure'.

        Raises
        ------
        :class:`StateError`
            The supplied :attr:`device` was not connected.

        :class:`configparser.Error`
            Database configuration file error.

        :class:`psycopg2.DatabaseError`
            Database connection error, or a column could not be created.

        :class:`RuntimeError`
            The database table could not be created.
        """
        # Call the parent class initializer
        super().__init__(name=name)

        # Assign arguments
        self.table_name = table_name

        # Check device is ON and ready
        self.device = device
        if not device.connected:
            raise StateError('Device not connected')

        # Initialize logger
        self.logger = logging.getLogger(name)

        # Initialize flags; only 'run_flag' is unconditionally set
        self.run_flag = Event()
        self.database_flag = Event()
        self.terminal_flag = Event()
        self.run_flag.set()

        if database_flag:
            self.logger.info('Database option active')
            self.database_flag.set()

        if terminal_flag:
            self.logger.info('Terminal option active')
            self.terminal_flag.set()

        # Initialize database
        if database_flag:
            self.prepare_database()

        # Run
        self.start()

    def prepare_database(self):
        """ Ensures the :attr:`database<db>` is ready to accept
        data from the TPG-256A :attr:`device`. Initializes
        the :attr:`database<db>` object and sets up the
        connection. If the table :attr:`table_name` does not
        exist, it is created, as well as the necessary
        :attr:`columns<column_list>` to store the pressure
        data. The labels of the columns are retrieved from
        device's :attr:`~.TPG_256A.channel_info`.

        Raises
        ------
        :class:`RuntimeError`
            The table does not exist and could not be created.

        :class:`psycopg2.DatabaseError`
            A column could not be created.
        """
        self.logger.info('Setting up database')
        self.db = Database(config_file=resource_filename(__name__, 'conf/database.ini'))
        self.db.connect(print_version=True)

        # Check table exists, create otherwise
        if not self.db.check_table(self.table_name):
            self.db.create_timescale_db(self.table_name)
            if not self.db.check_table(self.table_name):
                # Format the message here: exceptions do not interpolate
                # logging-style arguments.
                raise RuntimeError(
                    f"could not create TimescaleDB object '{self.table_name}'"
                )
        self.logger.debug('Table \'%s\' ready', self.table_name)

        # Create column list: one column per channel with logging enabled
        self.column_list = [ch.label for ch in self.device.channel_info if ch.logging]

        # Check columns exist, create otherwise
        for label in self.column_list:
            if not self.db.new_column(
                    table_name=self.table_name,
                    column_name=label,
                    data_type=DataType.float,
                    constraints=[Constraint.positive_strict],
            ):
                raise DatabaseError(f"could not create column '{label}'")
            self.logger.debug('Column \'%s\' ready', label)

    def stop(self) -> bool:
        """ Clears the :attr:`run_flag` to signal the
        background thread to stop. The thread status is
        then checked every 0.1 second (up to 5 seconds).

        Returns
        -------
        bool:
            `True` if the thread is not running within
            5 seconds, `False` otherwise.
        """
        self.logger.info('Sending stop signal to the logging thread')
        self.run_flag.clear()

        # Check thread status every 0.1 seconds, for 5 seconds
        for _ in range(50):
            if not self.is_alive():
                self.logger.info('Monitor thread successfully stopped')
                return True
            sleep(0.1)

        # Thread should have stopped by now, something went wrong
        self.logger.error('Monitor thread did not finish within a reasonable time')
        return False

    def run(self) -> None:
        """ Monitoring loop, started upon object creation.
        The TPG-256A :attr:`device` is read once per second
        for as long as :attr:`run_flag` is set. The pressure
        data may be saved to a PostgreSQL :attr:`database<db>`
        and/or printed to the terminal, if the respective
        :attr:`database_flag` and :attr:`terminal_flag` flags
        were set.

        In case of a communication error (which happens often
        with the RS-232 communication protocol), the method logs
        it, tries to reconnect to the device and continues
        operations. If the reconnection fails, the thread is
        terminated.

        TODO: send a slack message if recovery is not possible

        To stop logging and break the loop, the :meth:`stop`
        method should be used to clear the :attr:`run_flag`
        flag.
        """
        # Let the server reply to the client to produce a cleaner log
        sleep(0.1)
        self.logger.info('Starting monitor')

        # Wait until the next turn of the second
        seconds = self._wait_for_next_second(datetime.now().second, poll=0.001)

        # Loop until the stop() method clears 'run_flag'
        while self.run_flag.is_set():
            try:
                # Read pressure data from the device
                self.device.pressure_gauges()

                # Print to terminal and/or file
                self.print_string()

                # Save to database
                self.save_to_database()

            except (SerialException, StateError, IOError) as err:
                self.logger.error('%s: %s', type(err).__name__, err)
                self.logger.error('Attempting to reset the device')
                self._reset_device()

            # Wait until the next turn of the second
            seconds = self._wait_for_next_second(seconds, poll=0.01)

        # We reach here when 'run_flag' has been cleared by the stop() method
        self.logger.info('Stop signal! Quitting logging thread')

    @staticmethod
    def _wait_for_next_second(current: int, poll: float) -> int:
        """ Sleeps in steps of `poll` seconds while the wall-clock second equals `current`; returns the new second. """
        while datetime.now().second == current:
            sleep(poll)
        return datetime.now().second

    def _reset_device(self) -> None:
        """ Disconnects (if connected) and reconnects the :attr:`device`; raises :class:`SystemExit` on failure. """
        try:
            if self.device.connected:
                self.device.disconnect()
            self.device.connect()
        except (SerialException, StateError, IOError) as err:
            self.logger.exception('%s: %s', type(err).__name__, err)
            raise SystemExit('Could not re-establish connection to the device, terminating thread now...')

    @staticmethod
    def _is_valid(channel) -> bool:
        """ Tells whether `channel` holds a usable reading: its status code is not `None` and not greater than 2. """
        status = channel.status_code
        return status is not None and int(status) <= 2

    def print_string(self):
        """ Builds a one-line summary of every channel with
        logging enabled (label and data, or 'Invalid' when the
        channel has no usable reading) and sends it to the
        :attr:`logger`: with 'info' level if the
        :attr:`terminal_flag` is set, with 'debug' level
        otherwise. """
        # Build message string, one fixed-width field per logged channel
        fields = []
        for ch in self.device.channel_info:
            if not ch.logging:
                continue
            if self._is_valid(ch):
                text = '{:7}: {}'.format(ch.label, ch.data)
            else:
                text = '{:7}: Invalid'.format(ch.label)
            fields.append('{:20}'.format(text))
        msg = ''.join(fields)

        # Print to terminal
        if self.terminal_flag.is_set():
            self.logger.info(msg)
        else:
            self.logger.debug(msg)

        # TODO: Print to file

    def save_to_database(self):
        """ Saves the latest pressure data to the
        PostgreSQL :attr:`database<db>`. Does nothing if the
        :attr:`database_flag` is not set. Channels without a
        usable reading are stored with a placeholder value.

        Raises
        ------
        :class:`StateError`
            The :attr:`database<db>` object was not initialized.
        """
        if not self.database_flag.is_set():
            return

        if self.db is None:
            raise StateError('database not initialized')

        # Same validity rule as print_string(): a missing (None) status code
        # counts as invalid instead of raising a TypeError on comparison.
        # For now a physically meaningless value is saved for invalid channels.
        data = [
            ch.data if self._is_valid(ch) else self._INVALID_PRESSURE
            for ch in self.device.channel_info
            if ch.logging
        ]

        self.db.new_entry(
            table_name=self.table_name,
            columns=self.column_list,
            data=data,
            check_columns=False,
        )