from collections.abc import Mapping
from copy import copy
import fast_histogram
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import colors
from matplotlib.tri import Triangulation
from scipy.optimize import curve_fit
from scipy.signal import find_peaks, peak_prominences, peak_widths
from pyccapt.calibration.core.exceptions import CalibrationInputError
from pyccapt.calibration.core.parallel import parallel_map
from pyccapt.calibration.core.validation import (
BOWL_FIT_MODES,
BOWL_SAMPLE_METHODS,
CALIBRATION_MODES,
SAMPLE_METHODS,
VOLTAGE_MODES,
ensure_choice,
ensure_matching_lengths,
ensure_non_empty_array,
ensure_positive,
normalize_sampling_mode,
normalize_voltage_model,
)
from pyccapt.calibration.path_utils import save_figure
from pyccapt.calibration.core.correction_models import (
_predict_bowl_model,
_predict_voltage_model,
bowl_corr,
hybrid_calibration_model,
refine_correction_nelder_mead,
robust_fit,
robust_voltage_fit,
voltage_corr,
)
from pyccapt.calibration.core.diagnostics import (
initial_calibration,
plot_fdm,
plot_selected_statistic,
)
def _extract_peak_mask_and_values(variables, calibration_mode):
"""Return the selected peak mask and values for the requested calibration mode."""
mask = variables.build_calibration_mask(calibration_mode)
values = variables.get_calibration_array(calibration_mode)[mask]
return mask, values
def _resolve_sample_size(sample_size, population_size):
"""Resolve a valid sample size, auto-scaling down if needed."""
if population_size <= 0:
raise CalibrationInputError("No ions available for calibration")
requested = int(ensure_positive(sample_size, field_name="sample_size"))
if requested > population_size:
return max(1, population_size // 10)
return requested
def _build_histogram_bins(values, bin_size):
"""Build stable histogram bin edges/count from the actual data span."""
data = ensure_non_empty_array(values, field_name="histogram_values")
lower = float(np.min(data))
upper = float(np.max(data))
span = max(upper - lower, float(bin_size))
n_bins = max(10, int(np.ceil(span / float(bin_size))))
edges = np.linspace(lower, upper, n_bins + 1)
return edges, n_bins
def _resolve_peak_location(values, method, bin_size, fast_calibration=False):
"""Resolve the reference peak location by histogram/mean/median."""
ensure_choice(method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
data = ensure_non_empty_array(values, field_name="peak_values")
if fast_calibration and data.size > 10:
data = np.random.choice(data, int(data.size * 0.1), replace=False)
if method == "mean":
return float(np.mean(data))
if method == "median":
return float(np.median(data))
bins, n_bins = _build_histogram_bins(data, bin_size)
hist = fast_histogram.histogram1d(
data,
bins=n_bins,
range=(np.min(data), np.max(data)),
)
peaks, properties = find_peaks(hist, height=0)
if len(peaks) == 0:
return float(np.mean(data))
index_peak_max_ini = np.argmax(properties["peak_heights"])
return float(bins[peaks[index_peak_max_ini]])
def _radial_bowl_corr(data_xy, a, b, c, d, e):
"""Radial-dominant bowl model where r^2 drives the primary curvature."""
x = np.asarray(data_xy[0], dtype=float)
y = np.asarray(data_xy[1], dtype=float)
r2 = x**2 + y**2
return a + b * r2 + c * (r2**2) + d * x + e * y
[docs]
def voltage_correction(
dld_highVoltage_peak,
dld_t_peak,
variables,
maximum_location,
index_fig,
figname,
sample_size,
mode,
calibration_mode,
sample_range_max,
bin_size,
plot=True,
save=False,
fig_size=(5, 5),
model='curve_fit',
):
"""
Performs voltage correction and plots the graph based on the passed arguments.
Parameters:
- dld_highVoltage_peak (array): Array of high voltage peaks.
- dld_t_peak (array): Array of t peaks.
- maximum_location (float): Maximum location value.
- index_fig (string): Index of the saved plot.
- figname (string): Name of the saved plot image.
- sample_size (string): Sample size.
- mode (string): Mode ('ion_seq'/'voltage').
- calibration_mode (string): Type of calibration mode (tof/mc).
- sample_range_max (string): Type of peak_x mode (histogram/mean/median).
- outlier_remove (bool): Indicates whether to remove outliers. Default is True.
- plot (bool): Indicates whether to plot the graph. Default is True.
- save (bool): Indicates whether to save the plot. Default is False.
- fig_size (tuple): Figure size in inches. Default is (7, 5).
- model (string): Type of model ('curve_fit'/'robust_fit'). Default is 'curve_fit'.
- bin_size (float): Size of the bin.
Returns:
- fitresult (array): Corrected voltage array.
"""
model = normalize_voltage_model(model)
ensure_choice(mode, field_name="mode", allowed=VOLTAGE_MODES)
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(sample_range_max, field_name="sample_range_max", allowed=SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_highVoltage_peak = ensure_non_empty_array(dld_highVoltage_peak, field_name="dld_highVoltage_peak")
dld_t_peak = ensure_non_empty_array(dld_t_peak, field_name="dld_t_peak")
ensure_matching_lengths(
dld_highVoltage_peak,
dld_t_peak,
field_names=("dld_highVoltage_peak", "dld_t_peak"),
)
# Each segment below is independent of every other and the inner work is
# NumPy + scipy.signal.find_peaks -- both release the GIL, so we run the
# per-segment evaluation through parallel_map(gil_releasing=True) and let
# the auto-serial fallback kick in for tiny datasets.
if mode == 'ion_seq':
num_segments = int(len(dld_highVoltage_peak) / sample_size) + 1
def _segment_indices(i: int):
start = i * sample_size
stop = start + sample_size
return dld_highVoltage_peak[start:stop], dld_t_peak[start:stop]
segments = [_segment_indices(i) for i in range(num_segments)]
elif mode == 'voltage':
v_min = np.min(dld_highVoltage_peak)
v_max = np.max(dld_highVoltage_peak)
num_segments = int((v_max - v_min) / sample_size) + 1
def _segment_by_voltage(i: int):
lo = v_min + i * sample_size
hi = v_min + (i + 1) * sample_size
mask = np.logical_and(dld_highVoltage_peak >= lo, dld_highVoltage_peak < hi)
return dld_highVoltage_peak[mask], dld_t_peak[mask]
segments = [_segment_by_voltage(i) for i in range(num_segments)]
else:
segments = []
def _evaluate_segment(segment):
v_selected, t_selected = segment
# Empty trailing segments are common when sample_size doesn't divide
# the row count evenly; the original code returned NaN here, which
# then broke curve_fit downstream. Drop them.
if v_selected.size == 0 or t_selected.size == 0:
return None
if sample_range_max == 'histogram':
try:
bins, _ = _build_histogram_bins(t_selected, bin_size)
y, x = np.histogram(t_selected, bins=bins)
peaks, properties = find_peaks(y, height=0)
index_peak_max_ini = np.argmax(properties['peak_heights'])
max_peak = peaks[index_peak_max_ini]
t_value = x[max_peak] / maximum_location
mask_v = np.logical_and(
t_selected >= x[max_peak] - bin_size,
t_selected <= x[max_peak] + bin_size,
)
if mode == 'ion_seq' and v_selected[mask_v].size == 0:
mask_v = np.logical_and(
t_selected >= x[max_peak] - 2 * bin_size,
t_selected <= x[max_peak] + 2 * bin_size,
)
if v_selected[mask_v].size == 0:
mask_v = np.logical_and(
t_selected >= x[max_peak] - 4 * bin_size,
t_selected <= x[max_peak] + 4 * bin_size,
)
v_value = float(np.mean(v_selected[mask_v]))
except ValueError:
t_value = float(np.mean(t_selected)) / maximum_location
v_value = float(np.mean(v_selected))
elif sample_range_max == 'mean':
t_value = float(np.mean(t_selected)) / maximum_location
v_value = float(np.mean(v_selected))
elif sample_range_max == 'median':
t_value = float(np.median(t_selected)) / maximum_location
v_value = float(np.median(v_selected))
else:
return None
return t_value, v_value
pairs = parallel_map(_evaluate_segment, segments, gil_releasing=True)
dld_t_peak_list = [pair[0] for pair in pairs if pair is not None]
high_voltage_mean_list = [pair[1] for pair in pairs if pair is not None]
if model == 'curve_fit':
fitresult, _ = curve_fit(voltage_corr, np.array(high_voltage_mean_list), np.array(dld_t_peak_list))
elif model == 'robust_fit':
fitresult = robust_voltage_fit(np.array(high_voltage_mean_list), np.array(dld_t_peak_list))
if plot or save:
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
label = 't'
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
label = 'mc'
x = plt.scatter(
np.array(high_voltage_mean_list) / 1000,
np.array(dld_t_peak_list) * maximum_location,
color="forestgreen",
label=r"$%s_{wp}$" % label,
s=5,
)
# x = plt.scatter(np.array(high_voltage_mean_list) / 1000, maximum_location / np.array(dld_t_peak_list),
# color="forestgreen", label=r"$%s_{wp}$" % label, s=5)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
ax2 = ax1.twinx()
f_v = _predict_voltage_model(model, fitresult, np.array(high_voltage_mean_list))
y = ax2.plot(np.array(high_voltage_mean_list) / 1000, np.sqrt(f_v), color='r', label=r"$C_V$")
# y = ax2.plot(np.array(high_voltage_mean_list) / 1000, f_v, color='r', label=r"$C_V$")
ax2.set_ylabel(r"$C_V$", color="red", fontsize=10) # Get the current axis
ax2.tick_params(axis='y', colors='red') # Change color and thickness of tick labels on y-axis
ax2.spines['right'].set_color('red') # Change color of right border
plt.legend(handles=[x, y[0]], loc='lower left', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"vol_corr_{figname}_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
return fitresult
[docs]
def voltage_corr_main(
dld_highVoltage,
variables,
sample_size,
mode,
calibration_mode,
index_fig,
plot,
save,
maximum_cal_method='mean',
maximum_sample_method='mean',
fig_size=(5, 5),
fast_calibration=False,
bin_size=0.01,
model='curve_fit',
peak_maximum=0,
calibration_apply=True,
refine_nelder_mead=False,
):
"""
Perform voltage correction on the given data.
Args:
dld_highVoltage (numpy.ndarray): Array of high voltages.
sample_size (int): Size of the sample.
mode (str): Mode of the correction.
calibration_mode (str): Calibration mode ('tof' or 'mc').
index_fig (int): Index of the figure.
plot (bool): Whether to plot the results.
save (bool): Whether to save the plots.
noise_remove (bool, optional): Whether to remove noise. Defaults to True.
maximum_cal_method (str, optional): Maximum calculation method ('mean', 'histogram', 'median').
maximum_sample_method (str, optional): Sample range maximum ('mean', 'histogram', 'median').
fig_size (tuple, optional): Size of the figure. Defaults to (5, 5).
fast_calibration (bool, optional): Whether to perform fast calibration. Defaults to False.
bin_size (float, optional): Size of the bin. Defaults to 0.01.
"""
model = normalize_voltage_model(model)
ensure_choice(mode, field_name="mode", allowed=VOLTAGE_MODES)
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(maximum_cal_method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
ensure_choice(maximum_sample_method, field_name="maximum_sample_method", allowed=SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
all_calibration_values = variables.get_calibration_array(calibration_mode)
ensure_matching_lengths(
dld_highVoltage,
all_calibration_values,
field_names=("dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
left_edge, right_edge = variables.get_calibration_peak_range(calibration_mode)
print('The left and right side of the main peak is:', left_edge, right_edge)
mask_temporal, dld_peak_b = _extract_peak_mask_and_values(variables, calibration_mode)
dld_highVoltage_peak_v = np.asarray(dld_highVoltage)[mask_temporal]
print('The number of ions is:', len(dld_highVoltage_peak_v))
sample_size = _resolve_sample_size(sample_size, len(dld_highVoltage_peak_v))
print('The number of samples is:', int(len(dld_highVoltage_peak_v) / sample_size))
if peak_maximum == 0:
maximum_location = _resolve_peak_location(
dld_peak_b,
method=maximum_cal_method,
bin_size=bin_size,
fast_calibration=fast_calibration,
)
else:
maximum_location = float(peak_maximum)
print('The maximum/mean/median of histogram is located at:', maximum_location)
print('The high voltage ranges are:', np.min(dld_highVoltage_peak_v), np.max(dld_highVoltage_peak_v))
mean_before = np.mean(dld_peak_b)
print('The mean of tof/mc before voltage calibration is:', mean_before)
fitresult = voltage_correction(
dld_highVoltage_peak_v,
dld_peak_b,
variables,
maximum_location,
index_fig=index_fig,
figname='voltage_corr',
sample_size=sample_size,
mode=mode,
calibration_mode=calibration_mode,
sample_range_max=maximum_sample_method,
bin_size=bin_size,
plot=plot,
save=save,
fig_size=fig_size,
model=model,
)
calibration_mc_tof = np.copy(variables.dld_t_calib) if calibration_mode == 'tof' else np.copy(variables.mc_calib)
print('The fit result is:', fitresult)
# Optional per-stage Nelder-Mead refinement of the voltage polynomial,
# adapted from APyT's `optimize_correction(mode='voltage')`
# (sebi-85/apyt: apyt/spectrum/align.py). Only the curve_fit model exposes
# interpretable polynomial coefficients; for other models the flag is a
# no-op and a notice is printed.
if refine_nelder_mead:
if model == 'curve_fit':
voltage_peak = np.asarray(dld_highVoltage_peak_v, dtype=float)
peak_raw = np.asarray(dld_peak_b, dtype=float)
sqrt_cal = calibration_mode == 'tof'
def _apply_voltage_correction(coeffs):
factor = voltage_corr(voltage_peak, *coeffs)
factor = np.where(factor > 0, factor, np.nan)
if sqrt_cal:
factor = np.sqrt(factor)
return peak_raw / factor
fitresult, refine_info = refine_correction_nelder_mead(
np.asarray(fitresult, dtype=float),
_apply_voltage_correction,
peak_raw,
fwhm_bin_size=float(bin_size) if float(bin_size) > 0 else 0.01,
)
print(
'Nelder-Mead voltage refine: status=%s baseline_FWHM=%.6f refined_FWHM=%.6f'
% (refine_info['status'], refine_info['baseline_fwhm'], refine_info['refined_fwhm'])
)
if refine_info['accepted']:
print('Refined voltage coefficients:', fitresult)
else:
print(
f"Nelder-Mead voltage refine requested but model='{model}' has no polynomial coefficients; skipping."
)
mask_fv = np.ones_like(dld_highVoltage, dtype=bool)
f_v = _predict_voltage_model(model, fitresult, np.asarray(dld_highVoltage)[mask_fv])
if calibration_mode == 'tof':
correction_factor = np.sqrt(f_v)
else:
correction_factor = f_v
print("Maximum value of correction factor:", np.max(correction_factor))
print("Minimum value of correction factor:", np.min(correction_factor))
calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] / correction_factor
if plot or save:
# Plot how correction factor for selected peak_x
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
if len(dld_highVoltage_peak_v) > 1000:
mask = np.random.randint(0, len(dld_highVoltage_peak_v), 1000)
else:
mask = np.arange(len(dld_highVoltage_peak_v))
x = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_peak_b[mask], color="blue", label=r"$t$", s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (V)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
# Plot high voltage curve
ax2 = ax1.twinx()
f_v_plot = _predict_voltage_model(model, fitresult, dld_highVoltage_peak_v)
correction_plot = 1 / np.sqrt(f_v_plot) if calibration_mode == 'tof' else 1 / f_v_plot
y = ax2.plot(dld_highVoltage_peak_v / 1000, correction_plot, color='r', label=r"$C_{V}^{-1}$")
# y = ax2.plot(dld_highVoltage_peak_v / 1000, f_v_plot, color='r', label=r"$C_{V}^{-1}$")
ax2.set_ylabel(r"$C_{V}^{-1}$", color="red", fontsize=10)
ax2.tick_params(axis='y', colors='red') # Change color and thickness of tick labels on y-axis
ax2.spines['right'].set_color('red') # Change color of right border
plt.legend(handles=[x, y[0]], loc='upper left', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"vol_corr_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
plt.show()
# Plot corrected tof/mc vs. uncalibrated tof/mc
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
x = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_peak_b[mask], color="blue", label='t', s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
dld_t_plot = dld_peak_b * correction_plot
# dld_t_plot = dld_peak_b * f_v_plot
y = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_t_plot[mask], color="red", label=r"$t_{C_{V}}$", s=1)
plt.legend(handles=[x, y], loc='upper right', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_V_corr_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
mean_after = np.mean(calibration_mc_tof[mask_temporal])
print('The mean of tof/mc after voltage calibration is:', mean_after)
print('The difference between the mean of tof/mc before and after voltage calibration is:', mean_after - mean_before)
if calibration_apply:
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
elif calibration_mode == 'mc':
variables.mc_calib = calibration_mc_tof
return f_v
def _resolve_sampling_mode(sampling_mode, variables=None):
"""Resolve sampling mode with a variable-level default fallback."""
configured = sampling_mode
if configured is None and variables is not None:
configured = getattr(variables, "bowl_sampling_mode", "polar")
mode = normalize_sampling_mode(configured)
if variables is not None and hasattr(variables, "bowl_sampling_mode"):
variables.bowl_sampling_mode = mode
return mode
def _cell_peak_value(values, maximum_location, sample_range_max, bin_size):
if sample_range_max == 'mean':
return float(np.mean(values)) / maximum_location
values = np.asarray(values, dtype=float)
if values.size == 0:
return float('nan')
value_span = float(np.max(values) - np.min(values))
if not np.isfinite(value_span) or value_span <= 0:
return float(np.mean(values)) / maximum_location
bins, n_bins = _build_histogram_bins(values, bin_size)
y_hist = fast_histogram.histogram1d(
values,
bins=n_bins,
range=(np.min(values), np.max(values)),
)
peaks, properties = find_peaks(y_hist, height=0)
if len(peaks) == 0:
return float(np.mean(values)) / maximum_location
index_peak_max_ini = np.argmax(properties['peak_heights'])
return float(bins[peaks[index_peak_max_ini]]) / maximum_location
def _iter_polar_cells(radial_distance, sample_size, det_diam):
r_max_data = float(np.max(radial_distance)) if len(radial_distance) else 0.0
r_max_detector = max(0.0, float(det_diam) / 2.0)
r_max = max(r_max_data, r_max_detector, float(sample_size))
n_rings = max(1, int(np.ceil(r_max / float(sample_size))))
ring_edges = np.linspace(0.0, r_max, n_rings + 1)
for r_idx in range(n_rings):
r_min_i = float(ring_edges[r_idx])
r_max_i = float(ring_edges[r_idx + 1])
r_mid = 0.5 * (r_min_i + r_max_i)
circumference = 2.0 * np.pi * max(r_mid, 0.5 * float(sample_size))
n_sectors = int(np.clip(np.ceil(circumference / float(sample_size)), 8, 96))
for theta_idx in range(n_sectors):
theta_min = float(theta_idx * 2.0 * np.pi / n_sectors)
theta_max = float((theta_idx + 1) * 2.0 * np.pi / n_sectors)
yield r_min_i, r_max_i, theta_min, theta_max
def _collect_spatial_samples(
dld_x,
dld_y,
dld_t,
maximum_location,
sample_range_max,
sample_size,
bin_size,
*,
sampling_mode,
det_diam,
dld_v=None,
):
# The inner work for each cell is a histogram + peak find + mean / median
# call on a small subarray. Every cell is independent of every other, and
# the inner kernel is NumPy/SciPy (GIL-releasing), so this is the textbook
# case for parallel_map(gil_releasing=True). On a 80 mm detector at 2 mm
# sampling we get O(2000) cells per call -- typically 4-8x speedup with
# threads, automatic serial fallback for tiny inputs.
if sampling_mode == 'polar':
radial_distance = np.hypot(dld_x, dld_y)
polar_angle = np.mod(np.arctan2(dld_y, dld_x), 2.0 * np.pi)
cells = list(_iter_polar_cells(radial_distance, sample_size, det_diam))
def _eval_polar_cell(bounds):
r_min_i, r_max_i, theta_min, theta_max = bounds
mask = (
(radial_distance >= r_min_i)
& (radial_distance < r_max_i)
& (polar_angle >= theta_min)
& (polar_angle < theta_max)
)
if not np.any(mask):
return None
cell_t = dld_t[mask]
normalized_value = _cell_peak_value(cell_t, maximum_location, sample_range_max, bin_size)
if not np.isfinite(normalized_value) or normalized_value <= 0:
return None
row = (
float(np.median(dld_x[mask])),
float(np.median(dld_y[mask])),
normalized_value,
float(np.mean(dld_v[mask])) if dld_v is not None else None,
)
return row
results = parallel_map(_eval_polar_cell, cells, gil_releasing=True)
else:
cell_size = float(sample_size)
x_min = float(np.floor(np.min(dld_x)))
x_max = float(np.ceil(np.max(dld_x)))
y_min = float(np.floor(np.min(dld_y)))
y_max = float(np.ceil(np.max(dld_y)))
n_cols = max(1, int(np.ceil((x_max - x_min) / cell_size)))
n_rows = max(1, int(np.ceil((y_max - y_min) / cell_size)))
x_bin = np.floor((dld_x - x_min) / cell_size).astype(int)
y_bin = np.floor((dld_y - y_min) / cell_size).astype(int)
valid = (x_bin >= 0) & (x_bin < n_cols) & (y_bin >= 0) & (y_bin < n_rows)
if not np.any(valid):
results: list = []
else:
cell_id = y_bin[valid] * n_cols + x_bin[valid]
order = np.argsort(cell_id, kind='mergesort')
x_sorted = dld_x[valid][order]
y_sorted = dld_y[valid][order]
t_sorted = dld_t[valid][order]
v_sorted = dld_v[valid][order] if dld_v is not None else None
cell_sorted = cell_id[order]
_, starts, counts = np.unique(cell_sorted, return_index=True, return_counts=True)
# Stays serial: cartesian cells operate on pre-sorted contiguous
# slices so each cell is sub-millisecond. Thread dispatch overhead
# exceeds the per-cell work, so parallelism here regresses
# wall-clock time on every realistic dataset.
results = []
for start, count in zip(starts, counts):
stop = start + count
cell_t = t_sorted[start:stop]
normalized_value = _cell_peak_value(cell_t, maximum_location, sample_range_max, bin_size)
if not np.isfinite(normalized_value) or normalized_value <= 0:
results.append(None)
continue
results.append(
(
float(np.median(x_sorted[start:stop])),
float(np.median(y_sorted[start:stop])),
normalized_value,
float(np.mean(v_sorted[start:stop])) if v_sorted is not None else None,
)
)
x_samples: list = []
y_samples: list = []
t_samples: list = []
v_samples: list = []
for row in results:
if row is None:
continue
x_samples.append(row[0])
y_samples.append(row[1])
t_samples.append(row[2])
if dld_v is not None:
v_samples.append(row[3])
result = {
'x': np.asarray(x_samples, dtype=float),
'y': np.asarray(y_samples, dtype=float),
't': np.asarray(t_samples, dtype=float),
}
if dld_v is not None:
result['v'] = np.asarray(v_samples, dtype=float)
return result
[docs]
def bowl_correction(
dld_x_bowl,
dld_y_bowl,
dld_t_bowl,
variables,
det_diam,
maximum_location,
sample_range_max,
sample_size,
calibration_mode,
fit_mode,
index_fig,
plot,
save,
fig_size=(7, 5),
bin_size=0.01,
sampling_mode='polar',
):
"""
Perform bowl correction on the input data.
Args:
dld_x_bowl (numpy.ndarray): X coordinates of the data points.
dld_y_bowl (numpy.ndarray): Y coordinates of the data points.
dld_t_bowl (numpy.ndarray): Time values of the data points.
det_diam (float): Diameter of the detector.
maximum_location (float): Maximum location for normalization.
sample_range_max (str, optional): Sample range maximum ('mean' or 'histogram').
sample_size (int): Size of each rectangle in mm.
calibration_mode (str): Calibration mode ('tof' or 'mc').
fit_mode (str): Fit mode ('curve_fit' or 'hemisphere_fit').
index_fig (int): Index for figure naming.
plot (bool): Flag indicating whether to plot the surface.
save (bool): Flag indicating whether to save the plot.
fig_size (tuple): Size of the figure.
bin_size (float): Size of the bin.
Returns:
parameters (numpy.ndarray): Optimized parameters of the bowl correction.
"""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
ensure_choice(sample_range_max, field_name="sample_range_max", allowed=BOWL_SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_x_bowl = ensure_non_empty_array(dld_x_bowl, field_name="dld_x_bowl")
dld_y_bowl = ensure_non_empty_array(dld_y_bowl, field_name="dld_y_bowl")
dld_t_bowl = ensure_non_empty_array(dld_t_bowl, field_name="dld_t_bowl")
ensure_matching_lengths(
dld_x_bowl,
dld_y_bowl,
dld_t_bowl,
field_names=("dld_x_bowl", "dld_y_bowl", "dld_t_bowl"),
)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
samples = _collect_spatial_samples(
np.asarray(dld_x_bowl, dtype=float),
np.asarray(dld_y_bowl, dtype=float),
np.asarray(dld_t_bowl, dtype=float),
maximum_location,
sample_range_max,
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
)
x_samples = samples['x']
y_samples = samples['y']
t_samples = samples['t']
if len(x_samples) == 0:
raise CalibrationInputError('No detector windows contained ions for bowl correction')
print('x_sample_list max and min:', np.max(x_samples), np.min(x_samples))
print('y_sample_list max and min:', np.max(y_samples), np.min(y_samples))
print('dld_t_peak_list max and min:', np.max(t_samples), np.min(t_samples))
r2_samples = x_samples**2 + y_samples**2
radial_design = np.column_stack(
[
np.ones(len(x_samples), dtype=float),
r2_samples,
r2_samples**2,
x_samples,
y_samples,
]
)
if fit_mode == 'curve_fit':
if sampling_mode == 'polar':
radial_parameters, _ = curve_fit(
_radial_bowl_corr,
[x_samples, y_samples],
t_samples,
)
parameters = {
'model': 'radial_curve_fit',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters, _ = curve_fit(bowl_corr, [x_samples, y_samples], t_samples)
elif fit_mode == 'ml_fit':
parameters = hybrid_calibration_model(x_samples, y_samples, t_samples)
elif fit_mode == 'robust_fit':
if sampling_mode == 'polar':
radial_parameters = _robust_joint_linear_fit(radial_design, t_samples)
parameters = {
'model': 'radial_linear',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters = robust_fit(x_samples, y_samples, t_samples)
if plot or save:
if calibration_mode == 'tof':
label = 't'
elif calibration_mode == 'mc':
label = 'mc'
model_x_data = np.array(x_samples)
model_y_data = np.array(y_samples)
X, Y = np.meshgrid(model_x_data, model_y_data)
Z = _predict_bowl_model(fit_mode, parameters, X.ravel(), Y.ravel()).reshape(X.shape)
fig, ax = plt.subplots(figsize=fig_size, subplot_kw=dict(projection="3d"), constrained_layout=True)
box = ax.get_position()
ax.set_position([box.x0 + 0.1, box.y0 + 0.1, box.width * 0.75, box.height * 0.75])
scat = ax.scatter(
model_x_data, model_y_data, zs=1 / np.array(t_samples), color="forestgreen", label=r"$%s_{wp}$" % label, s=3
)
fig.add_axes(ax)
cmap = copy(plt.cm.plasma)
cmap.set_bad(cmap(0))
x_flat = X.flatten()
y_flat = Y.flatten()
z_flat = Z.flatten()
triangles = Triangulation(x_flat, y_flat)
# surf = ax.plot_surface(X, Y, 1 / Z, color='red', alpha=0.05, label='bowl')
surf = ax.plot_trisurf(x_flat, y_flat, 1 / z_flat, triangles=triangles.triangles, cmap=cmap, alpha=0.6)
ax.set_xlabel(r'$X_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_ylabel(r'$Y_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_zlabel(r"${C_B}$", fontsize=10, labelpad=5, color='red')
ax.zaxis.line.set_color('red')
# Change z-axis tick label color
for tick in ax.get_zaxis().get_ticklabels():
tick.set_color('red')
ax.view_init(elev=7, azim=-41)
if save:
save_figure(
fig,
directory=variables.result_path,
stem=f"bowl_corr_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
print('The parameters of the bowl correction are:', parameters)
return parameters
[docs]
def bowl_correction_main(
dld_x,
dld_y,
dld_highVoltage,
variables,
det_diam,
sample_size,
fit_mode,
calibration_mode,
index_fig,
plot,
save,
maximum_cal_method='mean',
maximum_sample_method='mean',
fig_size=(5, 5),
fast_calibration=False,
bin_size=0.01,
peak_maximum=0,
calibration_apply=True,
sampling_mode='polar',
refine_nelder_mead=False,
):
"""
Perform bowl correction on the input data and plot the results.
Args:
dld_x (numpy.ndarray): X positions.
dld_y (numpy.ndarray): Y positions.
dld_highVoltage (numpy.ndarray): High voltage values.
det_diam (float): Detector diameter.
sample_size (int): Sample size.
fit_mode (str): Fit mode ('curve_fit', 'ml_fit', or 'robust_fit').
calibration_mode (str): Calibration mode ('tof' or 'mc').
index_fig (int): Index figure.
plot (bool): Flag indicating whether to plot the results.
save (bool): Flag indicating whether to save the plots.
maximum_cal_method (str, optional): Maximum calculation method ('mean' or 'histogram').
maximum_sample_method (str, optional): Sample range maximum ('mean' or 'histogram').
fig_size (tuple, optional): Figure size.
fast_calibration (bool, optional): Flag indicating whether to perform fast calibration.
bin_size (float, optional): Size of the bin.
Returns:
None
"""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
ensure_choice(maximum_cal_method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
ensure_choice(maximum_sample_method, field_name="maximum_sample_method", allowed=BOWL_SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_x = np.asarray(dld_x) * 10 # convert cm to mm
dld_y = np.asarray(dld_y) * 10 # convert cm to mm
dld_highVoltage = np.asarray(dld_highVoltage)
all_calibration_values = variables.get_calibration_array(calibration_mode)
ensure_matching_lengths(
dld_x,
dld_y,
dld_highVoltage,
all_calibration_values,
field_names=("dld_x", "dld_y", "dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
left_edge, right_edge = variables.get_calibration_peak_range(calibration_mode)
print('The left and right side of the main peak is:', left_edge, right_edge)
mask_temporal, dld_peak = _extract_peak_mask_and_values(variables, calibration_mode)
print('The number of ions is:', len(dld_peak))
if peak_maximum == 0:
maximum_location = _resolve_peak_location(
dld_peak,
method=maximum_cal_method,
bin_size=bin_size,
fast_calibration=fast_calibration,
)
else:
maximum_location = float(peak_maximum)
print('The maximum/mean of peak is located at:', maximum_location)
dld_x_peak = dld_x[mask_temporal]
dld_y_peak = dld_y[mask_temporal]
dld_highVoltage_peak = dld_highVoltage[mask_temporal]
mean_before = np.mean(dld_peak)
print('The mean of tof before bowl calibration is:', mean_before)
parameters = bowl_correction(
dld_x_peak,
dld_y_peak,
dld_peak,
variables,
det_diam,
maximum_location,
maximum_sample_method,
sample_size=sample_size,
calibration_mode=calibration_mode,
fit_mode=fit_mode,
index_fig=index_fig,
plot=plot,
save=save,
fig_size=fig_size,
bin_size=bin_size,
sampling_mode=sampling_mode,
)
print('The fit result is:', parameters)
# Optional per-stage Nelder-Mead refinement of the bowl polynomial,
# adapted from APyT's `optimize_correction(mode='flight')`
# (sebi-85/apyt: apyt/spectrum/align.py). Only the standard 6-parameter
# curve_fit model is refined here; radial/RANSAC/ML variants are skipped
# with a notice.
if refine_nelder_mead:
coeffs_array = np.asarray(parameters, dtype=float).ravel() if not isinstance(parameters, Mapping) else None
if fit_mode == 'curve_fit' and coeffs_array is not None and coeffs_array.size == 6:
x_peak = np.asarray(dld_x_peak, dtype=float)
y_peak = np.asarray(dld_y_peak, dtype=float)
peak_raw = np.asarray(dld_peak, dtype=float)
def _apply_bowl_correction(coeffs):
factor = bowl_corr([x_peak, y_peak], *coeffs)
factor = np.where(factor > 0, factor, np.nan)
return peak_raw / factor
refined_coeffs, refine_info = refine_correction_nelder_mead(
coeffs_array,
_apply_bowl_correction,
peak_raw,
fwhm_bin_size=float(bin_size) if float(bin_size) > 0 else 0.01,
)
print(
'Nelder-Mead bowl refine: status=%s baseline_FWHM=%.6f refined_FWHM=%.6f'
% (refine_info['status'], refine_info['baseline_fwhm'], refine_info['refined_fwhm'])
)
if refine_info['accepted']:
parameters = refined_coeffs
print('Refined bowl coefficients:', parameters)
else:
print(
f"Nelder-Mead bowl refine requested but fit_mode='{fit_mode}' has no 6-param polynomial coefficients; skipping."
)
mask_fv = np.ones_like(dld_x, dtype=bool)
f_bowl = _predict_bowl_model(fit_mode, parameters, dld_x[mask_fv], dld_y[mask_fv])
calibration_mc_tof = np.copy(variables.dld_t_calib) if calibration_mode == 'tof' else np.copy(variables.mc_calib)
print("Maximum value of f_bowl:", np.max(f_bowl))
print("Minimum value of f_bowl:", np.min(f_bowl))
calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] / f_bowl
# calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] * f_bowl
mean_after = np.mean(calibration_mc_tof[mask_temporal])
print('The mean of tof/mc after bowl calibration is:', mean_after)
print('The difference between the mean of tof before and after bowl calibration is:', mean_after - mean_before)
if plot or save:
# Plot how bowl correct tof/mc vs high voltage
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
mask = np.random.randint(0, len(dld_highVoltage_peak), 10000)
x = plt.scatter(dld_highVoltage_peak[mask] / 1000, dld_peak[mask], color="blue", label=r"$t$", s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
y = plt.scatter(dld_highVoltage_peak[mask] / 1000, dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1)
plt.legend(handles=[x, y], loc='upper right', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
# Plot how bowl correction correct tof/mc vs dld_x position
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
x = plt.scatter(dld_x_peak[mask], dld_peak[mask], color="blue", label=r"$t$", s=1, alpha=0.5)
y = plt.scatter(dld_x_peak[mask], dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1, alpha=0.5)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel(r"$X_{det}$ (mm)", fontsize=10)
plt.grid(color='aqua', alpha=0.3, linestyle='-.', linewidth=0.4)
plt.legend(handles=[x, y], loc='upper right', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_p_x_det_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
# Plot how bowl correction correct tof/mc vs dld_x position
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
x = plt.scatter(dld_y_peak[mask], dld_peak[mask], color="blue", label=r"$t$", s=1, alpha=0.5)
y = plt.scatter(dld_y_peak[mask], dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1, alpha=0.5)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel(r"$y_{det}$ (mm)", fontsize=10)
plt.grid(color='aqua', alpha=0.3, linestyle='-.', linewidth=0.4)
plt.legend(handles=[x, y], loc='upper right', markerscale=5.0, prop={'size': 10})
if save:
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_p_y_det_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
fig, ax = plt.subplots(figsize=fig_size, subplot_kw=dict(projection="3d"), constrained_layout=True)
# Adjust the subplot parameters to make the plot smaller
box = ax.get_position()
ax.set_position([box.x0 + 0.1, box.y0 + 0.1, box.width * 0.75, box.height * 0.75])
mask = np.random.randint(0, len(dld_highVoltage_peak), 500)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
scat_1 = ax.scatter(dld_x_peak[mask], dld_y_peak[mask], zs=dld_peak[mask], color="blue", label=r"$t$", s=1)
scat_2 = ax.scatter(dld_x_peak[mask], dld_y_peak[mask], zs=dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1)
plt.legend(handles=[scat_1, scat_2], loc='upper left', markerscale=5.0, prop={'size': 10})
ax.set_xlabel(r'$X_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_ylabel(r'$Y_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_zlabel(r"Time of Flight (ns)", fontsize=10, labelpad=5)
ax.view_init(elev=7, azim=-41)
if save:
save_figure(
fig,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_3d_{index_fig}",
formats=("pdf", "png"),
dpi=600,
)
if plot:
plt.show()
if calibration_apply:
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
elif calibration_mode == 'mc':
variables.mc_calib = calibration_mc_tof
return f_bowl
def _auto_detect_peaks(calibration_array, n_peaks=3, prominence=100, distance=500, hist_bin_size=0.1):
"""Auto-detect the top N prominent peaks in the calibration spectrum."""
arr = np.asarray(calibration_array, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size < 50:
raise CalibrationInputError("Not enough valid ions for multi-peak calibration")
mc_min = float(np.percentile(arr, 0.1))
mc_max = float(np.percentile(arr, 99.9))
trimmed = arr[(arr >= mc_min) & (arr <= mc_max)]
if trimmed.size < 50:
raise CalibrationInputError("Not enough ions remain after trimming for multi-peak calibration")
hist_edges, _n_bins = _build_histogram_bins(trimmed, max(hist_bin_size, 1e-6))
# fast_histogram.histogram1d is ~10x faster than np.histogram for
# evenly-spaced bins and dominates a large fraction of peak-detection
# time when ``calibration_array`` is in the millions.
hist_y = fast_histogram.histogram1d(
trimmed, bins=int(_n_bins),
range=(float(hist_edges[0]), float(hist_edges[-1])),
)
hist_x = (hist_edges[:-1] + hist_edges[1:]) / 2
if hist_y.size >= 5:
kernel_size = min(9, hist_y.size if hist_y.size % 2 == 1 else hist_y.size - 1)
kernel_size = max(3, kernel_size)
kernel = np.ones(kernel_size, dtype=float) / float(kernel_size)
hist_y_eval = np.convolve(hist_y.astype(float), kernel, mode='same')
else:
hist_y_eval = hist_y.astype(float)
peaks_found = np.array([], dtype=int)
prominence_trials = [
max(1.0, float(prominence)),
max(1.0, float(prominence) * 0.5),
max(1.0, float(prominence) * 0.25),
max(1.0, float(np.max(hist_y_eval)) * 0.05),
]
for prominence_value in prominence_trials:
peaks_found, _ = find_peaks(
hist_y_eval,
prominence=prominence_value,
distance=max(1, int(distance)),
height=0,
)
if len(peaks_found) > 0:
break
if len(peaks_found) == 0:
peaks_found, _ = find_peaks(
hist_y_eval,
prominence=max(1.0, float(np.max(hist_y_eval)) * 0.02),
distance=max(1, int(distance) // 2),
height=0,
)
if len(peaks_found) == 0:
raise CalibrationInputError("No peaks found for multi-peak calibration")
prom = peak_prominences(hist_y_eval, peaks_found)
pw = peak_widths(hist_y_eval, peaks_found, rel_height=0.5)
sorted_idx = np.argsort(prom[0])[::-1]
top_n = min(n_peaks, len(sorted_idx))
results = []
for pi in sorted_idx[:top_n]:
peak_pos = float(hist_x[peaks_found[pi]])
left = float(np.interp(pw[2][pi], np.arange(len(hist_x)), hist_x))
right = float(np.interp(pw[3][pi], np.arange(len(hist_x)), hist_x))
width = max(right - left, float(hist_bin_size) * 3)
margin = max(float(hist_bin_size) * 2, width * 0.25)
x1 = max(mc_min, left - margin)
x2 = min(mc_max, right + margin)
if x2 <= x1:
continue
candidate = {
'position': peak_pos,
'x1': x1,
'x2': x2,
'prominence': float(prom[0][pi]),
'width': float(width),
}
if any(not (candidate['x2'] <= existing['x1'] or existing['x2'] <= candidate['x1']) for existing in results):
continue
results.append(candidate)
if not results:
raise CalibrationInputError("Peak windows could not be resolved for multi-peak calibration")
return results
[docs]
def auto_detect_reference_peaks(
calibration_array, n_peaks=6, prominence=100, distance=500, hist_bin_size=0.1,
event_mask=None,
):
"""Public helper for stable multi-peak evaluation windows used by auto calibration.
``event_mask`` (optional) is a boolean array the same length as
``calibration_array``: True keeps the ion, False ignores it. Use this
to detect peaks on calibration-trustworthy ions only (single-hit /
excluded-correlations subsets from
``pyccapt.calibration.core.event_filters``). Default ``None``
preserves legacy behavior.
"""
arr = calibration_array
if event_mask is not None:
mask = np.asarray(event_mask, dtype=bool)
arr_np = np.asarray(arr)
if mask.size == arr_np.size:
arr = arr_np[mask]
return _auto_detect_peaks(
arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
hist_bin_size=hist_bin_size,
)
[docs]
def recompute_peak_window(
variables,
calibration_mode: str = 'mc',
*,
prominence: float = 100,
distance: int = 10,
hist_bin_size: float = 0.05,
lim=None,
window_inflate: float = 2.0,
target_position=None,
) -> tuple:
"""Lightweight peak-window recompute. NO Matplotlib.
Detects peaks via ``auto_detect_reference_peaks`` on the currently
calibrated array and updates ``variables.selected_x1/x2`` to the
dominant peak (or, if ``target_position`` is set, the peak closest
to that mass).
This is the headless equivalent of ``mc_plot.hist_plot(...,
plot_show=False)`` for use inside auto-calibration loops where the
Matplotlib stack adds significant per-iteration overhead.
Parameters
----------
variables : Variables
Calibration state container.
calibration_mode : {'mc', 'tof'}
prominence, distance, hist_bin_size : passed to peak detection.
lim : float, optional
Upper limit on values considered. Defaults to ``variables.max_tof``
for 'tof' and 400.0 for 'mc'.
window_inflate : float
Multiplier on the FWHM-based window width. 2.0 keeps the window
wide enough for stable MRP scoring on tight peaks.
target_position : float, optional
If supplied, pick the peak nearest this m/c instead of the most
prominent one. Useful for keeping the same physical peak window
across calibration iterations as the peak drifts.
Returns
-------
(x1, x2, position) : tuple of floats
"""
mode = ensure_choice(calibration_mode, field_name='calibration_mode',
allowed=CALIBRATION_MODES)
arr = np.asarray(variables.get_calibration_array(mode), dtype=float)
arr = arr[np.isfinite(arr) & (arr > 0)]
if lim is None:
lim = float(variables.max_tof) if mode == 'tof' else 400.0
arr = arr[arr < lim]
if arr.size < 50:
raise CalibrationInputError(
"Not enough ions for recompute_peak_window"
)
try:
peaks = auto_detect_reference_peaks(
arr, n_peaks=8, prominence=float(prominence),
distance=int(distance), hist_bin_size=float(hist_bin_size),
)
except CalibrationInputError:
peaks = []
if not peaks:
# Fallback: histogram-max
hist, edges = np.histogram(arr, bins=1000)
idx = int(np.argmax(hist))
center = 0.5 * (edges[idx] + edges[idx + 1])
width = (edges[-1] - edges[0]) * 0.02
x1, x2 = center - width, center + width
position = center
else:
if target_position is not None:
best = min(peaks, key=lambda p: abs(float(p['position']) - float(target_position)))
else:
best = max(peaks, key=lambda p: float(p.get('prominence', 0.0)))
x1 = float(best['x1'])
x2 = float(best['x2'])
position = float(best['position'])
if window_inflate != 1.0:
half = (x2 - x1) * 0.5 * float(window_inflate)
x1 = position - half
x2 = position + half
variables.selected_x1 = float(x1)
variables.selected_x2 = float(x2)
try:
variables.set_calibration_peak_range(mode, float(x1), float(x2))
except Exception:
pass
return float(x1), float(x2), float(position)
[docs]
def multi_peak_voltage_corr_main(
dld_highVoltage,
variables,
calibration_mode='mc',
model='robust_fit',
bin_size=0.01,
n_peaks=3,
prominence=100,
distance=500,
):
"""Voltage correction using multiple auto-detected peaks simultaneously."""
model = normalize_voltage_model(model)
calib_arr = variables.get_calibration_array(calibration_mode)
dld_hv = np.asarray(dld_highVoltage, dtype=float)
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
)
all_v_means = []
all_t_normalized = []
peak_info = []
for pk in detected:
x1, x2, peak_pos = pk['x1'], pk['x2'], pk['position']
mask = (calib_arr > x1) & (calib_arr < x2)
peak_ions = calib_arr[mask]
voltage_ions = dld_hv[mask]
if len(peak_ions) < 100:
continue
maximum_location = _resolve_peak_location(peak_ions, 'histogram', bin_size)
if maximum_location == 0:
continue
sample_size = max(1, int(len(voltage_ions) / 100))
for i in range(0, len(voltage_ions), sample_size):
chunk_v = voltage_ions[i : i + sample_size]
chunk_t = peak_ions[i : i + sample_size]
if len(chunk_t) < 5:
continue
mean_voltage = float(np.mean(chunk_v))
normalized_value = float(np.mean(chunk_t)) / maximum_location
if not np.isfinite(mean_voltage) or not np.isfinite(normalized_value) or normalized_value <= 0:
continue
all_v_means.append(mean_voltage)
all_t_normalized.append(normalized_value)
peak_info.append(
{
'position': peak_pos,
'x1': x1,
'x2': x2,
'n_ions': int(np.sum(mask)),
}
)
if len(all_v_means) < 3:
raise CalibrationInputError("Not enough data points from multi-peak detection for voltage correction")
v_arr = np.asarray(all_v_means)
t_arr = np.asarray(all_t_normalized)
if model == 'robust_fit':
fitresult = robust_voltage_fit(v_arr, t_arr)
elif model == 'curve_fit':
fitresult, _ = curve_fit(voltage_corr, v_arr, t_arr)
predicted_fit = _predict_voltage_model(model, fitresult, v_arr)
if not np.all(np.isfinite(np.asarray(predicted_fit, dtype=float))):
raise CalibrationInputError("Voltage fit returned invalid parameters")
f_v = np.clip(
_predict_voltage_model(model, fitresult, dld_hv),
np.finfo(float).eps,
None,
)
correction_factor = np.sqrt(f_v) if calibration_mode == 'tof' else f_v
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
calibration_mc_tof = source / correction_factor
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
else:
variables.mc_calib = calibration_mc_tof
return fitresult, peak_info
[docs]
def multi_peak_bowl_corr_main(
dld_x,
dld_y,
dld_highVoltage,
variables,
det_diam,
calibration_mode='mc',
fit_mode='robust_fit',
sample_size=9,
bin_size=0.01,
n_peaks=3,
prominence=100,
distance=500,
sampling_mode='polar',
):
"""Bowl correction using multiple auto-detected peaks simultaneously."""
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
calib_arr = variables.get_calibration_array(calibration_mode)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
dld_x_mm = np.asarray(dld_x) * 10
dld_y_mm = np.asarray(dld_y) * 10
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
)
all_x_samples = []
all_y_samples = []
all_t_normalized = []
n_used = 0
for pk in detected:
x1, x2 = pk['x1'], pk['x2']
mask = (calib_arr > x1) & (calib_arr < x2)
peak_t = calib_arr[mask]
peak_x = dld_x_mm[mask]
peak_y = dld_y_mm[mask]
if len(peak_t) < 100:
continue
maximum_location = _resolve_peak_location(peak_t, 'histogram', bin_size)
if maximum_location == 0:
continue
peak_samples = _collect_spatial_samples(
peak_x,
peak_y,
peak_t,
maximum_location,
'mean',
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
)
if len(peak_samples['x']) == 0:
continue
all_x_samples.extend(peak_samples['x'].tolist())
all_y_samples.extend(peak_samples['y'].tolist())
all_t_normalized.extend(peak_samples['t'].tolist())
n_used += 1
if len(all_x_samples) < 5:
raise CalibrationInputError("Not enough spatial data from multi-peak detection for bowl correction")
x_arr = np.array(all_x_samples)
y_arr = np.array(all_y_samples)
t_arr = np.array(all_t_normalized)
if fit_mode == 'curve_fit':
if sampling_mode == 'polar':
radial_parameters, _ = curve_fit(_radial_bowl_corr, [x_arr, y_arr], t_arr)
parameters = {
'model': 'radial_curve_fit',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters, _ = curve_fit(bowl_corr, [x_arr, y_arr], t_arr)
elif fit_mode == 'ml_fit':
parameters = hybrid_calibration_model(x_arr, y_arr, t_arr)
elif fit_mode == 'robust_fit':
if sampling_mode == 'polar':
r2_arr = x_arr**2 + y_arr**2
radial_design = np.column_stack(
[
np.ones(len(x_arr), dtype=float),
r2_arr,
r2_arr**2,
x_arr,
y_arr,
]
)
radial_parameters = _robust_joint_linear_fit(radial_design, t_arr)
parameters = {
'model': 'radial_linear',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters = robust_fit(x_arr, y_arr, t_arr)
if isinstance(parameters, dict):
is_finite = np.all(np.isfinite(np.asarray(parameters.get('parameters', []), dtype=float)))
elif fit_mode == 'robust_fit':
trial_prediction = _predict_bowl_model(fit_mode, parameters, x_arr, y_arr)
is_finite = np.all(np.isfinite(np.asarray(trial_prediction, dtype=float)))
else:
is_finite = np.all(np.isfinite(np.asarray(parameters, dtype=float)))
if not is_finite:
raise CalibrationInputError("Bowl fit returned invalid parameters")
f_bowl = np.clip(
_predict_bowl_model(fit_mode, parameters, dld_x_mm, dld_y_mm),
np.finfo(float).eps,
None,
)
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
calibration_mc_tof = source / f_bowl
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
else:
variables.mc_calib = calibration_mc_tof
return parameters, n_used
def _joint_feature_matrix(voltage_values, x_values, y_values, voltage_center, voltage_scale, spatial_scale):
"""Build a smooth joint voltage-plus-detector feature matrix."""
v = (np.asarray(voltage_values, dtype=float) - float(voltage_center)) / float(voltage_scale)
x = np.asarray(x_values, dtype=float) / float(spatial_scale)
y = np.asarray(y_values, dtype=float) / float(spatial_scale)
r2 = x**2 + y**2
return np.column_stack(
[
np.ones(len(v), dtype=float),
v,
v**2,
x,
y,
x**2,
y**2,
x * y,
r2,
v * r2,
v * x,
v * y,
]
)
def _robust_joint_linear_fit(feature_matrix, target):
"""Fit a smooth joint correction surface while rejecting large residual outliers."""
x_data = np.asarray(feature_matrix, dtype=float)
y_data = np.asarray(target, dtype=float)
params, *_ = np.linalg.lstsq(x_data, y_data, rcond=None)
for _ in range(3):
residual = y_data - x_data @ params
median = float(np.median(residual))
mad = float(np.median(np.abs(residual - median)))
scale = max(1.4826 * mad, np.finfo(float).eps)
good = np.abs(residual - median) <= 3.5 * scale
if np.count_nonzero(good) < x_data.shape[1]:
break
params, *_ = np.linalg.lstsq(x_data[good], y_data[good], rcond=None)
return params
[docs]
def joint_voltage_bowl_corr_main(
dld_x,
dld_y,
dld_highVoltage,
variables,
det_diam,
calibration_mode='mc',
sample_size=9,
bin_size=0.01,
n_peaks=4,
prominence=100,
distance=500,
sampling_mode='polar',
):
"""Fit a smooth combined voltage-plus-detector correction surface from several peaks."""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
calib_arr = variables.get_calibration_array(calibration_mode)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
dld_x_mm = np.asarray(dld_x, dtype=float) * 10
dld_y_mm = np.asarray(dld_y, dtype=float) * 10
dld_highVoltage = np.asarray(dld_highVoltage, dtype=float)
ensure_matching_lengths(
dld_x_mm,
dld_y_mm,
dld_highVoltage,
calib_arr,
field_names=("dld_x", "dld_y", "dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
bin_size = ensure_positive(bin_size, field_name="bin_size")
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
hist_bin_size=bin_size,
)
all_x_samples = []
all_y_samples = []
all_v_samples = []
all_t_normalized = []
peak_info = []
for peak in detected:
mask = (calib_arr > peak['x1']) & (calib_arr < peak['x2'])
peak_t = calib_arr[mask]
peak_x = dld_x_mm[mask]
peak_y = dld_y_mm[mask]
peak_v = dld_highVoltage[mask]
if len(peak_t) < 100:
continue
maximum_location = _resolve_peak_location(peak_t, 'histogram', bin_size)
if maximum_location <= 0 or not np.isfinite(maximum_location):
continue
peak_samples = _collect_spatial_samples(
peak_x,
peak_y,
peak_t,
maximum_location,
'mean',
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
dld_v=peak_v,
)
n_cells = len(peak_samples['x'])
if n_cells == 0:
continue
all_x_samples.extend(peak_samples['x'].tolist())
all_y_samples.extend(peak_samples['y'].tolist())
all_v_samples.extend(peak_samples['v'].tolist())
all_t_normalized.extend(peak_samples['t'].tolist())
if n_cells > 0:
peak_info.append(
{
'position': peak['position'],
'x1': peak['x1'],
'x2': peak['x2'],
'n_ions': int(np.sum(mask)),
'n_cells': int(n_cells),
}
)
if len(all_t_normalized) < 20:
raise CalibrationInputError("Not enough samples from multi-peak detection for joint voltage/bowl refinement")
v_arr = np.asarray(all_v_samples, dtype=float)
x_arr = np.asarray(all_x_samples, dtype=float)
y_arr = np.asarray(all_y_samples, dtype=float)
t_arr = np.asarray(all_t_normalized, dtype=float)
voltage_center = float(np.median(v_arr))
voltage_scale = max(float(np.std(v_arr)), np.finfo(float).eps)
spatial_scale = max(
float(np.nanmax(np.sqrt(x_arr**2 + y_arr**2))) if len(x_arr) else 0.0,
float(det_diam) / 2.0,
1.0,
)
feature_matrix = _joint_feature_matrix(v_arr, x_arr, y_arr, voltage_center, voltage_scale, spatial_scale)
parameters = _robust_joint_linear_fit(feature_matrix, t_arr)
if not np.all(np.isfinite(parameters)):
raise CalibrationInputError("Joint voltage/bowl fit returned invalid parameters")
all_features = _joint_feature_matrix(
dld_highVoltage,
dld_x_mm,
dld_y_mm,
voltage_center,
voltage_scale,
spatial_scale,
)
correction = np.clip(all_features @ parameters, np.finfo(float).eps, None)
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
corrected = source / correction
if calibration_mode == 'tof':
variables.dld_t_calib = corrected
else:
variables.mc_calib = corrected
model = {
'parameters': [float(value) for value in parameters],
'feature_names': ['bias', 'v', 'v2', 'x', 'y', 'x2', 'y2', 'xy', 'r2', 'v_r2', 'v_x', 'v_y'],
'voltage_center': voltage_center,
'voltage_scale': voltage_scale,
'spatial_scale': spatial_scale,
'peaks_used': len(peak_info),
}
return model, peak_info