Source code for pyccapt.control.tdc_surface_concept.tdc_surface_concept

import multiprocessing as mp
import os
import time
from queue import Queue

import numpy as np

# local imports
from pyccapt.control.devices import initialize_devices
from pyccapt.control.tdc_surface_concept import scTDC

# Tags used as the first element of the tuples placed on BufDataCB4.queue.
QUEUE_DATA = 0  # the tuple carries a copied data dictionary
QUEUE_ENDOFMEAS = 1  # the tuple marks the end of a measurement (payload is None)
# Number of buffered DLD events that triggers writing a chunk to disk.
CHUNK_SIZE = 100_000  # Adjust the chunk size if needed

class BufDataCB4(scTDC.buffered_data_callbacks_pipe):
    """
    Callback pipe that forwards scTDC data to a thread-safe queue.

    Derived from scTDC.buffered_data_callbacks_pipe. Every data callback is
    copied and pushed onto ``self.queue`` tagged with QUEUE_DATA; once an
    end-of-measurement notification has been seen, the next data callback
    additionally pushes a QUEUE_ENDOFMEAS marker.
    """

    def __init__(self, lib, dev_desc, data_field_selection, dld_events,
                 max_buffered_data_len=500_000):
        """
        Set up the base pipe and the output queue.

        Args:
            lib (scTDClib): A scTDClib object.
            dev_desc (int): Device descriptor as returned by sc_tdc_init_inifile(...).
            data_field_selection (int): A 'bitwise or' combination of SC_DATA_FIELD_xyz constants.
            dld_events (bool): True to receive DLD events, False to receive TDC events.
            max_buffered_data_len (int): Number of events buffered before invoking callbacks.
        """
        # The base class takes max_buffered_data_len before dld_events.
        super().__init__(lib, dev_desc, data_field_selection, max_buffered_data_len, dld_events)
        self.queue = Queue()
        self.end_of_meas = False

    def on_data(self, d):
        """
        Queue a private copy of the delivered data.

        NumPy arrays are duplicated with their ``copy`` method; every other
        value is stored by plain assignment. If an end of measurement was
        flagged earlier, the flag is cleared and an end-of-measurement marker
        is queued right after the data.

        Args:
            d (dict): Data dictionary.

        Returns:
            None
        """
        snapshot = {
            key: (value.copy() if isinstance(value, np.ndarray) else value)
            for key, value in d.items()
        }
        self.queue.put((QUEUE_DATA, snapshot))
        if not self.end_of_meas:
            return
        self.end_of_meas = False
        self.queue.put((QUEUE_ENDOFMEAS, None))

    def on_end_of_meas(self):
        """
        Remember that the measurement ended; the marker is queued by on_data.

        Returns:
            True (bool)
        """
        self.end_of_meas = True
        return True
def errorcheck(device, bufdatacb, bufdatacb_raw, retcode):
    """
    Inspect a return code and tear everything down when it signals an error.

    Args:
        device (scTDC.Device): A scTDC.Device object.
        bufdatacb (BufDataCB4): A BufDataCB4 object.
        bufdatacb_raw (BufDataCB4): A BufDataCB4 object.
        retcode (int): Return code.

    Returns:
        int: 0 when the return code is zero or positive, -1 when it is
        negative (after printing the error message, closing both pipes and
        deinitializing the device).
    """
    # Guard clause: nothing to clean up for non-negative codes.
    if retcode >= 0:
        return 0

    print(device.lib.sc_get_err_msg(retcode))
    for pipe in (bufdatacb, bufdatacb_raw):
        pipe.close()
    device.deinitialize()
    return -1
