Source code for pyccapt.control.core.control_data_tool

import os

import h5py
import numpy as np


def rename_subcategory(hdf5_file_path, old_name, new_name):
    """
    Rename an object (group or dataset) inside an HDF5 file.

    The file is opened in read/write mode. When `old_name` is present, the
    object is linked under `new_name` and the `old_name` link is removed;
    otherwise the file is left untouched. A message describing the outcome
    is printed in both cases.

    Args:
        hdf5_file_path: path to the hdf5 file
        old_name: current name (HDF5 path) of the subcategory
        new_name: name (HDF5 path) the subcategory should get

    Returns:
        None
    """
    with h5py.File(hdf5_file_path, 'r+') as h5:
        if old_name not in h5:
            print(f"Subcategory '{old_name}' not found in the HDF5 file.")
            return

        # Link the existing object under its new name first, then drop the
        # old link, so the object is always reachable by at least one name.
        h5[new_name] = h5[old_name]
        del h5[old_name]
        print(f"Subcategory '{old_name}' renamed to '{new_name}'")
def correct_surface_concept_old_data(hdf5_file_path):
    """
    Rescale the raw Surface Concept TDC values stored in an HDF5 file.

    The datasets 'dld/x', 'dld/y' and 'dld/t' are read, converted to float64,
    rescaled and written back under the same names (each dataset is deleted
    and recreated). There is no marker for already-converted files, so calling
    this twice on the same file applies the scaling twice.

    Args:
        hdf5_file_path: path to the hdf5 file

    Returns:
        None
    """
    # Surface Concept TDC specific binning and factors
    det_bins = 4900.0
    binning_factor = 2.0
    tof_factor = 27.432 / (1000.0 * 4.0)  # 27.432 ps/bin, tof in ns, data is TDC time sum
    xy_factor = 80.0 / det_bins * binning_factor  # mm/bin
    xy_bin_shift = det_bins / binning_factor / 2.0  # to center detector

    def bins_to_tof(bins):
        return bins.astype(np.float64) * tof_factor

    def bins_to_position(bins):
        # Centre on the detector, scale to mm, then divide by 10
        # (presumably mm -> cm; confirm against the consumers of dld/x, dld/y).
        return ((bins.astype(np.float64) - xy_bin_shift) * xy_factor) / 10.0

    with h5py.File(hdf5_file_path, 'r+') as h5:
        # Pull all three raw arrays into memory before anything is deleted.
        raw = {key: np.array(h5[key]) for key in ('dld/x', 'dld/y', 'dld/t')}

        conversions = (
            ('dld/t', bins_to_tof),
            ('dld/x', bins_to_position),
            ('dld/y', bins_to_position),
        )
        for key, convert in conversions:
            converted = convert(raw[key])
            del h5[key]
            h5.create_dataset(key, data=converted, dtype=np.float64)
