import csv
import os
import time
from datetime import datetime
import serial.tools.list_ports
from pyccapt.control.core import runtime
from pyccapt.control.devices.edwards_tic import EdwardsAGC
from pyccapt.control.devices.pfeiffer_gauges import TPG362
class bcolors:
    """ANSI escape sequences used to colour and style console messages."""

    # Colour sequences.
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    # Sequence that resets all attributes; append it after a coloured message.
    ENDC = "\033[0m"
    # Text attribute sequences.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
def _available_serial_ports_text() -> str:
    """
    Describe the serial ports currently reported by pyserial.

    Returns:
        The sorted device names joined by ", ", or "none detected" when no
        reported port carries a non-empty device name.
    """
    devices = []
    for port in serial.tools.list_ports.comports():
        device = getattr(port, "device", "")
        # Entries without a device name cannot be shown to the user.
        if device:
            devices.append(device)
    devices.sort()
    if not devices:
        return "none detected"
    return ", ".join(devices)
def _format_port_error(device_label, port, exc) -> str:
    """
    Build the message shown when a device cannot be reached on its port.

    Args:
        device_label: Human-readable name of the device.
        port: Port that was tried.
        exc: Exception (or other object) describing the failure.
    Returns:
        A single-line message naming the device, the port, the failure and
        the serial ports that are currently available.
    """
    failure = f"{device_label} unavailable on {port}: {exc}. "
    inventory = f"Available serial ports: {_available_serial_ports_text()}"
    return failure + inventory
def command_cryovac(cmd, com_port_cryovac):
    """
    Send one command line to the Cryovac and return the latest reply line.

    Args:
        cmd: Command text; a CR/LF terminator is appended before sending.
        com_port_cryovac: Serial communication object.
    Returns:
        The last line waiting in the input buffer, decoded as UTF-8 when it
        arrives as bytes, or an empty string when nothing was waiting.
    """
    payload = (cmd + '\r\n').encode()
    com_port_cryovac.write(payload)
    # Give the device a moment to answer before polling the input buffer.
    time.sleep(0.1)

    reply = ''
    # Drain the buffer line by line; only the most recent line is kept.
    while com_port_cryovac.in_waiting > 0:
        reply = com_port_cryovac.readline()

    return reply.decode("utf-8") if isinstance(reply, bytes) else reply