def save_chunk_worker(save_queue):
    """
    Write queued data chunks to disk until a stop signal arrives.

    Each task is a tuple ``(chunk_id, path, chunk_data)``. Every entry of
    ``chunk_data`` is stored as ``<path>/chunks/<key>_chunk_<chunk_id>.npy``.
    A task of ``None`` ends the worker.

    Args:
        save_queue: Queue-like object whose ``get()`` returns the next task.

    Returns:
        None
    """
    while True:
        # get() blocks while the queue is empty, so the worker idles without
        # spinning and no additional sleep is needed between tasks.
        task = save_queue.get()
        if task is None:  # Stop signal
            break

        chunk_id, path, chunk_data = task
        try:
            # Make sure the target folder exists even if the producer did not create it.
            os.makedirs(os.path.join(path, "chunks"), exist_ok=True)
            for key, data in chunk_data.items():
                np.save(os.path.join(path, f"chunks/{key}_chunk_{chunk_id}.npy"), np.array(data))
            print(f"Chunk {chunk_id} saved.")
        except Exception as e:
            # Best effort: report the failure and keep serving later chunks.
            print(f"Error saving chunk {chunk_id}: {e}")
def load_and_concatenate_chunks(path, chunk_id):
    """
    Read the chunk files written during a measurement and join them per field.

    For every chunk number from 1 to ``chunk_id`` and every known field, the
    file ``<path>/chunks/<field>_chunk_<number>.npy`` is loaded if it exists.
    Missing files produce a warning and unreadable files an error message;
    both are skipped.

    Args:
        path (str): Directory that contains the ``chunks`` folder.
        chunk_id (int): Highest chunk number to load.

    Returns:
        tuple: One NumPy array per field, in the order x_bin, x, y_bin, y,
        t_bin, t, voltage, voltage_pulse, laser_pulse, start_counter,
        channel, time, tdc_start_counter, voltage_tdc, voltage_pulse_tdc,
        laser_pulse_tdc. A field without any loaded chunk yields an empty
        array.
    """
    field_names = (
        "x_bin", "x", "y_bin", "y", "t_bin", "t",
        "voltage", "voltage_pulse", "laser_pulse", "start_counter",
        "channel", "time", "tdc_start_counter",
        "voltage_tdc", "voltage_pulse_tdc", "laser_pulse_tdc",
    )
    collected = {name: [] for name in field_names}

    for index in range(1, chunk_id + 1):
        for name in field_names:
            chunk_file = os.path.join(path, f"chunks/{name}_chunk_{index}.npy")
            if not os.path.exists(chunk_file):
                print(f"Warning: File '{chunk_file}' not found.")
                continue
            try:
                collected[name].append(np.load(chunk_file))
            except Exception as e:
                print(f"Error loading {chunk_file}: {e}")

    # Collapse each list of chunk arrays into a single array.
    merged = []
    for name in field_names:
        parts = collected[name]
        merged.append(np.concatenate(parts) if parts else np.array([]))
    return tuple(merged)
def _handle_tdc_failure(variables):
    """Report a TDC failure on ``variables`` and return the code the caller should return."""
    print(f"{initialize_devices.bcolors.FAIL}Error: Restart the TDC manually "
          f"(Turn it On and Off){initialize_devices.bcolors.ENDC}")
    variables.flag_finished_tdc = True
    if not getattr(variables, "access_override_enabled", False):
        variables.flag_tdc_failure = True
        return -1
    print("Access Override is active. Continuing without a Surface Concept detector.")
    variables.flag_tdc_failure = False
    return 0


def _slice_buffers(buffers, limit):
    """Return a dict holding a copy of the first ``limit`` items of every buffer."""
    return {key: values[:limit] for key, values in buffers.items()}