def copy_npy_to_hdf_surface_concept(path, hdf5_file_name):
    """
    Replace the 'dld' and 'tdc' datasets of an HDF5 file with data from .npy files
    (Surface Concept TDC).

    Every .npy file is loaded before the HDF5 file is opened, so a missing
    input file aborts the call before the HDF5 file is modified. Each target
    dataset is then deleted if it exists and recreated from the loaded array.
    A dataset that is absent from the HDF5 file is simply created instead of
    aborting the call half-way with some datasets already deleted.

    Args:
        path: directory prefix of the npy files and of the hdf5 file; file names
            are appended to it directly, so it must end with a path separator
        hdf5_file_name: name of the hdf5 file (must already exist, it is opened
            in 'r+' mode)

    Returns:
        None
    """
    # (HDF5 dataset, source .npy file, dtype of the stored dataset)
    dataset_specs = (
        ('dld/t', 't_data.npy', np.float64),
        ('dld/x', 'x_data.npy', np.float64),
        ('dld/y', 'y_data.npy', np.float64),
        ('dld/voltage_pulse', 'voltage_pulse_data.npy', np.float64),
        ('dld/laser_pulse', 'laser_pulse_data.npy', np.float64),
        ('dld/high_voltage', 'voltage_data.npy', np.float64),
        ('dld/start_counter', 'start_counter.npy', np.uint64),
        ('tdc/channel', 'channel_data.npy', np.uint32),
        ('tdc/high_voltage', 'voltage_data_tdc.npy', np.float64),
        ('tdc/voltage_pulse', 'voltage_pulse_data_tdc.npy', np.float64),
        ('tdc/laser_pulse', 'laser_pulse_data_tdc.npy', np.float64),
        ('tdc/start_counter', 'tdc_start_counter.npy', np.uint64),
        ('tdc/time_data', 'time_data.npy', np.uint64),
    )

    hdf5_file_path = path + hdf5_file_name

    # Load everything first: a missing input must fail before any dataset is deleted.
    loaded = [
        (dataset_name, np.load(path + npy_name), dtype)
        for dataset_name, npy_name, dtype in dataset_specs
    ]

    with h5py.File(hdf5_file_path, 'r+') as file:
        for dataset_name, data, dtype in loaded:
            # Replace rather than blindly delete: a missing dataset must not
            # raise after earlier datasets have already been removed.
            if dataset_name in file:
                del file[dataset_name]
            file.create_dataset(dataset_name, data=data, dtype=dtype)
def load_and_copy_chunks_to_hdf(path, hdf5_file_path, chunk_id):
    """
    Rebuild the 'dld' and 'tdc' datasets of an HDF5 file from numbered .npy chunks.

    All existing members of the 'dld' and 'tdc' groups are deleted. For each
    known dataset the chunk files ``<path>/<prefix>_chunk_<i>.npy`` with
    ``i = 1 .. chunk_id`` are then scanned once to obtain the total length, the
    dataset is allocated with that length, and the chunks are written into it
    back to back in increasing ``i``. Missing chunk files are reported and
    skipped. A dataset whose chunks are all missing or empty is not created.

    Args:
        path: directory containing the chunk files ("/<file name>" is appended)
        hdf5_file_path: path to the hdf5 file (must exist, opened in 'r+' mode)
        chunk_id: highest chunk number to read (chunks are numbered from 1)

    Returns:
        None
    """
    # (group, dataset name in the HDF5 file, chunk file prefix, dtype)
    # NOTE(review): copy_npy_to_hdf_surface_concept stores the TDC time stamps as
    # 'tdc/time_data' while this table uses 'tdc/time' -- confirm which is intended.
    dataset_specs = (
        ('dld', 't', 't', np.float64),
        ('dld', 'x', 'x', np.float64),
        ('dld', 'y', 'y', np.float64),
        ('dld', 'voltage_pulse', 'voltage_pulse', np.float64),
        ('dld', 'laser_pulse', 'laser_pulse', np.float64),
        ('dld', 'high_voltage', 'voltage', np.float64),
        ('dld', 'start_counter', 'start_counter', np.uint64),
        ('tdc', 'channel', 'channel', np.uint32),
        ('tdc', 'high_voltage', 'voltage_tdc', np.float64),
        ('tdc', 'voltage_pulse', 'voltage_pulse_tdc', np.float64),
        ('tdc', 'laser_pulse', 'laser_pulse_tdc', np.float64),
        ('tdc', 'start_counter', 'tdc_start_counter', np.uint64),
        ('tdc', 'time', 'time', np.uint64),
    )

    def chunk_files(prefix):
        """Return the candidate chunk file paths for one prefix, in write order."""
        return [path + f"/{prefix}_chunk_{i}.npy" for i in range(1, chunk_id + 1)]

    with h5py.File(hdf5_file_path, 'r+') as hdf_file:
        # Delete existing datasets; snapshot the names because the group is
        # modified while we walk it.
        for group in ('dld', 'tdc'):
            if group in hdf_file:
                for name in list(hdf_file[group].keys()):
                    del hdf_file[f'{group}/{name}']

        # Pass 1: allocate each dataset with the summed length of its chunks.
        for group_name, dataset_name, prefix, dtype in dataset_specs:
            total_size = sum(
                np.load(chunk_file).shape[0]
                for chunk_file in chunk_files(prefix)
                if os.path.exists(chunk_file)
            )
            if total_size > 0:
                hdf_file.create_dataset(f'{group_name}/{dataset_name}', (total_size,), dtype=dtype)
                print(f"Created empty dataset '{group_name}/{dataset_name}' with shape {total_size}.")

        # Pass 2: copy the chunks into the allocated datasets, one chunk in
        # memory at a time.
        for group_name, dataset_name, prefix, _ in dataset_specs:
            target = f'{group_name}/{dataset_name}'
            offset = 0
            for chunk_file in chunk_files(prefix):
                if not os.path.exists(chunk_file):
                    print(f"File '{chunk_file}' not found.")
                    continue
                chunk_data = np.load(chunk_file)
                chunk_size = chunk_data.shape[0]
                if chunk_size == 0:
                    # Nothing to write; the dataset may not even exist when
                    # every chunk of this prefix is empty.
                    continue
                hdf_file[target][offset:offset + chunk_size] = chunk_data
                offset += chunk_size
            print(f"Written {dataset_name} data.")
