""" Driver for the Pfeiffer TPG 256A. The device is
a six-channel pressure readout and monitor controller.
The :class:`TPG_256A` main class manages the interface
to the device and implements some of the available
operations through RS-232 communication. The driver
implements an auxiliary :class:`Channel` class to hold
information about the available gauges. A custom
exception :class:`StateError` is used for internal
error management.
The driver has been adapted to Python3 from the
:obj:`PyExpLabSys<PyExpLabSys.drivers.pfeiffer>`
library. More information is also available in the
:download:`device documentation <../Pfeiffer_MultiGauge256A_OpInstructions.pdf>`
"""
import time
from serial import Serial
import logging
from typing import List, Tuple
import configparser
# Code translations constants
MEASUREMENT_STATUS = {
0: 'Measurement data okay',
1: 'Underrange',
2: 'Overrange',
3: 'Sensor error',
4: 'Sensor off (IKR, PKR, IMR, PBR)',
5: 'No sensor (output: 5,2.0000E-2 [mbar])',
6: 'Identification error'
}
GAUGE_IDS = {
'TPR': 'Pirani Gauge or Pirani Capacitive gauge',
'IKR9': 'Cold Cathode Gauge 10E-9 ',
'IKR11': 'Cold Cathode Gauge 10E-11 ',
'PKR': 'FullRange CC Gauge',
'PBR': 'FullRange BA Gauge',
'IMR': 'Pirani / High Pressure Gauge',
'CMR': 'Linear gauge',
'noSEn': 'No Sensor',
'no Sensor': 'No Sensor',
'noid': 'No identifier'
}
[docs]class StateError(BaseException):
""" Mock-up exception to deal with unexpected device status.
It is used to signal for instance that the device should be
connected but it is not at a certain execution point.
"""
pass
[docs]class Channel:
""" Simple container to hold channel information. The setup
is read from the :attr:`configuration file<TPG_256A.config_file>`
(e.g. :attr:`label`) or is retrieved from the controller
directly (e.g. :attr:`gauge_id`).
The following gauge types are supported:
========= ==========================================
ID Description
========= ==========================================
TPR Pirani Gauge or Pirani Capacitive gauge
IKR9 Cold Cathode Gauge 10E-9
IKR11 Cold Cathode Gauge 10E-11
PKR FullRange CC Gauge
PBR FullRange BA Gauge
IMR Pirani / High Pressure Gauge
CMR Linear gauge
noSEn No Sensor
no Sensor No Sensor
noid No identifier
========= ==========================================
The measurement status can take the following values:
========= ==========================================
Code Status
========= ==========================================
0 Measurement data okay
1 Underrange
2 Overrange
3 Sensor error
4 Sensor off (IKR, PKR, IMR, PBR)
5 No sensor (output: 5,2.0000E-2 [mbar])
6 Identification error
========= ==========================================
"""
gauge_number: int = None #: The channel ID number.
connected: bool = False #: The gauge was detected by the controller.
active: bool = False #: The gauge should be ON.
logging: bool = False #: Data from the gauge should be recorded.
label: str = '' #: Label of the gauge, to be used when logging to a database.
gauge_id: str = None #: Gauge type, retrieved from the controller.
data: float = None #: Latest pressure readout value.
status_code: int = None #: Measurement status code.
status_str: str = None #: Measurement status description.
[docs] def __init__(self, gauge_number: int = None):
self.gauge_number = gauge_number
[docs]class TPG_256A(object): # noqa (ignore CamelCase convention)
""" Driver implementation for the Pfeiffer TPG-256A. The device
is a six-channel pressure readout and monitor controller. The
driver has been adapted to Python3 from the
:obj:`PyExpLabSys<PyExpLabSys.drivers.pfeiffer>` library, and
implements the following commands (see the
:download:`device documentation <../../Pfeiffer_MultiGauge256A_OpInstructions.pdf>`
for more information):
+-------------------+-----------------------------------------------------------+
| Mnemonic | Description |
+===================+===========================================+===============+
| PNR | Program number (firmware version) |
+-------------------+-----------------------------------------------------------+
| PR[1 ... 6] | Pressure measurement (measurement data) gauge [1 ... 6] |
+-------------------+-----------------------------------------------------------+
| TID | Transmitter identification (gauge identification) |
+-------------------+-----------------------------------------------------------+
| SEN,0,0,0,0,0,0 | Gauge status |
+-------------------+-----------------------------------------------------------+
"""
# Attributes
ETX = chr(3) #: End text (Ctrl-c), chr(3), \\x03
CR = chr(13) #: Carriage return, chr(13), \\r
LF = chr(10) #: Line feed, chr(10), \\n
ENQ = chr(5) #: Enquiry, chr(5), \\x05
ACK = chr(6) #: Acknowledge, chr(6), \\x06
NAK = chr(21) #: Negative acknowledge, chr(21), \\x15
# Serial port configuration
serial: Serial = None #: Serial port handler.
baud_rate: int = 9600 #: Baud rate for serial communication.
serial_port: str = '/dev/PfeifferTPG256A' #: Physical address of the device file.
timeout: float = 1.0 #: Time-out for serial connection error.
# Device setup
config_file: str = 'conf/tpg_256a.ini' #: Device configuration file
channel_info: List[Channel] = [] #: Channel information, loaded from the configuration file.
# Others
connected: bool = False #: Status flag.
logger: logging.Logger = None #: Single logger for the whole object
[docs] def __init__(self,
serial_port: str = None,
baud_rate: int = None,
connect: bool = False,
timeout: float = None,
config_file: str = None,
):
""" Initializes the :class:`TPG_256A` object. It calls
the :meth:`config` method to set up the device if a
:paramref:`~TPG_256A.__init__.config_file` is given. If
the :paramref:`~TPG_256A.__init__.connect` flag is set
to `True`, attempts the connection to the device.
Parameters
----------
serial_port : str, optional
Physical address of the device file, default is 'None'
timeout : float, optional
Serial communication time out, default is 'None'
baud_rate: int, optional
Baud rate for serial communication, default is 'None'
connect: bool, optional
If set, attempt connection to the device, default is `False`
config_file : str, optional
Configuration file, default is 'None'.
Raises
------
:class:`configparser.Error`
Configuration file error
:class:`~serial.SerialException`
The connection to the device has failed
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Initialize variables
self.logger = logging.getLogger('TPG-256A')
self.connected = False
for ch in range(6):
self.channel_info.append(Channel(gauge_number=ch+1))
# Load config file, if given
if config_file is not None:
self.config(config_file)
# Assign attributes, if given
# They override they configuration file
if baud_rate is not None:
self.baud_rate = baud_rate
if serial_port is not None:
self.serial_port = serial_port
if timeout is not None:
self.timeout = timeout
# Connect to the device
if connect:
self.connect()
[docs] def config(self, new_config_file: str = None):
""" Loads the TPG-256A configuration from a file. If
:paramref:`~TPG_256A.config.new_config_file` is not
given, the latest :attr:`config_file` is re-loaded;
if it is given and the file is successfully parsed,
:attr:`config_file` is updated to the new value.
Parameters
----------
new_config_file : str, optional
New configuration file to be loaded.
Raises
------
:class:`configparser.Error`
Configuration file error
"""
# Update configuration file, if given
if new_config_file is None:
new_config_file = self.config_file
# Initialize config parser and read file
self.logger.info("Loading configuration file %s", new_config_file)
config_parser = configparser.ConfigParser()
config_parser.read(new_config_file)
# Load serial port configuration
self.serial_port = config_parser.get(section='Connection', option='device')
self.baud_rate = config_parser.getint(section='Connection', option='baud_rate')
self.timeout = config_parser.getfloat(section='Connection', option='timeout')
# Load channel information
for ch in range(6):
sec_name = 'Sensor_{}'.format(ch+1)
act = False
log = False
lab = None
if config_parser.has_section(sec_name):
act = config_parser.getboolean(sec_name, 'active')
log = config_parser.getboolean(sec_name, 'logging')
lab = config_parser.get(sec_name, 'label')
if log and not act:
self.logger.warning('Sensor %d (%s) set to logging, but not active. Monitor disabled.', ch+1, lab)
log = False
self.logger.debug('Found sensor %d: %s, %s, %s', ch+1, str(act), str(log), lab)
else:
self.logger.debug('%s not found', sec_name)
self.channel_info[ch].active = act
self.channel_info[ch].logging = log
self.channel_info[ch].label = lab
# If everything worked, update local config_file for future calls
self.config_file = new_config_file
[docs] def connect(self):
""" Connects to the TPG-256A Controller. The methods
:meth:`gauge_identification` and :meth:`gauge_status`
are called to retrieve hardware information from the
device.
Raises
------
:class:`~serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
if self.connected:
raise StateError('device was already ON')
self.logger.info('Connecting to TPG-256A Controller on port %s', self.serial_port)
self.serial = Serial(
port=self.serial_port,
baudrate=self.baud_rate,
timeout=self.timeout,
)
self.connected = True
self.gauge_identification()
self.gauge_status()
self.logger.info('Connection successful')
[docs] def disconnect(self):
""" Closes the connection to the TPG-256A Controller.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Check the device is connected
if not self.connected:
self.logger.warning('Device is not ON')
raise StateError('Device is not ON')
self.logger.info('Closing connection to TPG-256A Controller on port %s', self.serial_port)
self.connected = False
self.serial.close()
self.logger.info('Connection closed')
[docs] def gauge_status(self):
""" Reads the gauges status. Checks that gauges
marked as :attr:`~Channel.active` in
:attr:`channel_info` are available; sets them
to inactive otherwise.
Raises
------
:class:`StateError`
Device was in the wrong state.
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
"""
# Check device is ON
if not self.connected:
self.logger.warning('Device is not ON')
raise StateError('Device is not ON')
# Check gauge status
msg = ','.join('0' for _ in range(6))
self.logger.debug('Checking gauge status')
self._send_command('SEN,{}'.format(msg))
reply = self._get_data()
self.logger.debug('Reply from the device: %s', reply)
status = reply.split(',')
for (ch, st) in zip(self.channel_info, status):
if ch.active and st != '2':
self.logger.warning('Channel %s set to active but it is OFF, deactivating', ch.label)
ch.active = False
ch.logging = False
[docs] def program_number(self) -> str:
""" Returns the firmware version.
Returns
-------
str:
The firmware version.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
"""
self._send_command('PNR')
return self._get_data()
[docs] def pressure_gauge(self, gauge_nr) -> Tuple[float, int]:
"""Reads the pressure measured by gauge number
:paramref:`~TPG_256A.pressure_gauge.gauge`.
Arguments
---------
gauge_nr: int
The gauge number, 1 to 6
Returns
-------
[float, int]
(value, status code)
Raises
------
:class:`StateError`
Device was in the wrong state.
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`ValueError`
Invalid :paramref:`gauge_nr`, must be between 1 and 6.
"""
# Check device is ON
if not self.connected:
self.logger.warning('Device is not ON')
raise StateError('Device is not ON')
# Check gauge number
if gauge_nr-1 not in range(6):
message = 'The input gauge number must be between 1 and 6'
raise ValueError(message)
# Perform request
self._send_command('PR' + str(gauge_nr))
reply = self._get_data()
# Save data
status_code = int(reply.split(',')[0])
data = float(reply.split(',')[1])
return data, status_code
[docs] def pressure_gauges(self):
"""Reads the pressure measured by all active gauges.
Saves the data into the :attr:`channel_info` list.
Raises
------
:class:`StateError`
Device was in the wrong state.
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
"""
# Check device is ON
if not self.connected:
self.logger.warning('Device is not ON')
raise StateError('Device is not ON')
self.logger.debug('Pressure readout:')
for ch in self.channel_info:
if ch.active:
# Single readout
value, code = self.pressure_gauge(ch.gauge_number)
ch.data = value
ch.status_code = code
ch.status_str = MEASUREMENT_STATUS[code]
self.logger.debug('{:4}{:10}{:8} {}'.format(
' ',
ch.label,
ch.data,
ch.status_str
))
[docs] def gauge_identification(self):
"""Reads the gauges identification. Saves the
information in :attr:`channel_info`. Checks that
gauges marked as :attr:`~Channel.active` in
:attr:`channel_info` are available; sets them
to inactive otherwise and disables logging.
Raises
------
:class:`StateError`
Device was in the wrong state.
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
"""
# Check device is ON
if not self.connected:
self.logger.warning('Device is not ON')
raise StateError('Device is not ON')
self.logger.info('Retrieving Gauge Identification')
self._send_command('TID')
reply = self._get_data()
self.logger.debug('Gauge Identification String: %s', reply)
id_list = reply.split(',')
for (ch, id_code) in zip(self.channel_info, id_list):
ch.gauge_id = GAUGE_IDS[id_code]
ch.connected = ch.gauge_id != 'No Sensor'
if ch.active and not ch.connected:
self.logger.warning('Sensor %s is set to active, but no sensor was found', ch.label)
ch.active = False
ch.logging = False
def _cr_lf(self, string: str) -> str:
""" Pads :attr:`carriage return<TPG_256A.CR>`
and :attr:`line feed<TPG_256A.LF>` to a
given :paramref:`string`.
Parameters
----------
string : str
String to pad
Returns
-------
str:
The padded string.
"""
return string + self.CR + self.LF
def _send_command(self, command: str):
"""Sends a command and checks if it
is positively acknowledged.
Parameters
----------
command : str, optional
The command to send.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
"""
self.serial.write(str.encode(self._cr_lf(command)))
response = self.serial.readline().decode()
if response == self._cr_lf(self.NAK):
message = 'Serial communication returned negative acknowledge'
self.logger.error(message)
raise IOError(message)
elif response != self._cr_lf(self.ACK):
message = 'Serial communication returned unknown response:\n{}'\
''.format(repr(response))
self.logger.error(message)
raise IOError(message)
def _get_data(self) -> str:
"""Gets the data that is ready on the device.
Returns
-------
str:
The raw data
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
"""
self.serial.write(str.encode(self.ENQ))
data = self.serial.readline().decode()
return data.rstrip(self.LF).rstrip(self.CR)
def _clear_output_buffer(self) -> str:
""" Clears the output buffer.
Returns
-------
str:
The data that was in the buffer.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
"""
time.sleep(0.1)
just_read = 'start value'
out = ''
while just_read != '':
just_read = self.serial.read()
out += just_read
return out