import csv
import os
import time
from datetime import datetime
import serial.tools.list_ports
from pyccapt.control.core import runtime
from pyccapt.control.devices.edwards_tic import EdwardsAGC
from pyccapt.control.devices.pfeiffer_gauges import TPG362
[docs]
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def _available_serial_ports_text() -> str:
ports = sorted(port.device for port in serial.tools.list_ports.comports() if getattr(port, "device", ""))
return ", ".join(ports) if ports else "none detected"
def _format_port_error(device_label, port, exc) -> str:
return f"{device_label} unavailable on {port}: {exc}. Available serial ports: {_available_serial_ports_text()}"
def _serial_port_present(port):
if not port:
return False
try:
ports = {getattr(p, "device", "") for p in serial.tools.list_ports.comports()}
except Exception:
return True
return port in ports
def _open_cryovac_serial(port):
"""Open the Cryovac serial port with a few retries for transient enumeration glitches.
On Windows the USB-to-serial bridge for the TIC 500 sometimes briefly
drops out of the COM port table during boot — opening it with no retry
fails with OSError 22 ("A device which does not exist was specified")
even though the device itself is fine and shows up a moment later.
"""
last_exc = None
for attempt in range(3):
if _serial_port_present(port) or attempt > 0:
try:
return serial.Serial(
port=port,
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.5,
write_timeout=1.0,
)
except Exception as e:
last_exc = e
time.sleep(0.6)
if last_exc is None:
last_exc = OSError(f"port '{port}' not in serial enumeration")
raise last_exc
[docs]
def command_cryovac(cmd, com_port_cryovac):
"""
Execute a command on Cryovac through serial communication.
Vendor / origin
---------------
The CLI command set used here (``getOutput``, ``getOutputNames``,
``Out1Cryo.PID.Setpoint``, ``Out2LL.PID.Setpoint``, ``e_mlp``-style
queries, etc.) is defined and owned by **CryoVac GmbH** for their
TIC 500 programmable temperature controller. This function and its
siblings (``initialize_cryovac`` etc.) are thin wrappers around
that ASCII serial protocol; the protocol itself is CryoVac's
property. Refer to the CryoVac TIC 500 manual (e.g. document
5984591) for the authoritative reference.
Args:
cmd: Command to be executed.
com_port_cryovac: Serial communication object.
Returns:
Response code after executing the command.
"""
com_port_cryovac.write((cmd + '\r\n').encode())
time.sleep(0.1)
response = ''
while com_port_cryovac.in_waiting > 0:
response = com_port_cryovac.readline()
if isinstance(response, bytes):
response = response.decode("utf-8")
return response
[docs]
def command_edwards(conf, variables, cmd, E_AGC, status=None):
"""
Execute commands and set flags based on parameters.
Args:
conf: Configuration parameters.
variables: Variables instance.
cmd: Command to be executed.
E_AGC: EdwardsAGC instance.
status: Status of the lock.
Returns:
Response code after executing the command.
"""
response = -1
try:
if variables.flag_pump_load_lock_click and variables.flag_pump_load_lock and status == 'load_lock':
if conf['pump_ll'] == "on":
E_AGC.comm('!C910 0')
E_AGC.comm('!C904 0')
variables.flag_pump_load_lock_click = False
variables.flag_pump_load_lock = False
variables.flag_pump_load_lock_led = False
time.sleep(1)
elif variables.flag_pump_load_lock_click and not variables.flag_pump_load_lock and status == 'load_lock':
if conf['pump_ll'] == "on":
E_AGC.comm('!C910 1')
E_AGC.comm('!C904 1')
variables.flag_pump_load_lock_click = False
variables.flag_pump_load_lock = True
variables.flag_pump_load_lock_led = True
time.sleep(1)
if variables.flag_pump_cryo_load_lock_click and variables.flag_pump_cryo_load_lock and status == 'cryo_load_lock':
if conf['pump_cll'] == "on":
E_AGC.comm('!C910 0')
E_AGC.comm('!C904 0')
variables.flag_pump_cryo_load_lock_click = False
variables.flag_pump_cryo_load_lock = False
variables.flag_pump_cryo_load_lock_led = False
time.sleep(1)
elif (
variables.flag_pump_cryo_load_lock_click and not variables.flag_pump_cryo_load_lock and status == 'cryo_load_lock'
):
if conf['pump_cll'] == "on":
E_AGC.comm('!C910 1')
E_AGC.comm('!C904 1')
variables.flag_pump_cryo_load_lock_click = False
variables.flag_pump_cryo_load_lock = True
variables.flag_pump_cryo_load_lock_led = True
time.sleep(1)
if conf['COM_PORT_gauge_ll'] != "off" or conf['COM_PORT_gauge_cll'] != "off":
if cmd == 'pressure':
response_tmp = E_AGC.comm('?V911')
response_tmp = float(response_tmp.replace(';', ' ').split()[1])
if response_tmp < 90 and status == 'load_lock':
variables.flag_pump_load_lock_led = False
elif response_tmp >= 90 and status == 'load_lock':
variables.flag_pump_load_lock_led = True
if response_tmp < 90 and status == 'cryo_load_lock':
variables.flag_pump_cryo_load_lock_led = False
elif response_tmp >= 90 and status == 'cryo_load_lock':
variables.flag_pump_cryo_load_lock_led = True
response = E_AGC.comm('?V940')
else:
print('Unknown command for Edwards TIC Load Lock')
except Exception:
response = -1 # Set response to -1 indicate an error
return response
[docs]
def initialize_cryovac(com_port_cryovac, variables):
"""
Initialize the communication port of Cryovac.
Args:
com_port_cryovac: Serial communication object.
variables: Variables instance.
Returns:
None
"""
# The TIC 500 firmware can fault on screen with "A system crash occurred /
# Diagnostic code: bpt0" if the first command we send arrives concatenated
# with stale bytes left in its macro parser (e.g. a partial line from a
# previous session, or line transitions caused by the host opening the
# port). Drain both directions, send a bare line terminator so any
# in-flight half-line is flushed as an empty macro, then drain again
# before issuing the real query.
time.sleep(0.2)
try:
com_port_cryovac.reset_input_buffer()
com_port_cryovac.reset_output_buffer()
com_port_cryovac.write(b'\r\n')
time.sleep(0.15)
com_port_cryovac.reset_input_buffer()
except Exception:
pass
output = ''
for _ in range(5):
output = command_cryovac('getOutput?', com_port_cryovac)
if output and output.split():
break
time.sleep(0.2)
try:
variables.temperature = float(output.split()[0].replace(',', ''))
except (ValueError, IndexError):
variables.temperature = -1
[docs]
def initialize_edwards_tic_load_lock(conf, variables):
"""
Initialize TIC load lock parameters.
Args:
conf: Configuration parameters.
variables: Variables instance.
Returns:
None
"""
E_AGC_ll = EdwardsAGC(variables.COM_PORT_gauge_ll)
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_ll)
variables.vacuum_load_lock = float(response.replace(';', ' ').split()[2]) * 0.01
variables.vacuum_load_lock_backing = float(response.replace(';', ' ').split()[4]) * 0.01
[docs]
def initialize_edwards_tic_cryo_load_lock(conf, variables):
"""
Initialize TIC cryo load lock parameters.
Args:
conf: Configuration parameters.
variables: Variables instance.
Returns:
None
"""
E_AGC_cll = EdwardsAGC(variables.COM_PORT_gauge_cll)
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_cll)
variables.vacuum_cryo_load_lock = float(response.replace(';', ' ').split()[2]) * 0.01
variables.vacuum_cryo_load_lock_backing = float(response.replace(';', ' ').split()[4]) * 0.01
[docs]
def initialize_edwards_tic_buffer_chamber(conf, variables):
"""
Initialize TIC buffer chamber parameters.
Args:
conf: Configuration parameters.
variables: Variables instance.
Returns:
None
"""
E_AGC_bc = EdwardsAGC(variables.COM_PORT_gauge_bc)
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_bc)
variables.vacuum_buffer_backing = float(response.replace(';', ' ').split()[2]) * 0.01
[docs]
def initialize_pfeiffer_gauges(variables):
"""
Initialize Pfeiffer gauge parameters.
Args:
variables: Variables instance.
Returns:
None
"""
tpg = TPG362(port=variables.COM_PORT_gauge_mc)
value, _ = tpg.pressure_gauge(2)
variables.vacuum_main = float(value)
value, _ = tpg.pressure_gauge(1)
variables.vacuum_buffer = float(value)
[docs]
def state_update(conf, variables, emitter):
"""
Read gauge parameters and update variables.
Args:
conf: Configuration parameters.
variables: Variables instance.
emitter: Emitter instance.
Returns:
None
"""
tpg = None
E_AGC_bc = None
E_AGC_ll = None
E_AGC_cll = None
reported_runtime_issues: set[str] = set()
def report_once(issue_key: str, message: str) -> None:
if issue_key not in reported_runtime_issues:
print(message)
reported_runtime_issues.add(issue_key)
def clear_issue(issue_key: str) -> None:
reported_runtime_issues.discard(issue_key)
if conf['gauges'] == "on":
if conf['COM_PORT_gauge_mc'] != "off":
try:
tpg = TPG362(port=variables.COM_PORT_gauge_mc)
except Exception as e:
print(
f"{bcolors.FAIL}"
f"{_format_port_error('Analysis chamber gauge', variables.COM_PORT_gauge_mc, e)}"
f"{bcolors.ENDC}"
)
tpg = None
if conf['COM_PORT_gauge_bc'] != "off":
try:
E_AGC_bc = EdwardsAGC(variables.COM_PORT_gauge_bc, variables)
except Exception as e:
print(
f"{bcolors.FAIL}{_format_port_error('Buffer chamber gauge', variables.COM_PORT_gauge_bc, e)}{bcolors.ENDC}"
)
E_AGC_bc = None
if conf['COM_PORT_gauge_ll'] != "off":
try:
E_AGC_ll = EdwardsAGC(variables.COM_PORT_gauge_ll, variables)
except Exception as e:
print(f"{bcolors.FAIL}{_format_port_error('Load-lock gauge', variables.COM_PORT_gauge_ll, e)}{bcolors.ENDC}")
E_AGC_ll = None
if conf['COM_PORT_gauge_cll'] != "off":
try:
E_AGC_cll = EdwardsAGC(variables.COM_PORT_gauge_cll, variables)
except Exception as e:
print(
f"{bcolors.FAIL}"
f"{_format_port_error('Cryo load-lock gauge', variables.COM_PORT_gauge_cll, e)}"
f"{bcolors.ENDC}"
)
E_AGC_cll = None
if conf['cryo'] == "off":
print('The cryo temperature monitoring is off')
else:
try:
com_port_cryovac = _open_cryovac_serial(variables.COM_PORT_cryo)
initialize_cryovac(com_port_cryovac, variables)
except Exception as e:
com_port_cryovac = None
print(_format_port_error('Cryovac', variables.COM_PORT_cryo, e))
cryovac_reconnect_interval = 30.0
last_cryovac_reconnect_attempt = time.time()
start_time = time.time()
log_time_time_interval = conf['log_time_time_interval']
vacuum_main = -1.0
vacuum_buffer = -1.0
vacuum_buffer_backing = -1.0
vacuum_load_lock = -1.0
vacuum_load_lock_backing = -1.0
vacuum_cryo_load_lock = -1.0
vacuum_cryo_load_lock_backing = -1.0
set_temperature_tmp_cryo = 0
set_temperature_tmp_ll = 0
while emitter.bool_flag_while_loop:
if conf['cryo'] == "on" and com_port_cryovac is None:
now = time.time()
if now - last_cryovac_reconnect_attempt >= cryovac_reconnect_interval:
last_cryovac_reconnect_attempt = now
if _serial_port_present(variables.COM_PORT_cryo):
try:
com_port_cryovac = _open_cryovac_serial(variables.COM_PORT_cryo)
initialize_cryovac(com_port_cryovac, variables)
clear_issue("cryovac_reconnect")
print(f"Cryovac reconnected on {variables.COM_PORT_cryo}")
except Exception as e:
com_port_cryovac = None
report_once(
"cryovac_reconnect",
_format_port_error('Cryovac', variables.COM_PORT_cryo, e),
)
if conf['cryo'] == "on" and com_port_cryovac is not None:
try:
output = command_cryovac('getOutput', com_port_cryovac)
except Exception as e:
print(e)
print("cannot read the cryo temperature")
output = '0'
try:
# Output indices are driven by config.toml cryo_sensor_X_index keys.
# Change those values to remap which device output feeds each sensor slot.
words = output.split()
idx1 = conf.get('cryo_sensor_1_index', 2) # cryo_head_outside
idx2 = conf.get('cryo_sensor_2_index', 1) # cryo_head_inside
idx3 = conf.get('cryo_sensor_3_index', 0) # stage
idx4 = conf.get('cryo_sensor_4_index', 3) # load_lock
temperature_cryo_head = float(words[idx1].replace(',', ''))
temperature_cryo_head_inside = float(words[idx2].replace(',', ''))
temperature_stage = float(words[idx3].replace(',', ''))
temperature_ll = float(words[idx4].replace(',', ''))
except Exception as e:
com_port_cryovac = None
temperature_cryo_head = -1
temperature_cryo_head_inside = -1
temperature_stage = -1
print(e)
# Handle the case where response is not a valid float
temperature = -1
variables.temperature = temperature_stage
emitter.temp_stage.emit(temperature_stage)
emitter.temp_cryo_head_inside.emit(temperature_cryo_head_inside)
emitter.temp_cryo_head.emit(temperature_cryo_head)
emitter.temp_ll.emit(temperature_ll - 273.15) # convert from kelvin to celsius
if variables.set_temperature_flag_cryo:
if variables.set_temperature_cryo != set_temperature_tmp_cryo:
try:
res = command_cryovac(f'Out1Cryo.PID.Setpoint {variables.set_temperature_cryo}', com_port_cryovac)
print(res)
set_temperature_tmp_cryo = variables.set_temperature_cryo
except Exception as e:
print(e)
print("cannot set the cryo temperature")
elif variables.set_temperature_flag_cryo == False:
variables.set_temperature_cryo = 0
res = command_cryovac(f'Out1Cryo.PID.Setpoint {variables.set_temperature_cryo}', com_port_cryovac)
variables.set_temperature_flag_cryo = None
if variables.set_temperature_flag_ll:
if variables.set_temperature_ll != set_temperature_tmp_ll:
try:
# convert from celcius to kelvin
set_temperature_ll = variables.set_temperature_ll + 273.15
res = command_cryovac(f'Out2LL.PID.Setpoint {set_temperature_ll}', com_port_cryovac)
print(res)
set_temperature_tmp_ll = variables.set_temperature_ll
except Exception as e:
print(e)
print("cannot set the load lock temperature")
elif variables.set_temperature_flag_ll == False:
variables.set_temperature_ll = 0
res = command_cryovac(f'Out2LL.PID.Setpoint {variables.set_temperature_ll}', com_port_cryovac)
variables.set_temperature_flag_ll = None
if conf['COM_PORT_gauge_mc'] != "off" and tpg is not None:
value, _ = tpg.pressure_gauge(2)
try:
vacuum_main = float(value)
clear_issue("vacuum_main")
except Exception as e:
report_once("vacuum_main", f"Error reading Temperature:{e}")
# Handle the case where response is not a valid float
vacuum_main = -1.0
variables.vacuum_main = vacuum_main
emitter.vacuum_main.emit(float(vacuum_main))
value, _ = tpg.pressure_gauge(1)
try:
vacuum_buffer = float(value)
clear_issue("vacuum_buffer")
except Exception as e:
report_once("vacuum_buffer", f"Error reading BC:{e}")
tpg = None
# Handle the case where response is not a valid float
vacuum_buffer = -1.0
variables.vacuum_buffer = vacuum_buffer
emitter.vacuum_buffer.emit(float(vacuum_buffer))
if conf['pump_ll'] != "off" and E_AGC_ll is not None:
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_ll, status='load_lock')
try:
if isinstance(response, str):
vacuum_load_lock = float(response.replace(';', ' ').split()[2]) * 0.01
clear_issue("vacuum_load_lock")
else:
raise ValueError("device returned no pressure payload")
except Exception as e:
report_once("vacuum_load_lock", f"Error reading LL:{e}")
E_AGC_ll = None
# Handle the case where response is not a valid float
vacuum_load_lock = -1
try:
if isinstance(response, str):
vacuum_load_lock_backing = float(response.replace(';', ' ').split()[4]) * 0.01
clear_issue("vacuum_load_lock_backing")
else:
raise ValueError("device returned no backing-pressure payload")
except Exception as e:
report_once("vacuum_load_lock_backing", f"Error reading LL backing:{e}")
E_AGC_ll = None
# Handle the case where response is not a valid float
vacuum_load_lock_backing = -1
variables.vacuum_load_lock = vacuum_load_lock
variables.vacuum_load_lock_backing = vacuum_load_lock_backing
emitter.vacuum_load_lock.emit(vacuum_load_lock)
emitter.vacuum_load_lock_back.emit(vacuum_load_lock_backing)
if conf['pump_cll'] != "off" and E_AGC_cll is not None:
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_cll, status='cryo_load_lock')
try:
if isinstance(response, str):
vacuum_cryo_load_lock = float(response.replace(';', ' ').split()[2]) * 0.01
clear_issue("vacuum_cryo_load_lock")
else:
raise ValueError("device returned no pressure payload")
except Exception as e:
report_once("vacuum_cryo_load_lock", f"Error reading CLL:{e}")
E_AGC_cll = None
# Handle the case where response is not a valid float
vacuum_cryo_load_lock = -1
try:
if isinstance(response, str):
vacuum_cryo_load_lock_backing = float(response.replace(';', ' ').split()[4]) * 0.01
clear_issue("vacuum_cryo_load_lock_backing")
else:
raise ValueError("device returned no backing-pressure payload")
except Exception as e:
report_once("vacuum_cryo_load_lock_backing", f"Error reading CLL backing:{e}")
# Handle the case where response is not a valid float
vacuum_cryo_load_lock_backing = -1
variables.vacuum_cryo_load_lock = vacuum_cryo_load_lock
variables.vacuum_cryo_load_lock_backing = vacuum_cryo_load_lock_backing
emitter.vacuum_cryo_load_lock.emit(vacuum_cryo_load_lock)
emitter.vacuum_cryo_load_lock_back.emit(vacuum_cryo_load_lock_backing)
if conf['COM_PORT_gauge_bc'] != "off" and E_AGC_bc is not None:
response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_bc)
try:
if isinstance(response, str):
vacuum_buffer_backing = float(response.replace(';', ' ').split()[2]) * 0.01
clear_issue("vacuum_buffer_backing")
else:
raise ValueError("device returned no pressure payload")
except Exception as e:
report_once("vacuum_buffer_backing", f"Error reading BC backing:{e}")
# Handle the case where response is not a valid float
vacuum_buffer_backing = -1
variables.vacuum_buffer_backing = vacuum_buffer_backing
emitter.vacuum_buffer_back.emit(vacuum_buffer_backing)
elapsed_time = time.time() - start_time
# Every 30 minutes, log the vacuum levels
if elapsed_time > log_time_time_interval:
start_time = time.time()
try:
log_vacuum_levels(
vacuum_main,
vacuum_buffer,
vacuum_buffer_backing,
vacuum_load_lock,
vacuum_load_lock_backing,
vacuum_cryo_load_lock,
vacuum_cryo_load_lock_backing,
)
except Exception as e:
print(e)
print("cannot log the vacuum levels")
time.sleep(1)
[docs]
def log_vacuum_levels(
main_chamber, buffer_chamber, buffer_chamber_pre, load_lock, load_lock_pre, cryo_load_lock, cryo_load_lock_pre
):
"""
Log vacuum levels to a text file and a CSV file.
Args:
main_chamber (float): Vacuum level of the main chamber.
buffer_chamber (float): Vacuum level of the buffer chamber.
buffer_chamber_pre (float): Vacuum level of the buffer chamber backing pump.
load_lock (float): Vacuum level of the load lock.
load_lock_pre(float): Vacuum level of the load lock backing pump.
cryo_load_lock (float): Vacuum level of the cryo load lock.
cryo_load_lock_pre (float): Vacuum level of the cryo load lock backing pump.
Returns:
None
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_month = datetime.now().strftime("%Y-%m")
path = runtime.project_path("files", "logs", "vacuum")
os.makedirs(path, mode=0o777, exist_ok=True)
txt_file_path = path / f"vacuum_log_{current_month}.txt"
csv_file_path = path / f"vacuum_log_{current_month}.csv"
with open(txt_file_path, "a") as log_file:
log_file.write(
f"{timestamp}: Main Chamber={main_chamber}, Buffer Chamber={buffer_chamber}, "
f"Buffer Chamber Pre={buffer_chamber_pre}, Load Lock={load_lock}, "
f"Load Lock Pre={load_lock_pre}, Cryo Load Lock={cryo_load_lock}, "
f"Cryo Load Lock Pre={cryo_load_lock_pre}\n"
)
row = [
timestamp,
main_chamber,
buffer_chamber,
buffer_chamber_pre,
load_lock,
load_lock_pre,
cryo_load_lock,
cryo_load_lock_pre,
]
header = [
"Timestamp",
"Main Chamber",
"Buffer Chamber",
"Buffer Chamber Backing Pump",
"Load Lock",
"Load Lock Backing",
'Cryo Load Lock',
'Cryo Load Lock Backing',
]
file_empty = not os.path.exists(csv_file_path) or os.path.getsize(csv_file_path) == 0
# Write to CSV file
with open(csv_file_path, "a", newline='') as log_file:
csv_writer = csv.writer(log_file)
# Write the header if the file is empty
if file_empty:
csv_writer.writerow(header)
# Write the data row
csv_writer.writerow(row)