def run_experiment_measure(variables, x_plot, y_plot, t_plot, main_v_dc_plot, stop_event):
    """
    Measurement function: read DLD and raw TDC data until ``stop_event`` is set.

    The function initializes the TDC, starts a measurement and then loops:
    it takes one item from the DLD pipe and one from the raw TDC pipe,
    converts the DLD data, forwards it for plotting, accumulates everything
    in memory and updates the counters on ``variables``. Whenever at least
    CHUNK_SIZE DLD events are buffered, a chunk is handed to a separate
    saving process. After the loop the remaining data is saved and published
    to ``variables``.

    Args:
        variables (share_variables.Variables): A share_variables.Variables object.
        x_plot: Object with a ``put`` method that receives the x positions of each batch.
        y_plot: Object with a ``put`` method that receives the y positions of each batch.
        t_plot: Object with a ``put`` method that receives the times of flight of each batch.
        main_v_dc_plot: Object with a ``put`` method that receives the specimen voltage of each batch.
        stop_event (multiprocessing.Event): Set to end the measurement loop.

    Returns:
        int: 0 after a completed measurement, or when the TDC failed but
        ``variables.access_override_enabled`` is true; -1 when the TDC failed
        without access override.
    """
    # Passed to start_measurement; presumably milliseconds -- TODO confirm against scTDC.
    exposure_time = 100

    # surface concept tdc specific binning and factors
    TOFFACTOR = 27.432 / (1000 * 4)  # 27.432 ps/bin, tof in ns, data is TDC time sum
    DETBINS = 4900
    BINNINGFAC = 2
    XYFACTOR = 80 / DETBINS * BINNINGFAC  # XXX mm/bin
    XYBINSHIFT = DETBINS / BINNINGFAC / 2  # to center detector

    device = scTDC.Device(autoinit=False)
    retcode, errmsg = device.initialize()
    if retcode < 0:
        print("Error during init:", retcode, errmsg)
        return _handle_tdc_failure(variables)
    print("TDC is successfully initialized")
    variables.flag_tdc_failure = False

    DATA_FIELD_SEL = (scTDC.SC_DATA_FIELD_DIF1 |
                      scTDC.SC_DATA_FIELD_DIF2 |
                      scTDC.SC_DATA_FIELD_TIME |
                      scTDC.SC_DATA_FIELD_START_COUNTER)
    DATA_FIELD_SEL_raw = (scTDC.SC_DATA_FIELD_TIME |
                          scTDC.SC_DATA_FIELD_CHANNEL |
                          scTDC.SC_DATA_FIELD_START_COUNTER)

    bufdatacb = BufDataCB4(device.lib, device.dev_desc, DATA_FIELD_SEL, dld_events=True)
    bufdatacb_raw = BufDataCB4(device.lib, device.dev_desc, DATA_FIELD_SEL_raw, dld_events=False)

    # DLD data
    xx = []
    yy = []
    tt = []
    voltage_data = []
    voltage_pulse_data = []
    laser_pulse_data = []
    start_counter = []

    # The binning of DLD events
    xx_list_bin = []
    yy_list_bin = []
    tt_list_bin = []

    # TDC data (Raw data)
    channel_data = []
    time_data = []
    tdc_start_counter = []
    voltage_data_tdc = []
    voltage_pulse_data_tdc = []
    laser_pulse_data_tdc = []

    # Chunk file key -> in-memory buffer. The dict references the very same
    # list objects as the names above, so slicing/deleting through it acts on
    # the buffers themselves.
    chunk_buffers = {
        "x_bin": xx_list_bin,
        "x": xx,
        "y_bin": yy_list_bin,
        "y": yy,
        "t_bin": tt_list_bin,
        "t": tt,
        "voltage": voltage_data,
        "voltage_pulse": voltage_pulse_data,
        "laser_pulse": laser_pulse_data,
        "start_counter": start_counter,
        "channel": channel_data,
        "time": time_data,
        "tdc_start_counter": tdc_start_counter,
        "voltage_tdc": voltage_data_tdc,
        "voltage_pulse_tdc": voltage_pulse_data_tdc,
        "laser_pulse_tdc": laser_pulse_data_tdc,
    }

    retcode = bufdatacb.start_measurement(exposure_time)
    if errorcheck(device, bufdatacb, bufdatacb_raw, retcode) < 0:
        print("Error during read:", retcode, device.lib.sc_get_err_msg(retcode))
        return _handle_tdc_failure(variables)

    loop_time = 1 / variables.ex_freq
    events_detected = 0
    events_detected_tmp = 0
    raw_signal_detected = 0
    start_time = time.time()
    pulse_frequency = variables.pulse_frequency * 1000
    loop_counter = 0
    loop_delay_counter = 0
    chunk_id = 0

    # Create folders to save the data
    path = variables.path + "/temp_data/"
    os.makedirs(path, mode=0o777, exist_ok=True)
    os.makedirs(path + "chunks/", mode=0o777, exist_ok=True)

    # Chunks are written by a separate process so that saving does not stall the read loop.
    save_queue = mp.Queue()
    save_process = mp.Process(target=save_chunk_worker, args=(save_queue,))
    save_process.start()

    while not stop_event.is_set():
        start_time_loop = time.time()
        # Both calls wait until an element is available.
        eventtype, data = bufdatacb.queue.get()
        eventtype_raw, data_raw = bufdatacb_raw.queue.get()

        specimen_voltage = variables.specimen_voltage
        voltage_pulse = variables.pulse_voltage
        laser_pulse = variables.laser_pulse_energy

        if eventtype == QUEUE_DATA:
            # correct for binning of surface concept
            xx_dif = data["dif1"]
            length = len(xx_dif)
            if length > 0:
                events_detected_tmp += length
                events_detected += length
                yy_dif = data["dif2"]
                tt_dif = data["time"]
                start_counter.extend(data["start_counter"].tolist())

                xx_tmp = ((xx_dif - XYBINSHIFT) * XYFACTOR) * 0.1  # from mm to in cm by dividing by 10
                yy_tmp = ((yy_dif - XYBINSHIFT) * XYFACTOR) * 0.1  # from mm to in cm by dividing by 10
                tt_tmp = tt_dif * TOFFACTOR  # in ns
                dc_voltage_tmp = np.tile(specimen_voltage, length)

                # hand the converted batch over for visualization
                x_plot.put(xx_tmp)
                y_plot.put(yy_tmp)
                t_plot.put(tt_tmp)
                main_v_dc_plot.put(dc_voltage_tmp)

                # extend the main lists with the new data
                xx.extend(xx_tmp.tolist())
                yy.extend(yy_tmp.tolist())
                tt.extend(tt_tmp.tolist())
                voltage_data.extend(dc_voltage_tmp.tolist())
                voltage_pulse_data.extend(np.tile(voltage_pulse, length).tolist())
                laser_pulse_data.extend(np.tile(laser_pulse, length).tolist())

                # The binning of DLD events
                xx_list_bin.extend(xx_dif.tolist())
                yy_list_bin.extend(yy_dif.tolist())
                tt_list_bin.extend(tt_dif.tolist())

        if eventtype_raw == QUEUE_DATA:
            channel_data_tmp = data_raw["channel"].tolist()
            raw_length = len(channel_data_tmp)
            if raw_length > 0:
                raw_signal_detected += raw_length
                tdc_start_counter.extend(data_raw["start_counter"].tolist())
                time_data.extend(data_raw["time"].tolist())
                # raw data
                channel_data.extend(channel_data_tmp)
                voltage_data_tdc.extend(np.tile(specimen_voltage, raw_length).tolist())
                voltage_pulse_data_tdc.extend(np.tile(voltage_pulse, raw_length).tolist())
                laser_pulse_data_tdc.extend(np.tile(laser_pulse, raw_length).tolist())

        if eventtype == QUEUE_ENDOFMEAS:
            # The previous exposure finished: start the next one.
            # retries is the number of times to retry
            retcode = bufdatacb.start_measurement(exposure_time, retries=10)
            if retcode < 0:
                print("Error during read (error code: %s - error msg: %s):"
                      % (retcode, device.lib.sc_get_err_msg(retcode)))
                break

        # Calculate the detection rate
        # Check if the detection rate interval has passed
        current_time = time.time()
        if current_time - start_time >= 0.5:
            detection_rate = events_detected_tmp * 100 / pulse_frequency
            variables.detection_rate_current = detection_rate * 2  # to get the rate per second
            variables.detection_rate_current_plot = detection_rate * 2  # to get the rate per second
            variables.total_ions = events_detected
            variables.total_raw_signals = raw_signal_detected
            # Reset the counter and timer
            events_detected_tmp = 0
            start_time = current_time

        if len(xx) >= CHUNK_SIZE:
            chunk_id += 1
            # Send chunk data to saving process (non-blocking)
            save_queue.put((chunk_id, path, _slice_buffers(chunk_buffers, CHUNK_SIZE)))
            # Remove saved data from memory
            for buffered in chunk_buffers.values():
                del buffered[:CHUNK_SIZE]

        if time.time() - start_time_loop > loop_time:
            loop_delay_counter += 1
        loop_counter += 1

    print("TDC process: for %s times loop time took longer than %s second" % (loop_delay_counter, loop_time),
          'out of %s iteration' % loop_counter)
    variables.total_ions = events_detected
    variables.total_raw_signals = raw_signal_detected
    print("TDC Measurement stopped")

    if chunk_id > 0:
        # Final flush. The raw TDC buffers are filled independently of the DLD
        # buffers and are only trimmed by CHUNK_SIZE per chunk, so they can
        # hold more than CHUNK_SIZE items here; everything that is left is
        # saved without truncation. Empty buffers are skipped so that no
        # empty arrays are written for them.
        remainder = {key: list(values) for key, values in chunk_buffers.items() if values}
        if remainder:
            chunk_id += 1
            save_queue.put((chunk_id, path, remainder))

    save_queue.put(None)  # stop signal for the saving process
    save_process.join()

    if chunk_id == 0:
        # No chunk was written: store the complete data set as single files.
        single_files = (
            ("x", xx),
            ("y", yy),
            ("t", tt),
            ("voltage", voltage_data),
            ("voltage_pulse", voltage_pulse_data),
            ("laser_pulse", laser_pulse_data),
            ("start_counter", start_counter),
            ("x_bin", xx_list_bin),
            ("y_bin", yy_list_bin),
            ("t_bin", tt_list_bin),
            ("channel", channel_data),
            ("time", time_data),
            ("main_raw_counter", tdc_start_counter),
            ("voltage_tdc", voltage_data_tdc),
            ("voltage_pulse_tdc", voltage_pulse_data_tdc),
            ("laser_pulse_tdc", laser_pulse_data_tdc),
        )
        for file_stem, values in single_files:
            np.save(variables.path + "/temp_data/" + file_stem + ".npy", np.array(values))

    # Publish the data that is still held in memory to the shared variables.
    shared_targets = (
        ('x', xx),
        ('y', yy),
        ('t', tt),
        ('dld_start_counter', start_counter),
        ('main_v_dc_dld', voltage_data),
        ('main_v_p_dld', voltage_pulse_data),
        ('main_l_p_dld', laser_pulse_data),
        ('channel', channel_data),
        ('time_data', time_data),
        ('tdc_start_counter', tdc_start_counter),
        ('main_v_dc_tdc', voltage_data_tdc),
        ('main_v_p_tdc', voltage_pulse_data_tdc),
        ('main_l_p_tdc', laser_pulse_data_tdc),
    )
    for attribute, values in shared_targets:
        variables.extend_to(attribute, values)
    print("data save in share variables")

    time.sleep(0.1)
    bufdatacb.close()
    bufdatacb_raw.close()
    device.deinitialize()
    variables.flag_finished_tdc = True
    return 0
def experiment_measure(variables, x_plot, y_plot, t_plot, main_v_dc_plot, stop_event):
    """
    Entry point that runs the TDC measurement.

    All arguments are forwarded unchanged to run_experiment_measure; its
    return code is discarded, so this function returns None.

    Args:
        variables: Forwarded to run_experiment_measure.
        x_plot: Forwarded to run_experiment_measure.
        y_plot: Forwarded to run_experiment_measure.
        t_plot: Forwarded to run_experiment_measure.
        main_v_dc_plot: Forwarded to run_experiment_measure.
        stop_event: Forwarded to run_experiment_measure.

    Returns:
        None
    """
    measurement_args = (variables, x_plot, y_plot, t_plot, main_v_dc_plot, stop_event)
    run_experiment_measure(*measurement_args)