def command_edwards(conf, variables, cmd, E_AGC, status=None):
    """
    Handle pending pump toggle requests and query an Edwards TIC controller.

    When a pump click is pending for the chamber named by ``status``, the
    pump state flags on ``variables`` are flipped; the '!C910'/'!C904'
    commands are only sent when the matching ``conf['pump_ll']`` or
    ``conf['pump_cll']`` entry is "on". Afterwards, for ``cmd == 'pressure'``,
    '?V911' is queried to refresh the pump LED flag of that chamber and the
    reply to '?V940' is returned.

    Args:
        conf: Configuration parameters.
        variables: Variables instance.
        cmd: Command to be executed; only 'pressure' is recognised.
        E_AGC: EdwardsAGC instance.
        status: Chamber the call refers to ('load_lock', 'cryo_load_lock')
            or None when no pump flags should be touched.
    Returns:
        The raw '?V940' reply, or -1 when no Edwards gauge port is
        configured, the command is unknown, or any step raised an exception.
    """
    response = -1
    try:
        # --- load-lock pump toggle ---------------------------------------
        if variables.flag_pump_load_lock_click and variables.flag_pump_load_lock and status == 'load_lock':
            if conf['pump_ll'] == "on":
                E_AGC.comm('!C910 0')
                E_AGC.comm('!C904 0')
            variables.flag_pump_load_lock_click = False
            variables.flag_pump_load_lock = False
            variables.flag_pump_load_lock_led = False
            time.sleep(1)
        elif variables.flag_pump_load_lock_click and not variables.flag_pump_load_lock and status == 'load_lock':
            if conf['pump_ll'] == "on":
                E_AGC.comm('!C910 1')
                E_AGC.comm('!C904 1')
            variables.flag_pump_load_lock_click = False
            variables.flag_pump_load_lock = True
            variables.flag_pump_load_lock_led = True
            time.sleep(1)

        # --- cryo load-lock pump toggle ----------------------------------
        if (variables.flag_pump_cryo_load_lock_click and variables.flag_pump_cryo_load_lock
                and status == 'cryo_load_lock'):
            if conf['pump_cll'] == "on":
                E_AGC.comm('!C910 0')
                E_AGC.comm('!C904 0')
            variables.flag_pump_cryo_load_lock_click = False
            variables.flag_pump_cryo_load_lock = False
            variables.flag_pump_cryo_load_lock_led = False
            time.sleep(1)
        elif (variables.flag_pump_cryo_load_lock_click and not variables.flag_pump_cryo_load_lock
              and status == 'cryo_load_lock'):
            if conf['pump_cll'] == "on":
                E_AGC.comm('!C910 1')
                E_AGC.comm('!C904 1')
            variables.flag_pump_cryo_load_lock_click = False
            variables.flag_pump_cryo_load_lock = True
            variables.flag_pump_cryo_load_lock_led = True
            time.sleep(1)

        # The buffer-chamber TIC is polled through this function as well, so
        # its port has to enable the query too; otherwise a buffer-only setup
        # would always get -1. The key is read with a default so that
        # configurations without it behave exactly as before.
        edwards_gauge_configured = (
            conf['COM_PORT_gauge_ll'] != "off"
            or conf['COM_PORT_gauge_cll'] != "off"
            or conf.get('COM_PORT_gauge_bc', "off") != "off"
        )
        if edwards_gauge_configured:
            if cmd == 'pressure':
                # Second field of the '?V911' reply drives the LED flags.
                # NOTE(review): presumably the pump speed in percent - confirm
                # against the TIC manual.
                v911_value = float(E_AGC.comm('?V911').replace(';', ' ').split()[1])
                if v911_value < 90 and status == 'load_lock':
                    variables.flag_pump_load_lock_led = False
                elif v911_value >= 90 and status == 'load_lock':
                    variables.flag_pump_load_lock_led = True
                if v911_value < 90 and status == 'cryo_load_lock':
                    variables.flag_pump_cryo_load_lock_led = False
                elif v911_value >= 90 and status == 'cryo_load_lock':
                    variables.flag_pump_cryo_load_lock_led = True
                response = E_AGC.comm('?V940')
            else:
                print('Unknown command for Edwards TIC Load Lock')
    except Exception:
        # Deliberate best effort: callers treat -1 as "no reading available".
        response = -1
    return response
def initialize_cryovac(com_port_cryovac, variables):
    """
    Take a first Cryovac reading and store it as the current temperature.

    Args:
        com_port_cryovac: Serial communication object.
        variables: Variables instance; its ``temperature`` attribute is set
            from the first whitespace-separated field of the 'getOutput'
            reply, with commas removed.
    Returns:
        None
    """
    reply = command_cryovac('getOutput', com_port_cryovac)
    time.sleep(0.1)
    first_field = reply.split()[0]
    variables.temperature = float(first_field.replace(',', ''))
def initialize_edwards_tic_load_lock(conf, variables):
    """
    Read the load-lock TIC once and store the initial vacuum values.

    Args:
        conf: Configuration parameters.
        variables: Variables instance; ``vacuum_load_lock`` and
            ``vacuum_load_lock_backing`` are set from fields 2 and 4 of the
            reply, each scaled by 0.01.
    Returns:
        None
    """
    # NOTE(review): state_update() constructs EdwardsAGC(port, variables);
    # confirm that the single-argument form is intended here.
    gauge = EdwardsAGC(variables.COM_PORT_gauge_ll)
    reply = command_edwards(conf, variables, 'pressure', E_AGC=gauge)
    # A -1 reply (query failed) has no .replace() and raises AttributeError.
    fields = reply.replace(';', ' ').split()
    for attribute, index in (('vacuum_load_lock', 2), ('vacuum_load_lock_backing', 4)):
        setattr(variables, attribute, float(fields[index]) * 0.01)
