from __future__ import annotations
import os
from pathlib import Path
import numpy as np
import pandas as pd
from pyccapt.calibration.core.exceptions import CalibrationInputError, CalibrationStateError
from pyccapt.calibration.core.validation import CALIBRATION_MODES, ensure_choice
from pyccapt.calibration.path_utils import build_output_path, ensure_directory
[docs]
def get_project_path() -> str:
"""Return the project root path by walking parents until `setup.py` is found."""
current = Path(__file__).resolve()
for parent in [current.parent, *current.parents]:
if (parent / "setup.py").is_file():
return str(parent)
return str(current.parent)
def _create_default_range_data() -> pd.DataFrame:
"""Create the default unranged entry with stable column dtypes."""
range_data = pd.DataFrame(
{
"name": ["unranged0"],
"ion": ["un"],
"mass": [0.0],
"mc": [0.0],
"mc_low": [0.0],
"mc_up": [400.0],
"color": ["#000000"],
"element": [["unranged"]],
"complex": [[0]],
"isotope": [[0]],
"charge": [0],
}
)
return range_data.astype(
{
"name": "str",
"ion": "str",
"mass": "float64",
"mc": "float64",
"mc_low": "float64",
"mc_up": "float64",
"color": "str",
"element": "object",
"complex": "object",
"isotope": "object",
"charge": "uint32",
}
)
[docs]
class SharedVariablesBase:
"""Shared state helpers used by calibration and visualization workflows."""
_CALIBRATION_ATTR = {
"tof": "dld_t_calib",
"mc": "mc_calib",
}
[docs]
def get_calibration_array(self, calibration_mode: str) -> np.ndarray:
"""Return the selected calibrated array for a calibration mode."""
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
values = getattr(self, self._CALIBRATION_ATTR[mode])
values = np.asarray(values)
if values.size == 0:
raise CalibrationStateError(f"{self._CALIBRATION_ATTR[mode]!r} is empty")
return values
[docs]
def set_peak_range(self, left: float, right: float) -> None:
"""Set selected peak window after validating numeric order."""
try:
left_value = float(left)
right_value = float(right)
except (TypeError, ValueError) as exc:
raise CalibrationInputError("Peak range boundaries must be numeric") from exc
if right_value <= left_value:
raise CalibrationInputError(
f"Invalid peak range: left={left_value}, right={right_value}. "
"'right' must be greater than 'left'."
)
self.selected_x1 = left_value
self.selected_x2 = right_value
[docs]
def clear_peak_range(self) -> None:
"""Reset selected peak window to an unselected state."""
self.selected_x1 = 0
self.selected_x2 = 0
@staticmethod
def _coerce_peak_range(left: float, right: float) -> tuple[float, float]:
"""Validate and normalize numeric peak-range boundaries."""
try:
left_value = float(left)
right_value = float(right)
except (TypeError, ValueError) as exc:
raise CalibrationInputError("Peak range boundaries must be numeric") from exc
if right_value <= left_value:
raise CalibrationInputError(
f"Invalid peak range: left={left_value}, right={right_value}. "
"'right' must be greater than 'left'."
)
return left_value, right_value
[docs]
def set_calibration_peak_range(self, calibration_mode: str, left: float, right: float) -> tuple[float, float]:
"""Store a calibration-only copy of the active peak window for one mode."""
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
left_value, right_value = self._coerce_peak_range(left, right)
self.calibration_peak_ranges[mode] = (left_value, right_value)
return left_value, right_value
[docs]
def get_calibration_peak_range(self, calibration_mode: str) -> tuple[float, float]:
"""Return the active peak window for calibration, preferring the stored snapshot."""
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
stored = self.calibration_peak_ranges.get(mode)
if stored is not None:
return float(stored[0]), float(stored[1])
self.ensure_valid_peak_range()
return float(self.selected_x1), float(self.selected_x2)
[docs]
def clear_calibration_peak_range(self, calibration_mode: str | None = None) -> None:
"""Clear one or all stored calibration-only peak windows."""
if calibration_mode is None:
self.calibration_peak_ranges.clear()
return
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
self.calibration_peak_ranges.pop(mode, None)
[docs]
def ensure_valid_peak_range(self) -> None:
"""Validate that a peak range is selected and logically valid."""
if self.selected_x1 == 0 and self.selected_x2 == 0:
raise CalibrationStateError("No peak range selected")
if self.selected_x2 <= self.selected_x1:
raise CalibrationStateError(
f"Invalid peak range: selected_x1={self.selected_x1}, selected_x2={self.selected_x2}"
)
[docs]
def build_calibration_mask(self, calibration_mode: str) -> np.ndarray:
"""Build a boolean mask from the selected peak range and calibration mode."""
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
data = self.get_calibration_array(mode)
override = self.calibration_selection_masks.get(mode)
if override is not None:
override = np.asarray(override, dtype=bool)
if override.shape != data.shape:
raise CalibrationStateError(
f"Locked calibration mask shape {override.shape} does not match data shape {data.shape}"
)
if not np.any(override):
raise CalibrationStateError(f"Locked calibration mask is empty for calibration_mode={mode!r}")
return override.copy()
left, right = self.get_calibration_peak_range(mode)
mask = np.logical_and(data > left, data < right)
if not np.any(mask):
raise CalibrationStateError(
"Selected peak range does not include any ions for "
f"calibration_mode={calibration_mode!r}"
)
return mask
[docs]
def set_calibration_selection_mask(self, calibration_mode: str, mask: np.ndarray) -> None:
"""Lock a boolean ion-selection mask for repeated calibration steps."""
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
data = self.get_calibration_array(mode)
locked_mask = np.asarray(mask, dtype=bool)
if locked_mask.shape != data.shape:
raise CalibrationInputError(
f"Mask shape {locked_mask.shape} does not match calibration data shape {data.shape}"
)
if not np.any(locked_mask):
raise CalibrationInputError("Locked calibration mask cannot be empty")
self.calibration_selection_masks[mode] = locked_mask.copy()
[docs]
def clear_calibration_selection_mask(self, calibration_mode: str | None = None) -> None:
"""Clear one or all locked calibration ion masks."""
if calibration_mode is None:
self.calibration_selection_masks.clear()
return
mode = ensure_choice(calibration_mode, field_name="calibration_mode", allowed=CALIBRATION_MODES)
self.calibration_selection_masks.pop(mode, None)
@staticmethod
def _column_or_zeros(dataframe: pd.DataFrame, column: str, *, like: str | None = None) -> np.ndarray:
"""Return a dataframe column as ndarray, or zeros matched to another column length."""
if column in dataframe.columns:
return dataframe[column].to_numpy()
if like is not None and like in dataframe.columns:
return np.zeros(len(dataframe[like].to_numpy()))
return np.zeros(len(dataframe))
[docs]
def sync_from_data(self, data: pd.DataFrame | None = None, *, update_backups: bool = False,
clear_selection: bool = True) -> pd.DataFrame:
"""Synchronize shared arrays from the current dataframe after crop/load/reset operations."""
frame = self.data if data is None else data
if frame is None:
raise CalibrationStateError("No dataframe is available to synchronize shared variables")
frame = frame.reset_index(drop=True).copy()
self.data = frame
if update_backups or self.data_backup is None:
self.data_backup = frame.copy()
self.dld_high_voltage = self._column_or_zeros(frame, "high_voltage (V)")
if "pulse_v (V)" in frame.columns:
self.dld_pulse_v = frame["pulse_v (V)"].to_numpy()
elif "pulse" in frame.columns:
self.dld_pulse_v = frame["pulse"].to_numpy()
else:
self.dld_pulse_v = np.zeros(len(frame))
self.dld_pulse_l = self._column_or_zeros(frame, "pulse_l (pJ)", like="high_voltage (V)")
self.dld_t = self._column_or_zeros(frame, "t (ns)")
self.dld_t_c = self._column_or_zeros(frame, "t_c (ns)", like="t (ns)")
self.dld_x_det = self._column_or_zeros(frame, "x_det (cm)", like="t (ns)")
self.dld_y_det = self._column_or_zeros(frame, "y_det (cm)", like="t (ns)")
self.mc = self._column_or_zeros(frame, "mc (Da)", like="t (ns)")
if "mc_uc (Da)" in frame.columns:
self.mc_uc = frame["mc_uc (Da)"].to_numpy()
else:
self.mc_uc = self.mc.copy()
self.dld_t_calib = self.dld_t.copy()
self.mc_calib = self.mc_uc.copy()
if update_backups or self.dld_t_calib_backup.shape != self.dld_t_calib.shape:
self.dld_t_calib_backup = self.dld_t_calib.copy()
if update_backups or self.mc_calib_backup.shape != self.mc_calib.shape:
self.mc_calib_backup = self.mc_calib.copy()
self.x = self._column_or_zeros(frame, "x (nm)", like="t (ns)")
self.y = self._column_or_zeros(frame, "y (nm)", like="t (ns)")
self.z = self._column_or_zeros(frame, "z (nm)", like="t (ns)")
self.mask = None
self.AptHistPlotter = None
self.peak_x = []
self.peak_y = []
self.peak_widths = []
self.x_hist = None
self.y_hist = None
self.h_line_pos = []
self.clear_calibration_selection_mask()
self.clear_calibration_peak_range()
if clear_selection:
self.clear_peak_range()
self.selected_x_fdm = 0
self.selected_y_fdm = 0
self.roi_fdm = 0
return frame
[docs]
def restore_data_from_backup(self) -> pd.DataFrame:
"""Restore the original dataframe snapshot and resynchronize shared arrays."""
if self.data_backup is None:
raise CalibrationStateError("No backup dataframe is available for restore")
return self.sync_from_data(self.data_backup.copy(), update_backups=False, clear_selection=True)
@staticmethod
def _dir_as_string(directory: str | Path) -> str:
"""Return a normalized directory string with a trailing separator."""
path = ensure_directory(directory)
path_str = str(path)
if not path_str.endswith((os.sep, "/", "\\")):
path_str = path_str + os.sep
return path_str
[docs]
def set_result_directory(self, directory: str | Path) -> str:
"""Set and return the normalized directory used for figures and plots."""
self.result_path = self._dir_as_string(directory)
return self.result_path
[docs]
def set_result_data_directory(self, directory: str | Path) -> str:
"""Set and return the normalized directory used for data exports."""
self.result_data_path = self._dir_as_string(directory)
return self.result_data_path
[docs]
def resolve_result_file(self, filename: str) -> str:
"""Resolve `filename` inside `result_path`."""
base = self.result_path or self.last_directory
return str(build_output_path(base, filename))
[docs]
def resolve_result_data_file(self, filename: str) -> str:
"""Resolve `filename` inside `result_data_path`."""
base = self.result_data_path or self.last_directory
return str(build_output_path(base, filename))
[docs]
class Variables(SharedVariablesBase):
"""Container for shared variables across calibration workflows."""
def __init__(self):
self.pulse_mode = None
self.selected_x_fdm = 0
self.selected_y_fdm = 0
self.roi_fdm = 0
self.selected_x1 = 0
self.selected_x2 = 0
self.selected_y1 = 0
self.selected_y2 = 0
self.selected_z1 = 0
self.selected_z2 = 0
self.h_line_pos = []
self.list_material = []
self.charge = []
self.element = []
self.isotope = []
self.peaks_x_selected = []
self.peaks_index_list = []
self.result_path = ""
self.path = ""
self.dataset_name = ""
self.result_data_path = ""
self.result_data_name = ""
self.dld_t = np.zeros(0)
self.dld_t_c = np.zeros(0)
self.dld_x_det = np.zeros(0)
self.dld_y_det = np.zeros(0)
self.x = np.zeros(0)
self.y = np.zeros(0)
self.z = np.zeros(0)
self.dld_high_voltage = np.zeros(0)
self.dld_pulse_v = np.zeros(0)
self.dld_pulse_l = np.zeros(0)
self.dld_t_calib = np.zeros(0)
self.dld_t_calib_backup = np.zeros(0)
self.mc = np.zeros(0)
self.mc_uc = np.zeros(0)
self.mc_calib = np.zeros(0)
self.mc_calib_backup = np.zeros(0)
self.max_peak = 0
self.max_tof = None
self.peaks_index = 0
self.peak_x = []
self.peak_y = []
self.peak_widths = []
self.x_hist = None
self.y_hist = None
self.AptHistPlotter = None
self.ions_list_data = None
self.last_directory = get_project_path()
self.plotly_3d_reconstruction = None
self.data = None
self.data_backup = None
self.data_tdc = None
self.data_tdc_backup = None
self.max_mc = 400
self.flight_path_length = None
self.mask = None
self.range_data = _create_default_range_data()
self.range_data_backup = None
self.animation_detector_html = None
self.calibration_selection_masks = {}
self.calibration_peak_ranges = {}
self.bowl_sampling_mode = "polar"
@property
def dld_highVoltage(self):
"""Backward-compatible alias for legacy camelCase field names."""
return self.dld_high_voltage
@dld_highVoltage.setter
def dld_highVoltage(self, value):
self.dld_high_voltage = value
@property
def data_name(self):
"""Backward-compatible alias for `dataset_name`."""
return self.dataset_name
@data_name.setter
def data_name(self, value):
self.dataset_name = value