def crop_dataset_to_new_file(original_path, new_path, num_of_samples):
    """
    Crop dataset and save to a new file.

    'apt/num_events' and 'apt/num_raw_signals' hold, per 'apt' row, how many
    'dld' and 'tdc' rows that row contributed. The crop point is the first
    'apt' row at which the running total of 'apt/num_events' exceeds
    `num_of_samples`; rows up to and including that one are kept, so the new
    file can hold slightly more than `num_of_samples` events. The 'dld' and
    'tdc' datasets are cut to exactly the number of rows the kept 'apt' rows
    account for.

    The new file is only created when a crop point exists. If the running
    total never exceeds `num_of_samples`, a message is printed and `new_path`
    is left untouched.

    Args:
        original_path: Path to the original dataset.
        new_path: Path to save the cropped dataset.
        num_of_samples: Number of samples to keep.

    Returns:
        None
    """
    with h5py.File(original_path, 'r') as original_file:
        # Read both count vectors in one go instead of one HDF5 read per element.
        num_events = original_file['apt/num_events'][:]
        num_raw_signals = original_file['apt/num_raw_signals'][:]
        assert len(num_events) == len(num_raw_signals), \
            "Length of num_events and num_raw_signals should be the same."

        # int64 accumulation avoids overflow for narrow integer count dtypes.
        cumulative_events = np.cumsum(num_events, dtype=np.int64)
        exceeding = np.flatnonzero(cumulative_events > num_of_samples)
        if exceeding.size == 0:
            print("Number of samples requested exceeds the dataset size. No cropping performed.")
            return

        index = int(exceeding[0])
        kept_rows = index + 1  # 'apt' rows 0..index inclusive
        kept_events = int(cumulative_events[index])
        kept_raw = int(np.sum(num_raw_signals[:kept_rows], dtype=np.int64))

        # Rows to keep per group. The 'dld'/'tdc' limits are counts, not
        # indices, so they are used as slice ends without adding one.
        row_limits = (('apt', kept_rows), ('dld', kept_events), ('tdc', kept_raw))

        # Open the output only now, so a failed crop never creates or truncates it.
        with h5py.File(new_path, 'w') as new_file:
            for group_name, limit in row_limits:
                for key in original_file[group_name]:
                    source = original_file[f'{group_name}/{key}']
                    new_file.create_dataset(f'{group_name}/{key}', data=source[:limit],
                                            dtype=source.dtype)
        print("Cropped dataset written to the new file.")
