import time
import serial
# Code translations constants
MEASUREMENT_STATUS = {
0: 'Measurement data okay',
1: 'Underrange',
2: 'Overrange',
3: 'Sensor error',
4: 'Sensor off (IKR, PKR, IMR, PBR)',
5: 'No sensor (output: 5,2.0000E-2 [mbar])',
6: 'Identification error'
}
GAUGE_IDS = {
'TPR': 'Pirani Gauge or Pirani Capacitive gauge',
'IKR9': 'Cold Cathode Gauge 10E-9 ',
'IKR11': 'Cold Cathode Gauge 10E-11 ',
'PKR': 'FullRange CC Gauge',
'PBR': 'FullRange BA Gauge',
'IMR': 'Pirani / High Pressure Gauge',
'CMR': 'Linear gauge',
'noSEn': 'no SEnsor',
'noid': 'no identifier'
}
PRESSURE_UNITS = {0: 'mbar/bar', 1: 'Torr', 2: 'Pascal'}
[docs]
class TPG26x(object):
"""
Abstract class that implements the common driver for the TPG 261 and
TPG 262 dual channel measurement and control unit.
"""
ETX = chr(3) # \x03
CR = chr(13)
LF = chr(10)
ENQ = chr(5) # \x05
ACK = chr(6) # \x06
NAK = chr(21) # \x15
def __init__(self, port='/dev/ttyUSB0', baudrate=9600):
"""
Initialize the TPG26x driver.
Args:
port (str or int): The COM port to open.
baudrate (int): Data transmission rate.
"""
self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1)
def _cr_lf(self, string):
"""
Pad carriage return and line feed to a string.
Args:
string (str): String to pad.
Returns:
str: Padded string.
"""
return string + self.CR + self.LF
def _send_command(self, command):
"""
Send a command and check if it is positively acknowledged.
Args:
command (str): The command to be sent.
Raises:
IOError: If the negative acknowledged or an unknown response is returned.
"""
self.serial.write(self._cr_lf(command).encode())
response = self.serial.readline().decode()
if response == self._cr_lf(self.NAK):
raise IOError('Serial communication returned negative acknowledge')
elif response != self._cr_lf(self.ACK):
raise IOError(f'Serial communication returned unknown response: {response!r}')
def _get_data(self):
"""
Get the data that is ready on the device.
Returns:
str: Raw data from serial communication line.
"""
self.serial.write(self.ENQ.encode())
data = self.serial.readline().decode()
return data.rstrip(self.LF).rstrip(self.CR)
def _clear_output_buffer(self):
"""
Clear the output buffer.
"""
time.sleep(0.1)
just_read = 'start value'
out = ''
while just_read != '':
just_read = self.serial.read()
out += just_read
return out
[docs]
def program_number(self):
"""
Return the firmware version.
Returns:
str: The firmware version.
"""
self._send_command('PNR')
return self._get_data()
[docs]
def pressure_gauge(self, gauge=1):
"""
Return the pressure measured by gauge X.
Args:
gauge (int): The gauge number, 1 or 2.
Returns:
tuple: The value of pressure along with status code and message.
"""
if gauge not in [1, 2]:
raise ValueError("The input gauge number can only be 1 or 2")
self._send_command(f'PR{gauge}')
reply = self._get_data()
status_code = int(reply.split(',')[0])
value = float(reply.split(',')[1])
return value, (status_code, MEASUREMENT_STATUS[status_code])
[docs]
def pressure_gauges(self):
"""
Return the pressures measured by the gauges.
Returns:
tuple: The values of both gauges along with their status codes and messages.
"""
self._send_command('PRX')
reply = self._get_data()
status_code1 = int(reply.split(',')[0])
value1 = float(reply.split(',')[1])
status_code2 = int(reply.split(',')[2])
value2 = float(reply.split(',')[3])
return (value1, (status_code1, MEASUREMENT_STATUS[status_code1]),
value2, (status_code2, MEASUREMENT_STATUS[status_code2]))
[docs]
def gauge_identification(self):
"""
Return the gauge identification.
Returns:
tuple: Identification codes and messages for both gauges.
"""
self._send_command('TID')
reply = self._get_data()
id1, id2 = reply.split(',')
return id1, GAUGE_IDS[id1], id2, GAUGE_IDS[id2]
[docs]
def pressure_unit(self):
"""
Return the pressure unit.
Returns:
str: The pressure unit.
"""
self._send_command('UNI')
unit_code = int(self._get_data())
return PRESSURE_UNITS[unit_code]
[docs]
def rs232_communication_test(self):
"""
Test the RS232 communication.
Returns:
bool: The status of the communication test.
"""
self._send_command('RST')
self.serial.write(self.ENQ)
self._clear_output_buffer()
test_string_out = ''
for char in 'a1':
self.serial.write(char)
test_string_out += self._get_data().rstrip(self.ENQ)
self._send_command(self.ETX)
return test_string_out == 'a1'
[docs]
class TPG362(TPG26x):
"""
Driver for the TPG 261 dual channel measurement and control unit.
Inherits from TPG26x.
"""
def __init__(self, port='/dev/ttyUSB0', baudrate=9600):
"""
Initialize the TPG362 driver.
Args:
port (str or int): The COM port to open.
baudrate (int): Data transmission rate.
"""
super(TPG362, self).__init__(port=port, baudrate=baudrate)