def initialize_edwards_tic_cryo_load_lock(conf, variables):
    """
    Read the cryo load-lock TIC once and store the initial vacuum values.

    Args:
        conf: Configuration parameters.
        variables: Variables instance; ``vacuum_cryo_load_lock`` and
            ``vacuum_cryo_load_lock_backing`` are set from fields 2 and 4 of
            the reply, each scaled by 0.01.
    Returns:
        None
    """
    # NOTE(review): state_update() constructs EdwardsAGC(port, variables);
    # confirm that the single-argument form is intended here.
    gauge = EdwardsAGC(variables.COM_PORT_gauge_cll)
    reply = command_edwards(conf, variables, 'pressure', E_AGC=gauge)
    # A -1 reply (query failed) has no .replace() and raises AttributeError.
    fields = reply.replace(';', ' ').split()
    for attribute, index in (('vacuum_cryo_load_lock', 2), ('vacuum_cryo_load_lock_backing', 4)):
        setattr(variables, attribute, float(fields[index]) * 0.01)
def initialize_edwards_tic_buffer_chamber(conf, variables):
    """
    Read the buffer-chamber TIC once and store the initial backing vacuum.

    Args:
        conf: Configuration parameters.
        variables: Variables instance; ``vacuum_buffer_backing`` is set from
            field 2 of the reply, scaled by 0.01.
    Returns:
        None
    """
    # NOTE(review): state_update() constructs EdwardsAGC(port, variables);
    # confirm that the single-argument form is intended here.
    gauge = EdwardsAGC(variables.COM_PORT_gauge_bc)
    reply = command_edwards(conf, variables, 'pressure', E_AGC=gauge)
    # A -1 reply (query failed) has no .replace() and raises AttributeError.
    fields = reply.replace(';', ' ').split()
    variables.vacuum_buffer_backing = float(fields[2]) * 0.01
def initialize_pfeiffer_gauges(variables):
    """
    Read both Pfeiffer gauge channels once and store the initial values.

    Args:
        variables: Variables instance; ``vacuum_main`` is set from gauge
            channel 2 and ``vacuum_buffer`` from gauge channel 1.
    Returns:
        None
    """
    gauge = TPG362(port=variables.COM_PORT_gauge_mc)
    # Channel 2 is read first, then channel 1.
    for channel, attribute in ((2, 'vacuum_main'), (1, 'vacuum_buffer')):
        reading, _ = gauge.pressure_gauge(channel)
        setattr(variables, attribute, float(reading))