def concatenate_datasets(hdf5_file_path_1, hdf_file_path_2, index_2):
    """
    Concatenate datasets in nested HDF5 groups.

    For every top-level group of the first file that also exists in the second
    file, each dataset present in both groups is replaced in the first file by
    its own data followed by the second file's data from row `index_2` onwards.
    The same `index_2` is applied to every dataset. The result keeps the dtype
    of the first file's dataset. The second file is opened read-only.

    Members that are not groups or datasets (for example a dataset stored at
    the root, or a nested sub-group) are reported and skipped. Errors raised
    while reading or concatenating propagate before anything is deleted.

    Args:
        hdf5_file_path_1: path to the hdf5 file that is extended in place
        hdf_file_path_2: path to the hdf5 file whose data is appended
        index_2: first row of each dataset of the second file to append

    Returns:
        None
    """
    with h5py.File(hdf5_file_path_1, 'r+') as file_1, h5py.File(hdf_file_path_2, 'r') as file_2:
        # Names are snapshotted with list() because members are deleted and
        # recreated while we walk the file.
        for group_key in list(file_1.keys()):
            print(f"Processing group: {group_key}")
            if group_key not in file_2:
                print(f" Group '{group_key}' not found in file_2.")
                continue

            group_1 = file_1[group_key]
            group_2 = file_2[group_key]
            if not (isinstance(group_1, h5py.Group) and isinstance(group_2, h5py.Group)):
                print(f" {group_key} is not a group.")
                continue

            for dataset_key in list(group_1.keys()):
                if dataset_key not in group_2:
                    print(f" Dataset '{dataset_key}' not found in group '{group_key}' of file_2.")
                    continue

                print(f" Concatenating dataset: {dataset_key}")
                item_1 = group_1[dataset_key]
                item_2 = group_2[dataset_key]
                if not (isinstance(item_1, h5py.Dataset) and isinstance(item_2, h5py.Dataset)):
                    print(f" {dataset_key} is not a dataset.")
                    continue

                data_1 = item_1[:]
                data_2 = item_2[index_2:]
                # Build the result before deleting, so a failure here leaves
                # the original dataset in place.
                concatenated_data = np.concatenate((data_1, data_2))
                del group_1[dataset_key]
                group_1.create_dataset(dataset_key, data=concatenated_data, dtype=data_1.dtype)
if __name__ == '__main__':
    # Ad-hoc maintenance entry point: rebuild one experiment file from its
    # chunked .npy dumps. Paths are hard-coded for the acquisition machine.
    experiment = '2469_Apr-16-2025_10-44_NiC9_6060_93min'
    path = 'T:/Ortner/03_APT/Oxcart/%s/' % experiment
    h5_name = experiment + '.h5'

    # Other one-off operations that have been run from here, kept for reference:
    #   new_path = 'C:/Users/LokalAdmin/Downloads//%s' % 'cropped_' + h5_name
    #   copy_npy_to_hdf(path, h5_name)
    #   rename_subcategory(path + h5_name, old_name='dld', new_name='dld_1')
    #   copy_npy_to_hdf_surface_concept(path + '/temp_data/', h5_name)
    #   rename_subcategory(path + h5_name, old_name='tdc/voltage_laser', new_name='tdc/laser_pulse')
    #   crop_dataset_to_new_file(path, new_path, 500000)
    #   data_path_1 = os.path.abspath('E://2440-2441//2440_Mar-27-2025_09-15_NiC9_Pd_HPCF_4.h5')
    #   data_path_2 = os.path.abspath('E://2440-2441//2441_Mar-27-2025_09-25_NiC9_Pd_HPCF_4_2.h5')
    #   concatenate_datasets(data_path_1, data_path_2, 1163)

    chunk_dir = path + '/temp_data/chunks/'
    load_and_copy_chunks_to_hdf(chunk_dir, path + h5_name, 900)  # 1273

    print('Done')