"""Data loading and persistence helpers for calibration workflows."""
from __future__ import annotations
import os
from pathlib import Path
import warnings
import ast
import re
import h5py
import numpy as np
import pandas as pd
import scipy.io
from pyccapt.calibration.data_tools import ato_tools
from pyccapt.calibration.leap_tools import ccapt_tools, leap_tools
from pyccapt.calibration.mc import tof_tools
def _as_path(path_value: str | Path) -> Path:
"""Return `path_value` as a normalized filesystem path."""
return Path(path_value).expanduser()
def _resolve_variable_output_file(variables, *, filename: str, data_directory: bool) -> str:
"""Build an output file path from shared variables with compatibility fallbacks."""
resolver_name = "resolve_result_data_file" if data_directory else "resolve_result_file"
resolver = getattr(variables, resolver_name, None)
if callable(resolver):
return resolver(filename)
base_attr = "result_data_path" if data_directory else "result_path"
base_path = str(getattr(variables, base_attr, "")).strip()
if base_path:
return str(_as_path(base_path) / filename)
return filename
[docs]
def read_hdf5(filename: str | Path):
"""
Read non-pandas HDF5 content into a dictionary of dataframes.
Returns:
dict[str, pd.DataFrame] | None
"""
file_path = _as_path(filename)
try:
dataframe_storage: dict[str, pd.DataFrame] = {}
group_dict: dict[str, list[str]] = {}
with h5py.File(file_path, "r") as hdf:
groups = list(hdf.keys())
for item in groups:
group_dict[item] = list(hdf[item].keys())
for key, value in group_dict.items():
for item in value:
dataset = pd.DataFrame(np.array(hdf[f"{key}/{item}"]), columns=["values"])
dataframe_storage[f"{key}/{item}"] = dataset
return dataframe_storage
except FileNotFoundError:
print("[*] HDF5 File could not be found")
except IndexError:
print("[*] No Group keys could be found in HDF5 File")
return None
[docs]
def read_range(filename: str | Path) -> pd.DataFrame:
"""Read saved range definitions from `.h5`, `.rrng`, or legacy `.rng` files."""
file_path = _as_path(filename)
try:
suffix = file_path.suffix.lower()
if suffix == ".h5":
return _normalize_range_dataframe(pd.read_hdf(file_path, mode="r"))
if suffix == ".rrng":
return _normalize_range_dataframe(leap_tools.read_rrng(str(file_path)))
if suffix == ".rng":
return _normalize_range_dataframe(leap_tools.read_rng(str(file_path)))
raise ValueError(f"Unsupported range file extension: {file_path.suffix!r}")
except FileNotFoundError as error:
print("[*] Range file could not be found")
print(error)
raise
def _normalize_range_dataframe(range_df: pd.DataFrame) -> pd.DataFrame:
"""Backfill legacy range schema fields required by modern visualization helpers."""
if not isinstance(range_df, pd.DataFrame):
return range_df
def _coerce_list(value):
if isinstance(value, (list, tuple, np.ndarray)):
return list(value)
text = str(value).strip()
if not text:
return []
if text.startswith("[") and text.endswith("]"):
try:
parsed = ast.literal_eval(text)
if isinstance(parsed, (list, tuple, np.ndarray)):
return [str(item).strip() for item in parsed]
except (ValueError, SyntaxError):
pass
return [text]
def _ion_to_neutral_formula(ion_label):
text = str(ion_label).strip()
if not text:
return ""
text = text.replace("$", "")
text = re.sub(r"_\{([^}]*)\}", r"\1", text)
text = re.sub(r"\^\{[^}]*\}", "", text)
text = text.replace("{", "").replace("}", "")
text = re.sub(r"\s*[+-]+\s*$", "", text)
text = re.sub(r"\s*\d*[+-]+\s*$", "", text)
return text.strip()
def _derive_neutral_name(row):
elements = _coerce_list(row.get("element", []))
complexes = _coerce_list(row.get("complex", []))
if elements:
formula_parts = []
for index, element in enumerate(elements):
stoich = 1
if index < len(complexes):
try:
stoich = int(float(complexes[index]))
except (TypeError, ValueError):
stoich = 1
suffix = "" if stoich <= 1 else str(stoich)
formula_parts.append(f"{element}{suffix}")
formula = "".join(formula_parts).strip()
if formula:
return formula
if "ion" in row:
neutral = _ion_to_neutral_formula(row.get("ion", ""))
if neutral:
return neutral
if "ion_name" in row:
neutral = _ion_to_neutral_formula(row.get("ion_name", ""))
if neutral:
return neutral
return ""
normalized = range_df.copy()
derived_names = normalized.apply(_derive_neutral_name, axis=1)
if "name" not in normalized.columns:
normalized["name"] = derived_names
else:
raw_names = normalized["name"].astype(str).replace("", np.nan)
looks_charged = raw_names.fillna("").str.contains(r"[\^$]|\+|-", regex=True)
normalized["name"] = raw_names
normalized.loc[looks_charged, "name"] = derived_names[looks_charged]
normalized["name"] = normalized["name"].fillna(derived_names)
normalized["name"] = normalized["name"].astype(str).replace("", np.nan)
normalized.loc[normalized["name"].str.lower() == "nan", "name"] = np.nan
placeholder_names = pd.Series(
[f"range_{index}" for index in range(len(normalized))],
index=normalized.index,
)
normalized["name"] = normalized["name"].fillna(placeholder_names)
return normalized
[docs]
def read_mat_files(filename: str | Path):
"""Read a `.mat` file and return its dictionary contents."""
file_path = _as_path(filename)
try:
return scipy.io.loadmat(file_path)
except FileNotFoundError:
print("[*] Mat File could not be found")
return None
[docs]
def convert_mat_to_df(hdf5_file_response: dict) -> pd.DataFrame:
"""Convert loaded `.mat` content to a dataframe and persist it as HDF5."""
if hdf5_file_response is None or "None" not in hdf5_file_response:
raise ValueError("Expected key 'None' in .mat content")
pd_dataframe = pd.DataFrame(hdf5_file_response["None"])
store_df_to_hdf(pd_dataframe, "dataframe/isotope", "isotopeTable.h5")
return pd_dataframe
[docs]
def store_df_to_hdf(dataframe, key, filename):
"""
Store dataframe to HDF5.
Supports both modern argument order `(dataframe, key, filename)` and the
legacy order `(filename, dataframe, key)` for backwards compatibility.
"""
if isinstance(dataframe, (str, Path)):
# Legacy order: (filename, dataframe, key)
filename, dataframe, key = dataframe, key, filename
if not isinstance(dataframe, pd.DataFrame):
raise TypeError("dataframe must be a pandas.DataFrame")
file_path = _as_path(filename)
file_path.parent.mkdir(parents=True, exist_ok=True)
dataframe.to_hdf(file_path, key=str(key), mode="w")
[docs]
def store_df_to_csv(data: pd.DataFrame, path: str | Path) -> None:
"""Store dataframe to CSV with project-default encoding and separator."""
file_path = _as_path(path)
file_path.parent.mkdir(parents=True, exist_ok=True)
data.to_csv(file_path, encoding="utf-8", index=False, sep=";")
[docs]
def remove_invalid_data(dld_group_storage: pd.DataFrame, max_tof: float) -> pd.DataFrame:
"""Remove invalid TOF and detector rows and return the cleaned dataframe."""
mask_1 = dld_group_storage["t (ns)"].to_numpy() > max_tof
mask_2 = dld_group_storage["t (ns)"].to_numpy() < 50
mask_3 = (
(dld_group_storage["x_det (cm)"].to_numpy() == 0)
& (dld_group_storage["y_det (cm)"].to_numpy() == 0)
& (dld_group_storage["t (ns)"].to_numpy() == 0)
)
mask_4 = dld_group_storage["high_voltage (V)"].to_numpy() < 0
mask_5 = (
(dld_group_storage["x_det (cm)"].to_numpy() == 0)
& (dld_group_storage["y_det (cm)"].to_numpy() == 0)
)
mask_f_1 = np.logical_or(mask_1, mask_2)
mask_f_2 = np.logical_or(mask_3, mask_4)
mask_f_2 = np.logical_or(mask_f_2, mask_5)
mask = np.logical_or(mask_f_1, mask_f_2)
num_over_max_tof = int(np.count_nonzero(mask))
dld_group_storage.drop(np.where(mask)[0], inplace=True)
dld_group_storage.reset_index(inplace=True, drop=True)
print("The number of data that is removed:", num_over_max_tof)
return dld_group_storage
def _slice_data_for_export(
data: pd.DataFrame,
start_index: int | None = None,
end_index: int | None = None,
) -> pd.DataFrame:
"""Return an index-sliced copy for export using inclusive end semantics."""
if not isinstance(data, pd.DataFrame):
raise TypeError("data must be a pandas.DataFrame")
if start_index in (None, "") and end_index in (None, ""):
return data
total_rows = len(data)
if total_rows == 0:
raise ValueError("Cannot slice an empty dataset for export")
start = 0 if start_index in (None, "") else int(start_index)
end = (total_rows - 1) if end_index in (None, "", -1) else int(end_index)
if start < 0 or start >= total_rows:
raise ValueError(f"start_index must be between 0 and {total_rows - 1}")
if end == -1:
end = total_rows - 1
if end < 0 or end >= total_rows:
raise ValueError(f"end_index must be between 0 and {total_rows - 1}, or -1 for the last row")
if end < start:
raise ValueError("end_index must be greater than or equal to start_index")
return data.iloc[start:end + 1].reset_index(drop=True).copy()
[docs]
def save_data(
data,
variables,
name=None,
hdf=True,
epos=False,
pos=False,
ato_6v=False,
csv=False,
temp=False,
start_index=0,
end_index=-1,
save_tdc=False,
save_range=False,
):
"""Persist data in one or more supported export formats.
Parameters
----------
save_tdc : bool, default False
If True and ``hdf=True`` and ``variables.data_tdc`` was loaded, also
write a filtered ``/tdc`` group into the same HDF5 file. The tdc rows
are filtered so that only those whose linked dld row still exists in
``data`` are kept; "orphan" tdc rows (pulses that never produced a dld
event in the first place) are always preserved.
save_range : bool, default False
If True and ``hdf=True`` and ``variables.range_data`` is non-empty,
also write the current range table under ``/range`` so the calibrated,
raw, and ranging information all live in a single h5 file.
"""
if name is not None:
data_name = name
elif temp:
data_name = f"{variables.result_data_name}_temp"
else:
data_name = variables.result_data_name
export_data = _slice_data_for_export(data, start_index=start_index, end_index=end_index)
if not any([hdf, epos, pos, ato_6v, csv]):
print("No export format was selected.")
return None
saved_outputs: dict[str, str] = {}
if hdf:
output_h5 = _resolve_variable_output_file(variables, filename=f"{data_name}.h5", data_directory=True)
store_df_to_hdf(export_data, "df", output_h5)
saved_outputs["hdf"] = output_h5
if save_tdc:
tdc_full = getattr(variables, "data_tdc", None)
if tdc_full is None:
warnings.warn(
"save_tdc=True but variables.data_tdc is None; no raw tdc to save.",
RuntimeWarning,
stacklevel=2,
)
elif "event_group_id" not in export_data.columns:
warnings.warn(
"save_tdc=True but exported dld has no event_group_id column; "
"raw tdc was not loaded with the linking flag enabled.",
RuntimeWarning,
stacklevel=2,
)
else:
from pyccapt.calibration.data_tools.data_loadcrop import filter_tdc_by_dld
tdc_filtered = filter_tdc_by_dld(export_data, tdc_full)
# Append /tdc to the same h5 file (mode='a' to preserve /df).
tdc_filtered.to_hdf(_as_path(output_h5), key="tdc", mode="a")
saved_outputs["hdf_tdc"] = f"{output_h5} (/tdc, {len(tdc_filtered)} rows)"
if save_range:
range_table = getattr(variables, "range_data", None)
if range_table is None or len(range_table) == 0:
warnings.warn(
"save_range=True but variables.range_data is empty.",
RuntimeWarning,
stacklevel=2,
)
else:
range_table.to_hdf(_as_path(output_h5), key="range", mode="a")
saved_outputs["hdf_range"] = f"{output_h5} (/range, {len(range_table)} rows)"
if epos:
output_epos = _resolve_variable_output_file(variables, filename=f"{data_name}.epos", data_directory=True)
output_epos_path = _as_path(output_epos)
output_epos_path.parent.mkdir(parents=True, exist_ok=True)
ccapt_tools.ccapt_to_epos(
export_data,
path=str(output_epos_path.parent) + os.sep,
name=output_epos_path.name,
)
saved_outputs["epos"] = str(output_epos_path)
if pos:
output_pos = _resolve_variable_output_file(variables, filename=f"{data_name}.pos", data_directory=True)
output_pos_path = _as_path(output_pos)
output_pos_path.parent.mkdir(parents=True, exist_ok=True)
ccapt_tools.ccapt_to_pos(
export_data,
path=str(output_pos_path.parent) + os.sep,
name=output_pos_path.name,
)
saved_outputs["pos"] = str(output_pos_path)
if ato_6v:
output_ato = _resolve_variable_output_file(variables, filename=f"{data_name}.ato", data_directory=True)
output_ato_path = _as_path(output_ato)
output_ato_path.parent.mkdir(parents=True, exist_ok=True)
ato_tools.ccapt_to_ato(
export_data,
path=str(output_ato_path.parent),
name=output_ato_path.name,
)
saved_outputs["ato"] = str(output_ato_path)
if csv:
output_csv = _resolve_variable_output_file(
variables, filename=f"{data_name}.csv", data_directory=True
)
store_df_to_csv(export_data, output_csv)
saved_outputs["csv"] = output_csv
for format_name, output_path in saved_outputs.items():
print(f"Saved {format_name.upper()}: {output_path}")
return None
[docs]
def load_data(dataset_path, data_type, mode="processed", *, load_tdc=False, tdc_extract_mode="tdc_sc"):
"""Load supported dataset formats into the pyccapt dataframe convention.
When ``load_tdc=True`` (only supported for ``data_type='pyccapt'`` in
``mode='raw'``), also reads the ``/tdc`` group and returns
``(dld_df, tdc_df)``. Both dataframes carry a shared ``event_group_id``
column so the dld→tdc link survives downstream cropping.
"""
if data_type in {"leap_pos", "leap_epos"}:
if load_tdc:
raise ValueError("load_tdc is only supported for pyccapt h5 datasets")
if data_type == "leap_epos":
return ccapt_tools.epos_to_ccapt(dataset_path)
print("The dataset should contains at least epos information to use all possible analysis")
return ccapt_tools.pos_to_ccapt(dataset_path)
if data_type == "leap_apt":
if load_tdc:
raise ValueError("load_tdc is only supported for pyccapt h5 datasets")
return ccapt_tools.apt_to_ccapt(dataset_path)
if data_type == "ato_v6":
if load_tdc:
raise ValueError("load_tdc is only supported for pyccapt h5 datasets")
return ato_tools.ato_to_ccapt(dataset_path, mode="pyccapt")
if data_type == "pyccapt":
if mode == "raw":
from pyccapt.calibration.data_tools import data_loadcrop
if load_tdc:
return data_loadcrop.fetch_dataset_with_tdc(
dataset_path, tdc_extract_mode=tdc_extract_mode
)
return data_loadcrop.fetch_dataset_from_dld_grp(dataset_path)
if mode == "processed":
data = pd.read_hdf(dataset_path, key="df", mode="r")
if "pulse_v (V)" not in data.columns:
if "pulse" in data.columns:
insert_at = data.columns.get_loc("pulse") + 1
data.insert(insert_at, "pulse_v (V)", data["pulse"])
data.drop("pulse", axis=1, inplace=True)
else:
warnings.warn(
"Main dataframe does not have 'pulse' or 'pulse_v (V)'; using zeros for pulse_v (V).",
RuntimeWarning,
stacklevel=2,
)
pulse_v = np.zeros(len(data), dtype=float)
insert_at = data.columns.get_loc("high_voltage (V)") + 1 if "high_voltage (V)" in data.columns else len(data.columns)
data.insert(insert_at, "pulse_v (V)", pulse_v)
if "pulse_l (pJ)" not in data.columns:
pulse_l = np.zeros(len(data), dtype=float)
data.insert(data.columns.get_loc("pulse_v (V)") + 1, "pulse_l (pJ)", pulse_l)
if load_tdc:
try:
tdc_df = pd.read_hdf(dataset_path, key="tdc", mode="r")
except (KeyError, ValueError):
warnings.warn(
f"load_tdc=True but {dataset_path!r} has no /tdc group. "
"Returning dld only.",
RuntimeWarning,
stacklevel=2,
)
return data
return data, tdc_df
return data
raise ValueError(f"Unsupported mode for data_type='pyccapt': {mode!r}")
raise ValueError(f"Unsupported data_type: {data_type!r}")
[docs]
def pyccapt_raw_to_processed(data):
"""Convert a raw pyccapt dataframe to the processed schema."""
data_processed = pd.DataFrame()
data_processed["x (nm)"] = np.zeros(len(data))
data_processed["y (nm)"] = np.zeros(len(data))
data_processed["z (nm)"] = np.zeros(len(data))
data_processed["mc (Da)"] = np.zeros(len(data))
data_processed["mc_uc (Da)"] = np.zeros(len(data))
data_processed["high_voltage (V)"] = data["high_voltage (V)"].to_numpy()
data_processed["pulse_v (V)"] = data["pulse_v (V)"].to_numpy()
data_processed["pulse_l (pJ)"] = data["pulse_l (pJ)"].to_numpy()
data_processed["t (ns)"] = data["t (ns)"].to_numpy()
data_processed["t_c (ns)"] = np.zeros(len(data))
data_processed["x_det (cm)"] = data["x_det (cm)"].to_numpy()
data_processed["y_det (cm)"] = data["y_det (cm)"].to_numpy()
data_processed["delta_p"] = np.zeros(len(data))
data_processed["multi"] = np.zeros(len(data))
data_processed["start_counter"] = data["start_counter"].to_numpy()
if "event_group_id" in data.columns:
data_processed["event_group_id"] = data["event_group_id"].to_numpy()
return data_processed
[docs]
def save_range(variables) -> None:
"""Save range data as both HDF5 and CSV."""
if getattr(variables, "range_data", None) is None or variables.range_data.empty:
print("No range table is loaded, so nothing was saved.")
return
h5_file = _resolve_variable_output_file(
variables,
filename=f"{variables.dataset_name}_range.h5",
data_directory=True,
)
csv_file = _resolve_variable_output_file(
variables,
filename=f"{variables.dataset_name}_range.csv",
data_directory=True,
)
store_df_to_hdf(variables.range_data, "df", h5_file)
store_df_to_csv(variables.range_data, csv_file)
print(f"Saved range HDF: {h5_file}")
print(f"Saved range CSV: {csv_file}")