def state_update(conf, variables, emitter):
    """
    Poll the cryostat and the vacuum gauges and publish their readings.

    The devices enabled in ``conf`` are opened first; a device that cannot be
    opened is reported and skipped. While ``emitter.bool_flag_while_loop`` is
    true, each pass (followed by a one-second sleep) reads the Cryovac
    temperatures, applies pending setpoint requests, reads the Pfeiffer and
    Edwards gauges, emits the values through ``emitter`` and, every
    ``conf['log_time_time_interval']`` seconds, appends them to the vacuum
    log. A reading that cannot be obtained is published as -1.

    Args:
        conf: Configuration parameters.
        variables: Variables instance.
        emitter: Emitter instance.
    Returns:
        None
    """
    com_port_cryovac = None
    reported_runtime_issues: set[str] = set()

    def report_once(issue_key: str, message: str) -> None:
        # Print a recurring problem only the first time it shows up.
        if issue_key not in reported_runtime_issues:
            print(message)
            reported_runtime_issues.add(issue_key)

    def clear_issue(issue_key: str) -> None:
        # A successful read re-arms the report for that reading.
        reported_runtime_issues.discard(issue_key)

    def open_device(label, port, factory):
        # Create a device object, or report the failure and return None.
        try:
            return factory()
        except Exception as e:
            print(f"{bcolors.FAIL}{_format_port_error(label, port, e)}{bcolors.ENDC}")
            return None

    def parse_tic_field(response, index, missing_message):
        # Extract one numeric field of an Edwards reply, scaled by 0.01.
        if not isinstance(response, str):
            raise ValueError(missing_message)
        return float(response.replace(';', ' ').split()[index]) * 0.01

    tpg = None
    E_AGC_bc = None
    E_AGC_ll = None
    E_AGC_cll = None
    if conf['gauges'] == "on":
        if conf['COM_PORT_gauge_mc'] != "off":
            tpg = open_device('Analysis chamber gauge', variables.COM_PORT_gauge_mc,
                              lambda: TPG362(port=variables.COM_PORT_gauge_mc))
        if conf['COM_PORT_gauge_bc'] != "off":
            E_AGC_bc = open_device('Buffer chamber gauge', variables.COM_PORT_gauge_bc,
                                   lambda: EdwardsAGC(variables.COM_PORT_gauge_bc, variables))
        if conf['COM_PORT_gauge_ll'] != "off":
            E_AGC_ll = open_device('Load-lock gauge', variables.COM_PORT_gauge_ll,
                                   lambda: EdwardsAGC(variables.COM_PORT_gauge_ll, variables))
        if conf['COM_PORT_gauge_cll'] != "off":
            E_AGC_cll = open_device('Cryo load-lock gauge', variables.COM_PORT_gauge_cll,
                                    lambda: EdwardsAGC(variables.COM_PORT_gauge_cll, variables))

    if conf['cryo'] == "off":
        print('The cryo temperature monitoring is off')
    else:
        try:
            com_port_cryovac = serial.Serial(
                port=variables.COM_PORT_cryo,
                baudrate=9600,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE
            )
            initialize_cryovac(com_port_cryovac, variables)
        except Exception as e:
            com_port_cryovac = None
            print(_format_port_error('Cryovac', variables.COM_PORT_cryo, e))

    start_time = time.time()
    log_time_time_interval = conf['log_time_time_interval']
    vacuum_main = -1.0
    vacuum_buffer = -1.0
    vacuum_buffer_backing = -1.0
    vacuum_load_lock = -1.0
    vacuum_load_lock_backing = -1.0
    vacuum_cryo_load_lock = -1.0
    vacuum_cryo_load_lock_backing = -1.0
    # Last setpoints sent to the Cryovac; None forces the next request to be sent.
    set_temperature_tmp_cryo = 0
    set_temperature_tmp_ll = 0

    while emitter.bool_flag_while_loop:
        # ----------------------------------------------------------- cryo
        if conf['cryo'] == "on" and com_port_cryovac is not None:
            try:
                output = command_cryovac('getOutput', com_port_cryovac)
            except Exception as e:
                print(e)
                print("cannot read the cryo temperature")
                output = '0'
            try:
                # Output indices are driven by config.toml cryo_sensor_X_index keys.
                # Change those values to remap which device output feeds each sensor slot.
                words = output.split()
                idx1 = conf.get('cryo_sensor_1_index', 2)  # cryo_head_outside
                idx2 = conf.get('cryo_sensor_2_index', 1)  # cryo_head_inside
                idx3 = conf.get('cryo_sensor_3_index', 0)  # stage
                idx4 = conf.get('cryo_sensor_4_index', 3)  # load_lock
                temperature_cryo_head = float(words[idx1].replace(',', ''))
                temperature_cryo_head_inside = float(words[idx2].replace(',', ''))
                temperature_stage = float(words[idx3].replace(',', ''))
                # convert from kelvin to celsius
                temperature_ll_celsius = float(words[idx4].replace(',', '')) - 273.15
            except Exception as e:
                # An unparsable reply disables the Cryovac for the rest of the
                # run; every sensor, including the load lock, reports -1.
                com_port_cryovac = None
                temperature_cryo_head = -1
                temperature_cryo_head_inside = -1
                temperature_stage = -1
                temperature_ll_celsius = -1
                print(e)
            variables.temperature = temperature_stage
            emitter.temp_stage.emit(temperature_stage)
            emitter.temp_cryo_head_inside.emit(temperature_cryo_head_inside)
            emitter.temp_cryo_head.emit(temperature_cryo_head)
            emitter.temp_ll.emit(temperature_ll_celsius)

            # Setpoints can only be sent while the port is still usable.
            if com_port_cryovac is not None:
                if variables.set_temperature_flag_cryo:
                    if variables.set_temperature_cryo != set_temperature_tmp_cryo:
                        try:
                            res = command_cryovac(f'Out1Cryo.PID.Setpoint {variables.set_temperature_cryo}',
                                                  com_port_cryovac)
                            print(res)
                            set_temperature_tmp_cryo = variables.set_temperature_cryo
                        except Exception as e:
                            print(e)
                            print("cannot set the cryo temperature")
                # `== False` on purpose: None marks a reset that was already sent.
                elif variables.set_temperature_flag_cryo == False:  # noqa: E712
                    variables.set_temperature_cryo = 0
                    try:
                        command_cryovac(f'Out1Cryo.PID.Setpoint {variables.set_temperature_cryo}',
                                        com_port_cryovac)
                    except Exception as e:
                        print(e)
                        print("cannot reset the cryo temperature")
                    else:
                        variables.set_temperature_flag_cryo = None
                        # Forget the cached value so that the same setpoint
                        # can be requested again after the reset.
                        set_temperature_tmp_cryo = None

                if variables.set_temperature_flag_ll:
                    if variables.set_temperature_ll != set_temperature_tmp_ll:
                        try:
                            # convert from celsius to kelvin
                            set_temperature_ll = variables.set_temperature_ll + 273.15
                            res = command_cryovac(f'Out2LL.PID.Setpoint {set_temperature_ll}', com_port_cryovac)
                            print(res)
                            set_temperature_tmp_ll = variables.set_temperature_ll
                        except Exception as e:
                            print(e)
                            print("cannot set the load lock temperature")
                elif variables.set_temperature_flag_ll == False:  # noqa: E712
                    variables.set_temperature_ll = 0
                    try:
                        # The reset value is sent as-is, without the Kelvin offset.
                        command_cryovac(f'Out2LL.PID.Setpoint {variables.set_temperature_ll}',
                                        com_port_cryovac)
                    except Exception as e:
                        print(e)
                        print("cannot reset the load lock temperature")
                    else:
                        variables.set_temperature_flag_ll = None
                        set_temperature_tmp_ll = None

        # ------------------------------------- main / buffer chamber gauge
        if conf['COM_PORT_gauge_mc'] != "off" and tpg is not None:
            try:
                value, _ = tpg.pressure_gauge(2)
                vacuum_main = float(value)
                clear_issue("vacuum_main")
            except Exception as e:
                report_once("vacuum_main", f"Error reading MC:{e}")
                vacuum_main = -1.0
            variables.vacuum_main = vacuum_main
            emitter.vacuum_main.emit(float(vacuum_main))
            try:
                value, _ = tpg.pressure_gauge(1)
                vacuum_buffer = float(value)
                clear_issue("vacuum_buffer")
            except Exception as e:
                report_once("vacuum_buffer", f"Error reading BC:{e}")
                # Stop polling this gauge after a failed buffer reading.
                tpg = None
                vacuum_buffer = -1.0
            variables.vacuum_buffer = vacuum_buffer
            emitter.vacuum_buffer.emit(float(vacuum_buffer))

        # ------------------------------------------------------- load lock
        if conf['pump_ll'] != "off" and E_AGC_ll is not None:
            response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_ll, status='load_lock')
            try:
                vacuum_load_lock = parse_tic_field(response, 2, "device returned no pressure payload")
                clear_issue("vacuum_load_lock")
            except Exception as e:
                report_once("vacuum_load_lock", f"Error reading LL:{e}")
                E_AGC_ll = None
                vacuum_load_lock = -1
            try:
                vacuum_load_lock_backing = parse_tic_field(
                    response, 4, "device returned no backing-pressure payload")
                clear_issue("vacuum_load_lock_backing")
            except Exception as e:
                report_once("vacuum_load_lock_backing", f"Error reading LL backing:{e}")
                E_AGC_ll = None
                vacuum_load_lock_backing = -1
            emitter.vacuum_load_lock.emit(vacuum_load_lock)
            emitter.vacuum_load_lock_back.emit(vacuum_load_lock_backing)

        # -------------------------------------------------- cryo load lock
        if conf['pump_cll'] != "off" and E_AGC_cll is not None:
            response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_cll, status='cryo_load_lock')
            try:
                vacuum_cryo_load_lock = parse_tic_field(response, 2, "device returned no pressure payload")
                clear_issue("vacuum_cryo_load_lock")
            except Exception as e:
                report_once("vacuum_cryo_load_lock", f"Error reading CLL:{e}")
                E_AGC_cll = None
                vacuum_cryo_load_lock = -1
            try:
                vacuum_cryo_load_lock_backing = parse_tic_field(
                    response, 4, "device returned no backing-pressure payload")
                clear_issue("vacuum_cryo_load_lock_backing")
            except Exception as e:
                report_once("vacuum_cryo_load_lock_backing", f"Error reading CLL backing:{e}")
                vacuum_cryo_load_lock_backing = -1
            emitter.vacuum_cryo_load_lock.emit(vacuum_cryo_load_lock)
            emitter.vacuum_cryo_load_lock_back.emit(vacuum_cryo_load_lock_backing)

        # ------------------------------------------- buffer chamber backing
        if conf['COM_PORT_gauge_bc'] != "off" and E_AGC_bc is not None:
            response = command_edwards(conf, variables, 'pressure', E_AGC=E_AGC_bc)
            try:
                vacuum_buffer_backing = parse_tic_field(response, 2, "device returned no pressure payload")
                clear_issue("vacuum_buffer_backing")
            except Exception as e:
                report_once("vacuum_buffer_backing", f"Error reading BC backing:{e}")
                vacuum_buffer_backing = -1
            variables.vacuum_buffer_backing = vacuum_buffer_backing
            emitter.vacuum_buffer_back.emit(vacuum_buffer_backing)

        # Log the vacuum levels once per configured interval (seconds).
        elapsed_time = time.time() - start_time
        if elapsed_time > log_time_time_interval:
            start_time = time.time()
            try:
                log_vacuum_levels(vacuum_main, vacuum_buffer, vacuum_buffer_backing, vacuum_load_lock,
                                  vacuum_load_lock_backing, vacuum_cryo_load_lock, vacuum_cryo_load_lock_backing)
            except Exception as e:
                print(e)
                print("cannot log the vacuum levels")
        time.sleep(1)
