from pathlib import Path
import numpy as np
from pyccapt.calibration.data_tools import data_tools
from pyccapt.calibration.mc import tof_tools
[docs]
def load_data(dataset_path, max_mc, flightPathLength, pulse_mode, tdc, variables, processing_mode=True,
load_tdc_raw=False):
"""Load a calibration dataset and stash it on ``variables``.
Parameters
----------
load_tdc_raw : bool, default False
If True and ``tdc == 'pyccapt'``, also load the raw ``/tdc`` group from
the h5 file. The dld and tdc dataframes are linked via a shared
``event_group_id`` column so dld filtering decisions can later be
propagated to tdc at save time. Stored on ``variables.data_tdc``.
"""
if tdc == 'pyccapt':
# Check that the dataset is a valid pyccapt dataset with .h5 extension
if not dataset_path.endswith(('.h5', '.H5')):
raise ValueError('The dataset should be a valid pyccapt dataset with .h5 extension')
elif tdc == 'leap_epos':
# Check that the dataset is a valid leap_epos dataset with .epos extension
if not dataset_path.endswith(('.epos', '.EPOS')):
raise ValueError('The dataset should be a valid leap_epos dataset with .epos extension')
elif tdc in {'pos', 'leap_pos'}:
# Check that the dataset is a valid pos dataset with .pos extension
if not dataset_path.endswith(('.pos', '.POS')):
raise ValueError('The dataset should be a valid pos dataset with .pos extension')
elif tdc == 'leap_apt':
# Check that the dataset is a valid leap_apt dataset with .apt extension
if not dataset_path.endswith(('.apt', '.APT')):
raise ValueError('The dataset should be a valid leap_apt dataset with .apt extension')
elif tdc == 'ato_v6':
# Check that the dataset is a valid ato_v6 dataset with .ato extension
if not dataset_path.endswith(('.ato', '.ATO')):
raise ValueError('The dataset should be a valid ato_v6 dataset with .ato extension')
if processing_mode:
# Calculate the maximum possible time of flight (TOF)
max_tof = int(tof_tools.mc2tof(max_mc, 1000, 0, 0, flightPathLength))
print('The maximum possible TOF is:', max_tof, 'ns')
print('=============================')
variables.pulse_mode = pulse_mode
dataset_path_obj = Path(dataset_path)
variables.dataset_name = dataset_path_obj.stem
output_dir = dataset_path_obj.parent / variables.dataset_name / 'data_processing'
variables.set_result_data_directory(output_dir)
variables.result_data_name = variables.dataset_name
variables.set_result_directory(output_dir)
print('The data will be saved on the path:', variables.result_data_path)
print('=============================')
print('The dataset name after saving is:', variables.result_data_name)
print('=============================')
print('The figures will be saved on the path:', variables.result_path)
print('=============================')
# Create data frame out of hdf5 file dataset
tdc_df = None
if tdc == 'pyccapt':
try:
loaded = data_tools.load_data(dataset_path, tdc, mode='raw', load_tdc=load_tdc_raw)
if isinstance(loaded, tuple):
dld_group_storage, tdc_df = loaded
else:
dld_group_storage = loaded
print('The data is loaded in raw mode' + (' (with raw tdc)' if tdc_df is not None else ''))
mode = 'raw'
except Exception:
loaded = data_tools.load_data(dataset_path, tdc, mode='processed', load_tdc=load_tdc_raw)
if isinstance(loaded, tuple):
dld_group_storage, tdc_df = loaded
else:
dld_group_storage = loaded
print('The data is loaded in processed mode' + (' (with raw tdc)' if tdc_df is not None else ''))
if 'x (nm)' not in dld_group_storage:
mode = 'raw'
else:
mode = 'processed'
else:
if load_tdc_raw:
print('Note: load_tdc_raw is only supported for pyccapt h5 datasets; ignoring.')
load_data_type = 'leap_pos' if tdc in {'pos', 'leap_pos'} else tdc
dld_group_storage = data_tools.load_data(dataset_path, load_data_type)
if tdc == 'pyccapt' and mode == 'raw':
data = data_tools.remove_invalid_data(dld_group_storage, max_tof)
data = data_tools.pyccapt_raw_to_processed(data)
else:
data = dld_group_storage
elif not processing_mode:
tdc_df = None
max_tof = int(tof_tools.mc2tof(max_mc, 1000, 0, 0, flightPathLength))
variables.pulse_mode = pulse_mode
dataset_path_obj = Path(dataset_path)
variables.dataset_name = dataset_path_obj.stem
output_dir = dataset_path_obj.parent / variables.dataset_name / 'visualization'
variables.set_result_data_directory(output_dir)
variables.result_data_name = variables.dataset_name
variables.set_result_directory(output_dir)
print('The data will be saved on the path:', variables.result_data_path)
print('=============================')
print('The dataset name after saving is:', variables.result_data_name)
print('=============================')
print('The figures will be saved on the path:', variables.result_path)
print('=============================')
# Create data frame out of hdf5 file dataset
if tdc == 'pyccapt':
loaded = data_tools.load_data(dataset_path, tdc, mode='processed', load_tdc=load_tdc_raw)
if isinstance(loaded, tuple):
data, tdc_df = loaded
else:
data = loaded
else:
if load_tdc_raw:
print('Note: load_tdc_raw is only supported for pyccapt h5 datasets; ignoring.')
load_data_type = 'leap_pos' if tdc in {'pos', 'leap_pos'} else tdc
data = data_tools.load_data(dataset_path, load_data_type)
print('Total number of Ions:', len(data))
if tdc_df is not None:
print('Loaded raw tdc rows:', len(tdc_df))
variables.data = data
variables.data_tdc = tdc_df
variables.data_tdc_backup = tdc_df.copy() if tdc_df is not None else None
variables.max_mc = max_mc
variables.max_tof = max_tof
variables.flight_path_length = flightPathLength
variables.pulse_mode = pulse_mode
variables.sync_from_data(update_backups=True)
[docs]
def add_columns(variables, max_mc):
if 'x (nm)' not in variables.data:
variables.data.insert(0, 'x (nm)', np.zeros(len(variables.dld_t)))
if 'y (nm)' not in variables.data:
variables.data.insert(1, 'y (nm)', np.zeros(len(variables.dld_t)))
if 'z (nm)' not in variables.data:
variables.data.insert(2, 'z (nm)', np.zeros(len(variables.dld_t)))
if 'mc (Da)' not in variables.data:
variables.data.insert(4, 'mc (Da)', np.zeros(len(variables.dld_t)))
if 'mc_uc (Da)' not in variables.data:
variables.data.insert(5, 'mc_uc (Da)', variables.mc_uc)
else:
variables.data['mc_uc (Da)'] = variables.mc_uc
if 't_c (ns)' not in variables.data:
variables.data.insert(8, 't_c (ns)', np.zeros(len(variables.dld_t)))
# Remove the data with mc biger than max mc
mask = (variables.data['mc (Da)'].to_numpy() > max_mc.value)
print('The number of data over max_mc:', len(mask[mask == True]))
variables.data.drop(np.where(mask)[0], inplace=True)
variables.data.reset_index(inplace=True, drop=True)
# Remove the data with x,y,t = 0
mask1 = (variables.data['x (nm)'].to_numpy() == 0)
mask2 = (variables.data['y (nm)'].to_numpy() == 0)
mask3 = (variables.data['t (ns)'].to_numpy() == 0)
mask = np.logical_and(mask1, mask2)
mask = np.logical_and(mask, mask3)
print('The number of data with having t, x, and y equal to zero is:', len(mask[mask == True]))
variables.data.drop(np.where(mask)[0], inplace=True)
variables.data.reset_index(inplace=True, drop=True)
variables.data_backup = variables.data.copy()
[docs]
def load_calibrated_h5(dataset_path, variables, *, range_path=None):
"""Load a PyCCAPT ``.h5`` file for raw-data analysis.
Two file layouts are supported:
1. **Calibrated bundle** (preferred) — produced by
``data_tools.save_data(..., save_tdc=True, save_range=True)``. Contains
``/df`` (calibrated dld), and optionally ``/tdc`` (raw delay-line
timestamps linked via ``event_group_id``) and ``/range`` (identified
ion windows).
2. **Pure raw acquisition** — the file as written by the control software:
``/dld`` and ``/tdc`` groups (no ``/df``, no ``/range``). The dld
records are converted to the processed dataframe schema in memory, and
the linked raw tdc rows are kept on ``variables.data_tdc``.
Older datasets that store the range table in a separate
``<dataset>_range.h5`` file are also supported via ``range_path``.
Populates ``variables.data``, ``variables.data_tdc``, ``variables.range_data``,
and the standard backup fields.
"""
import pandas as pd # local import to keep optional Jupyter loaders fast
from pyccapt.calibration.data_tools import data_loadcrop, data_tools
dataset_path_obj = Path(dataset_path)
if not dataset_path_obj.is_file():
raise FileNotFoundError(f"Dataset file not found: {dataset_path}")
variables.dataset_name = dataset_path_obj.stem
output_dir = dataset_path_obj.parent / variables.dataset_name / 'raw_analysis'
variables.set_result_data_directory(output_dir)
variables.set_result_directory(output_dir)
variables.result_data_name = variables.dataset_name
# First try the calibrated bundle layout (/df). If that fails because the
# file only has the raw acquisition groups, fall back to raw mode.
dld_df = None
tdc_df = None
source = "calibrated"
try:
loaded = data_tools.load_data(
str(dataset_path_obj), 'pyccapt', mode='processed', load_tdc=True
)
if isinstance(loaded, tuple):
dld_df, tdc_df = loaded
else:
dld_df, tdc_df = loaded, None
except (KeyError, ValueError) as calibrated_error:
# `pd.read_hdf(..., key='df')` raises ValueError when /df is missing.
# Fall back to raw acquisition layout: /dld + /tdc.
try:
dld_df, tdc_df = data_loadcrop.fetch_dataset_with_tdc(str(dataset_path_obj))
except Exception as raw_error:
raise ValueError(
f"Could not load {dataset_path!r}: not a calibrated bundle "
f"({calibrated_error}) and not a recognizable raw acquisition file "
f"({raw_error})."
) from raw_error
# Apply the standard raw -> processed pipeline so downstream analyses
# see the same dataframe schema as a calibrated load.
dld_df = data_tools.remove_invalid_data(dld_df, max_tof=100000)
dld_df = data_tools.pyccapt_raw_to_processed(dld_df)
source = "raw"
variables.data = dld_df
variables.data_backup = dld_df.copy()
variables.data_tdc = tdc_df
variables.data_tdc_backup = tdc_df.copy() if tdc_df is not None else None
# Range table: prefer /range in the same h5, then explicit range_path,
# then fall back to <dataset>_range.h5 next to the file.
range_df = None
try:
range_df = pd.read_hdf(str(dataset_path_obj), key='range', mode='r')
except (KeyError, ValueError):
range_df = None
if range_df is None and range_path:
range_df = data_tools.read_range(range_path)
if range_df is None:
sibling = dataset_path_obj.with_name(f"{dataset_path_obj.stem}_range.h5")
if sibling.is_file():
range_df = data_tools.read_range(str(sibling))
if range_df is not None:
variables.range_data = range_df
variables.range_data_backup = range_df.copy()
print(f"Loaded {source} dld rows: {len(dld_df)}")
if tdc_df is not None:
print(f"Loaded raw tdc rows: {len(tdc_df)}")
if range_df is not None:
print(f"Loaded range rows: {len(range_df)}")
return dld_df, tdc_df, range_df