def log_vacuum_levels(main_chamber, buffer_chamber, buffer_chamber_pre, load_lock, load_lock_pre,
                      cryo_load_lock, cryo_load_lock_pre):
    """
    Append the vacuum levels to the monthly text log and the monthly CSV log.

    Both files live in the project's ``files/logs/vacuum`` directory, which
    is created when missing. The CSV header is written only when the CSV
    file is new or empty.

    Args:
        main_chamber (float): Vacuum level of the main chamber.
        buffer_chamber (float): Vacuum level of the buffer chamber.
        buffer_chamber_pre (float): Vacuum level of the buffer chamber backing pump.
        load_lock (float): Vacuum level of the load lock.
        load_lock_pre (float): Vacuum level of the load lock backing pump.
        cryo_load_lock (float): Vacuum level of the cryo load lock.
        cryo_load_lock_pre (float): Vacuum level of the cryo load lock backing pump.
    Returns:
        None
    """
    # Sample the clock once so that the entry's timestamp and the month used
    # for the file names can never disagree at a month rollover.
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    current_month = now.strftime("%Y-%m")

    path = runtime.project_path("files", "logs", "vacuum")
    os.makedirs(path, mode=0o777, exist_ok=True)
    txt_file_path = path / f"vacuum_log_{current_month}.txt"
    csv_file_path = path / f"vacuum_log_{current_month}.csv"

    with open(txt_file_path, "a", encoding="utf-8") as log_file:
        log_file.write(f"{timestamp}: Main Chamber={main_chamber}, Buffer Chamber={buffer_chamber}, "
                       f"Buffer Chamber Pre={buffer_chamber_pre}, Load Lock={load_lock}, "
                       f"Load Lock Pre={load_lock_pre}, Cryo Load Lock={cryo_load_lock}, "
                       f"Cryo Load Lock Pre={cryo_load_lock_pre}\n")

    row = [timestamp, main_chamber, buffer_chamber, buffer_chamber_pre, load_lock, load_lock_pre, cryo_load_lock,
           cryo_load_lock_pre]
    header = ["Timestamp", "Main Chamber", "Buffer Chamber", "Buffer Chamber Backing Pump", "Load Lock",
              "Load Lock Backing", 'Cryo Load Lock', 'Cryo Load Lock Backing']
    # Decide before opening: append mode creates the file if it is missing.
    file_empty = not os.path.exists(csv_file_path) or os.path.getsize(csv_file_path) == 0

    with open(csv_file_path, "a", newline='', encoding="utf-8") as log_file:
        csv_writer = csv.writer(log_file)
        if file_empty:
            csv_writer.writerow(header)
        csv_writer.writerow(row)