from copy import copy
import fast_histogram
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import colors, rcParams
from matplotlib.tri import Triangulation
from scipy.optimize import curve_fit
from scipy.signal import find_peaks, peak_prominences, peak_widths
from pyccapt.calibration.core.exceptions import CalibrationInputError
from pyccapt.calibration.core.validation import (
BOWL_FIT_MODES,
BOWL_SAMPLE_METHODS,
SAMPLE_METHODS,
VOLTAGE_MODES,
ensure_choice,
ensure_matching_lengths,
ensure_non_empty_array,
ensure_positive,
normalize_sampling_mode,
normalize_voltage_model,
)
from pyccapt.calibration.path_utils import save_figure
from pyccapt.calibration.core.correction_models import (
_predict_bowl_model,
_predict_voltage_model,
bowl_corr,
hybrid_calibration_model,
robust_fit,
robust_voltage_fit,
voltage_corr,
)
from pyccapt.calibration.core.diagnostics import (
initial_calibration,
plot_fdm,
plot_selected_statistic,
)
def _extract_peak_mask_and_values(variables, calibration_mode):
"""Return the selected peak mask and values for the requested calibration mode."""
mask = variables.build_calibration_mask(calibration_mode)
values = variables.get_calibration_array(calibration_mode)[mask]
return mask, values
def _resolve_sample_size(sample_size, population_size):
"""Resolve a valid sample size, auto-scaling down if needed."""
if population_size <= 0:
raise CalibrationInputError("No ions available for calibration")
requested = int(ensure_positive(sample_size, field_name="sample_size"))
if requested > population_size:
return max(1, population_size // 10)
return requested
def _build_histogram_bins(values, bin_size):
"""Build stable histogram bin edges/count from the actual data span."""
data = ensure_non_empty_array(values, field_name="histogram_values")
lower = float(np.min(data))
upper = float(np.max(data))
span = max(upper - lower, float(bin_size))
n_bins = max(10, int(np.ceil(span / float(bin_size))))
edges = np.linspace(lower, upper, n_bins + 1)
return edges, n_bins
def _resolve_peak_location(values, method, bin_size, fast_calibration=False):
"""Resolve the reference peak location by histogram/mean/median."""
ensure_choice(method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
data = ensure_non_empty_array(values, field_name="peak_values")
if fast_calibration and data.size > 10:
data = np.random.choice(data, int(data.size * 0.1), replace=False)
if method == "mean":
return float(np.mean(data))
if method == "median":
return float(np.median(data))
bins, n_bins = _build_histogram_bins(data, bin_size)
hist = fast_histogram.histogram1d(
data,
bins=n_bins,
range=(np.min(data), np.max(data)),
)
peaks, properties = find_peaks(hist, height=0)
if len(peaks) == 0:
return float(np.mean(data))
index_peak_max_ini = np.argmax(properties["peak_heights"])
return float(bins[peaks[index_peak_max_ini]])
def _radial_bowl_corr(data_xy, a, b, c, d, e):
"""Radial-dominant bowl model where r^2 drives the primary curvature."""
x = np.asarray(data_xy[0], dtype=float)
y = np.asarray(data_xy[1], dtype=float)
r2 = x ** 2 + y ** 2
return a + b * r2 + c * (r2 ** 2) + d * x + e * y
[docs]
def voltage_correction(dld_highVoltage_peak, dld_t_peak, variables, maximum_location, index_fig, figname, sample_size,
mode, calibration_mode, sample_range_max, bin_size, plot=True, save=False,
fig_size=(5, 5), model='curve_fit'):
"""
Performs voltage correction and plots the graph based on the passed arguments.
Parameters:
- dld_highVoltage_peak (array): Array of high voltage peaks.
- dld_t_peak (array): Array of t peaks.
- maximum_location (float): Maximum location value.
- index_fig (string): Index of the saved plot.
- figname (string): Name of the saved plot image.
- sample_size (string): Sample size.
- mode (string): Mode ('ion_seq'/'voltage').
- calibration_mode (string): Type of calibration mode (tof/mc).
- sample_range_max (string): Type of peak_x mode (histogram/mean/median).
- outlier_remove (bool): Indicates whether to remove outliers. Default is True.
- plot (bool): Indicates whether to plot the graph. Default is True.
- save (bool): Indicates whether to save the plot. Default is False.
- fig_size (tuple): Figure size in inches. Default is (7, 5).
- model (string): Type of model ('curve_fit'/'robust_fit'). Default is 'curve_fit'.
- bin_size (float): Size of the bin.
Returns:
- fitresult (array): Corrected voltage array.
"""
model = normalize_voltage_model(model)
ensure_choice(mode, field_name="mode", allowed=VOLTAGE_MODES)
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(sample_range_max, field_name="sample_range_max", allowed=SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_highVoltage_peak = ensure_non_empty_array(dld_highVoltage_peak, field_name="dld_highVoltage_peak")
dld_t_peak = ensure_non_empty_array(dld_t_peak, field_name="dld_t_peak")
ensure_matching_lengths(
dld_highVoltage_peak,
dld_t_peak,
field_names=("dld_highVoltage_peak", "dld_t_peak"),
)
dld_t_peak_list = []
high_voltage_mean_list = []
if mode == 'ion_seq':
for i in range(int(len(dld_highVoltage_peak) / sample_size) + 1):
dld_highVoltage_peak_selected = dld_highVoltage_peak[i * sample_size:(i + 1) * sample_size]
dld_t_peak_selected = dld_t_peak[i * sample_size:(i + 1) * sample_size]
if sample_range_max == 'histogram':
try:
bins, _ = _build_histogram_bins(dld_t_peak_selected, bin_size)
y, x = np.histogram(dld_t_peak_selected, bins=bins)
peaks, properties = find_peaks(y, height=0)
index_peak_max_ini = np.argmax(properties['peak_heights'])
max_peak = peaks[index_peak_max_ini]
dld_t_peak_list.append(x[max_peak] / maximum_location)
# dld_t_peak_list.append(maximum_location / x[max_peak])
mask_v = np.logical_and((dld_t_peak_selected >= x[max_peak] - bin_size)
, (dld_t_peak_selected <= x[max_peak] + bin_size))
if len(dld_highVoltage_peak_selected[mask_v]) == 0:
mask_v = np.logical_and((dld_t_peak_selected >= x[max_peak] - 2 * bin_size)
, (dld_t_peak_selected <= x[max_peak] + 2 * bin_size))
if len(dld_highVoltage_peak_selected[mask_v]) == 0:
print('length of mask voltage', len(dld_highVoltage_peak_selected[mask_v]))
mask_v = np.logical_and((dld_t_peak_selected >= x[max_peak] - 4 * bin_size)
, (dld_t_peak_selected <= x[max_peak] + 4 * bin_size))
print('length of mask voltage after increase the window',
len(dld_highVoltage_peak_selected[mask_v]))
high_voltage_mean_list.append(np.mean(dld_highVoltage_peak_selected[mask_v]))
except ValueError:
# print('cannot find the maximum')
dld_t_mean = np.mean(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
# dld_t_peak_list.append(maximum_location / dld_t_mean)
high_voltage_mean = np.mean(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
elif sample_range_max == 'mean':
dld_t_mean = np.mean(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
# dld_t_peak_list.append(maximum_location / dld_t_mean)
high_voltage_mean = np.mean(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
elif sample_range_max == 'median':
dld_t_mean = np.median(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
high_voltage_mean = np.median(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
elif mode == 'voltage':
for i in range(int((np.max(dld_highVoltage_peak) - np.min(dld_highVoltage_peak)) / sample_size) + 1):
mask = np.logical_and((dld_highVoltage_peak >= (np.min(dld_highVoltage_peak) + (i) * sample_size)),
(dld_highVoltage_peak < (np.min(dld_highVoltage_peak) + (i + 1) * sample_size)))
dld_highVoltage_peak_selected = dld_highVoltage_peak[mask]
dld_t_peak_selected = dld_t_peak[mask]
if sample_range_max == 'histogram':
try:
bins, _ = _build_histogram_bins(dld_t_peak_selected, bin_size)
y, x = np.histogram(dld_t_peak_selected, bins=bins)
peaks, properties = find_peaks(y, height=0)
index_peak_max_ini = np.argmax(properties['peak_heights'])
max_peak = peaks[index_peak_max_ini]
dld_t_peak_list.append(x[max_peak] / maximum_location)
# dld_t_peak_list.append(maximum_location / x[max_peak])
mask_v = np.logical_and((dld_t_peak_selected >= x[max_peak] - bin_size)
, (dld_t_peak_selected <= x[max_peak] + bin_size))
high_voltage_mean_list.append(np.mean(dld_highVoltage_peak_selected[mask_v]))
except ValueError:
dld_t_mean = np.mean(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
high_voltage_mean = np.mean(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
elif sample_range_max == 'mean':
dld_t_mean = np.mean(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
# dld_t_peak_list.append(maximum_location / dld_t_mean)
high_voltage_mean = np.mean(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
elif sample_range_max == 'median':
dld_t_mean = np.median(dld_t_peak_selected)
dld_t_peak_list.append(dld_t_mean / maximum_location)
# dld_t_peak_list.append(maximum_location / dld_t_mean)
high_voltage_mean = np.median(dld_highVoltage_peak_selected)
high_voltage_mean_list.append(high_voltage_mean)
if model == 'curve_fit':
fitresult, _ = curve_fit(voltage_corr, np.array(high_voltage_mean_list), np.array(dld_t_peak_list))
elif model == 'robust_fit':
fitresult = robust_voltage_fit(np.array(high_voltage_mean_list), np.array(dld_t_peak_list))
if plot or save:
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
label = 't'
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
label = 'mc'
x = plt.scatter(np.array(high_voltage_mean_list) / 1000, np.array(dld_t_peak_list) * maximum_location,
color="forestgreen", label=r"$%s_{wp}$" % label, s=5)
# x = plt.scatter(np.array(high_voltage_mean_list) / 1000, maximum_location / np.array(dld_t_peak_list),
# color="forestgreen", label=r"$%s_{wp}$" % label, s=5)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
ax2 = ax1.twinx()
f_v = _predict_voltage_model(model, fitresult, np.array(high_voltage_mean_list))
y = ax2.plot(np.array(high_voltage_mean_list) / 1000, np.sqrt(f_v), color='r', label=r"$C_V$")
# y = ax2.plot(np.array(high_voltage_mean_list) / 1000, f_v, color='r', label=r"$C_V$")
ax2.set_ylabel(r"$C_V$", color="red", fontsize=10) # Get the current axis
ax2.tick_params(axis='y', colors='red') # Change color and thickness of tick labels on y-axis
ax2.spines['right'].set_color('red') # Change color of right border
plt.legend(handles=[x, y[0]], loc='lower left', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"vol_corr_{figname}_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
return fitresult
[docs]
def voltage_corr_main(dld_highVoltage, variables, sample_size, mode, calibration_mode, index_fig, plot, save,
maximum_cal_method='mean', maximum_sample_method='mean', fig_size=(5, 5), fast_calibration=False,
bin_size=0.01, model='curve_fit', peak_maximum=0, calibration_apply=True):
"""
Perform voltage correction on the given data.
Args:
dld_highVoltage (numpy.ndarray): Array of high voltages.
sample_size (int): Size of the sample.
mode (str): Mode of the correction.
calibration_mode (str): Calibration mode ('tof' or 'mc').
index_fig (int): Index of the figure.
plot (bool): Whether to plot the results.
save (bool): Whether to save the plots.
noise_remove (bool, optional): Whether to remove noise. Defaults to True.
maximum_cal_method (str, optional): Maximum calculation method ('mean', 'histogram', 'median').
maximum_sample_method (str, optional): Sample range maximum ('mean', 'histogram', 'median').
fig_size (tuple, optional): Size of the figure. Defaults to (5, 5).
fast_calibration (bool, optional): Whether to perform fast calibration. Defaults to False.
bin_size (float, optional): Size of the bin. Defaults to 0.01.
"""
model = normalize_voltage_model(model)
ensure_choice(mode, field_name="mode", allowed=VOLTAGE_MODES)
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(maximum_cal_method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
ensure_choice(maximum_sample_method, field_name="maximum_sample_method", allowed=SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
all_calibration_values = variables.get_calibration_array(calibration_mode)
ensure_matching_lengths(
dld_highVoltage,
all_calibration_values,
field_names=("dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
left_edge, right_edge = variables.get_calibration_peak_range(calibration_mode)
print('The left and right side of the main peak is:', left_edge, right_edge)
mask_temporal, dld_peak_b = _extract_peak_mask_and_values(variables, calibration_mode)
dld_highVoltage_peak_v = np.asarray(dld_highVoltage)[mask_temporal]
print('The number of ions is:', len(dld_highVoltage_peak_v))
sample_size = _resolve_sample_size(sample_size, len(dld_highVoltage_peak_v))
print('The number of samples is:', int(len(dld_highVoltage_peak_v) / sample_size))
if peak_maximum == 0:
maximum_location = _resolve_peak_location(
dld_peak_b,
method=maximum_cal_method,
bin_size=bin_size,
fast_calibration=fast_calibration,
)
else:
maximum_location = float(peak_maximum)
print('The maximum/mean/median of histogram is located at:', maximum_location)
print('The high voltage ranges are:', np.min(dld_highVoltage_peak_v), np.max(dld_highVoltage_peak_v))
mean_before = np.mean(dld_peak_b)
print('The mean of tof/mc before voltage calibration is:', mean_before)
fitresult = voltage_correction(dld_highVoltage_peak_v, dld_peak_b, variables,
maximum_location, index_fig=index_fig,
figname='voltage_corr',
sample_size=sample_size, mode=mode, calibration_mode=calibration_mode,
sample_range_max=maximum_sample_method, bin_size=bin_size,
plot=plot, save=save, fig_size=fig_size, model=model)
calibration_mc_tof = np.copy(variables.dld_t_calib) if calibration_mode == 'tof' else np.copy(variables.mc_calib)
print('The fit result is:', fitresult)
mask_fv = np.ones_like(dld_highVoltage, dtype=bool)
f_v = _predict_voltage_model(model, fitresult, np.asarray(dld_highVoltage)[mask_fv])
if calibration_mode == 'tof':
correction_factor = np.sqrt(f_v)
else:
correction_factor = f_v
print("Maximum value of correction factor:", np.max(correction_factor))
print("Minimum value of correction factor:", np.min(correction_factor))
calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] / correction_factor
if plot or save:
# Plot how correction factor for selected peak_x
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
if len(dld_highVoltage_peak_v) > 1000:
mask = np.random.randint(0, len(dld_highVoltage_peak_v), 1000)
else:
mask = np.arange(len(dld_highVoltage_peak_v))
x = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_peak_b[mask], color="blue", label=r"$t$", s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (V)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
# Plot high voltage curve
ax2 = ax1.twinx()
f_v_plot = _predict_voltage_model(model, fitresult, dld_highVoltage_peak_v)
correction_plot = 1 / np.sqrt(f_v_plot) if calibration_mode == 'tof' else 1 / f_v_plot
y = ax2.plot(dld_highVoltage_peak_v / 1000, correction_plot, color='r', label=r"$C_{V}^{-1}$")
# y = ax2.plot(dld_highVoltage_peak_v / 1000, f_v_plot, color='r', label=r"$C_{V}^{-1}$")
ax2.set_ylabel(r"$C_{V}^{-1}$", color="red", fontsize=10)
ax2.tick_params(axis='y', colors='red') # Change color and thickness of tick labels on y-axis
ax2.spines['right'].set_color('red') # Change color of right border
plt.legend(handles=[x, y[0]], loc='upper left', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"vol_corr_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
plt.show()
# Plot corrected tof/mc vs. uncalibrated tof/mc
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
x = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_peak_b[mask], color="blue", label='t', s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
dld_t_plot = dld_peak_b * correction_plot
# dld_t_plot = dld_peak_b * f_v_plot
y = plt.scatter(dld_highVoltage_peak_v[mask] / 1000, dld_t_plot[mask], color="red", label=r"$t_{C_{V}}$",
s=1)
plt.legend(handles=[x, y], loc='upper right', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_V_corr_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
mean_after = np.mean(calibration_mc_tof[mask_temporal])
print('The mean of tof/mc after voltage calibration is:', mean_after)
print('The difference between the mean of tof/mc before and after voltage calibration is:',
mean_after - mean_before)
if calibration_apply:
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
elif calibration_mode == 'mc':
variables.mc_calib = calibration_mc_tof
return f_v
def _resolve_sampling_mode(sampling_mode, variables=None):
"""Resolve sampling mode with a variable-level default fallback."""
configured = sampling_mode
if configured is None and variables is not None:
configured = getattr(variables, "bowl_sampling_mode", "polar")
mode = normalize_sampling_mode(configured)
if variables is not None and hasattr(variables, "bowl_sampling_mode"):
variables.bowl_sampling_mode = mode
return mode
def _cell_peak_value(values, maximum_location, sample_range_max, bin_size):
if sample_range_max == 'mean':
return float(np.mean(values)) / maximum_location
values = np.asarray(values, dtype=float)
if values.size == 0:
return float('nan')
value_span = float(np.max(values) - np.min(values))
if not np.isfinite(value_span) or value_span <= 0:
return float(np.mean(values)) / maximum_location
bins, n_bins = _build_histogram_bins(values, bin_size)
y_hist = fast_histogram.histogram1d(
values,
bins=n_bins,
range=(np.min(values), np.max(values)),
)
peaks, properties = find_peaks(y_hist, height=0)
if len(peaks) == 0:
return float(np.mean(values)) / maximum_location
index_peak_max_ini = np.argmax(properties['peak_heights'])
return float(bins[peaks[index_peak_max_ini]]) / maximum_location
def _iter_polar_cells(radial_distance, sample_size, det_diam):
r_max_data = float(np.max(radial_distance)) if len(radial_distance) else 0.0
r_max_detector = max(0.0, float(det_diam) / 2.0)
r_max = max(r_max_data, r_max_detector, float(sample_size))
n_rings = max(1, int(np.ceil(r_max / float(sample_size))))
ring_edges = np.linspace(0.0, r_max, n_rings + 1)
for r_idx in range(n_rings):
r_min_i = float(ring_edges[r_idx])
r_max_i = float(ring_edges[r_idx + 1])
r_mid = 0.5 * (r_min_i + r_max_i)
circumference = 2.0 * np.pi * max(r_mid, 0.5 * float(sample_size))
n_sectors = int(np.clip(np.ceil(circumference / float(sample_size)), 8, 96))
for theta_idx in range(n_sectors):
theta_min = float(theta_idx * 2.0 * np.pi / n_sectors)
theta_max = float((theta_idx + 1) * 2.0 * np.pi / n_sectors)
yield r_min_i, r_max_i, theta_min, theta_max
def _collect_spatial_samples(
dld_x,
dld_y,
dld_t,
maximum_location,
sample_range_max,
sample_size,
bin_size,
*,
sampling_mode,
det_diam,
dld_v=None,
):
x_samples = []
y_samples = []
t_samples = []
v_samples = []
if sampling_mode == 'polar':
radial_distance = np.hypot(dld_x, dld_y)
polar_angle = np.mod(np.arctan2(dld_y, dld_x), 2.0 * np.pi)
for r_min_i, r_max_i, theta_min, theta_max in _iter_polar_cells(radial_distance, sample_size, det_diam):
mask = (
(radial_distance >= r_min_i)
& (radial_distance < r_max_i)
& (polar_angle >= theta_min)
& (polar_angle < theta_max)
)
if not np.any(mask):
continue
cell_t = dld_t[mask]
normalized_value = _cell_peak_value(cell_t, maximum_location, sample_range_max, bin_size)
if not np.isfinite(normalized_value) or normalized_value <= 0:
continue
x_samples.append(float(np.median(dld_x[mask])))
y_samples.append(float(np.median(dld_y[mask])))
t_samples.append(normalized_value)
if dld_v is not None:
v_samples.append(float(np.mean(dld_v[mask])))
else:
cell_size = float(sample_size)
x_min = float(np.floor(np.min(dld_x)))
x_max = float(np.ceil(np.max(dld_x)))
y_min = float(np.floor(np.min(dld_y)))
y_max = float(np.ceil(np.max(dld_y)))
n_cols = max(1, int(np.ceil((x_max - x_min) / cell_size)))
n_rows = max(1, int(np.ceil((y_max - y_min) / cell_size)))
x_bin = np.floor((dld_x - x_min) / cell_size).astype(int)
y_bin = np.floor((dld_y - y_min) / cell_size).astype(int)
valid = (x_bin >= 0) & (x_bin < n_cols) & (y_bin >= 0) & (y_bin < n_rows)
if np.any(valid):
cell_id = y_bin[valid] * n_cols + x_bin[valid]
order = np.argsort(cell_id, kind='mergesort')
x_sorted = dld_x[valid][order]
y_sorted = dld_y[valid][order]
t_sorted = dld_t[valid][order]
v_sorted = dld_v[valid][order] if dld_v is not None else None
cell_sorted = cell_id[order]
_, starts, counts = np.unique(cell_sorted, return_index=True, return_counts=True)
for start, count in zip(starts, counts):
stop = start + count
cell_t = t_sorted[start:stop]
normalized_value = _cell_peak_value(cell_t, maximum_location, sample_range_max, bin_size)
if not np.isfinite(normalized_value) or normalized_value <= 0:
continue
x_samples.append(float(np.median(x_sorted[start:stop])))
y_samples.append(float(np.median(y_sorted[start:stop])))
t_samples.append(normalized_value)
if v_sorted is not None:
v_samples.append(float(np.mean(v_sorted[start:stop])))
result = {
'x': np.asarray(x_samples, dtype=float),
'y': np.asarray(y_samples, dtype=float),
't': np.asarray(t_samples, dtype=float),
}
if dld_v is not None:
result['v'] = np.asarray(v_samples, dtype=float)
return result
[docs]
def bowl_correction(dld_x_bowl, dld_y_bowl, dld_t_bowl, variables, det_diam, maximum_location, sample_range_max,
sample_size, calibration_mode, fit_mode, index_fig, plot, save, fig_size=(7, 5), bin_size=0.01,
sampling_mode='polar'):
"""
Perform bowl correction on the input data.
Args:
dld_x_bowl (numpy.ndarray): X coordinates of the data points.
dld_y_bowl (numpy.ndarray): Y coordinates of the data points.
dld_t_bowl (numpy.ndarray): Time values of the data points.
det_diam (float): Diameter of the detector.
maximum_location (float): Maximum location for normalization.
sample_range_max (str, optional): Sample range maximum ('mean' or 'histogram').
sample_size (int): Size of each rectangle in mm.
calibration_mode (str): Calibration mode ('tof' or 'mc').
fit_mode (str): Fit mode ('curve_fit' or 'hemisphere_fit').
index_fig (int): Index for figure naming.
plot (bool): Flag indicating whether to plot the surface.
save (bool): Flag indicating whether to save the plot.
fig_size (tuple): Size of the figure.
bin_size (float): Size of the bin.
Returns:
parameters (numpy.ndarray): Optimized parameters of the bowl correction.
"""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
ensure_choice(sample_range_max, field_name="sample_range_max", allowed=BOWL_SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_x_bowl = ensure_non_empty_array(dld_x_bowl, field_name="dld_x_bowl")
dld_y_bowl = ensure_non_empty_array(dld_y_bowl, field_name="dld_y_bowl")
dld_t_bowl = ensure_non_empty_array(dld_t_bowl, field_name="dld_t_bowl")
ensure_matching_lengths(
dld_x_bowl,
dld_y_bowl,
dld_t_bowl,
field_names=("dld_x_bowl", "dld_y_bowl", "dld_t_bowl"),
)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
samples = _collect_spatial_samples(
np.asarray(dld_x_bowl, dtype=float),
np.asarray(dld_y_bowl, dtype=float),
np.asarray(dld_t_bowl, dtype=float),
maximum_location,
sample_range_max,
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
)
x_samples = samples['x']
y_samples = samples['y']
t_samples = samples['t']
if len(x_samples) == 0:
raise CalibrationInputError('No detector windows contained ions for bowl correction')
print('x_sample_list max and min:', np.max(x_samples), np.min(x_samples))
print('y_sample_list max and min:', np.max(y_samples), np.min(y_samples))
print('dld_t_peak_list max and min:', np.max(t_samples), np.min(t_samples))
r2_samples = x_samples ** 2 + y_samples ** 2
radial_design = np.column_stack(
[
np.ones(len(x_samples), dtype=float),
r2_samples,
r2_samples ** 2,
x_samples,
y_samples,
]
)
if fit_mode == 'curve_fit':
if sampling_mode == 'polar':
radial_parameters, _ = curve_fit(
_radial_bowl_corr,
[x_samples, y_samples],
t_samples,
)
parameters = {
'model': 'radial_curve_fit',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters, _ = curve_fit(bowl_corr, [x_samples, y_samples], t_samples)
elif fit_mode == 'ml_fit':
parameters = hybrid_calibration_model(x_samples, y_samples, t_samples)
elif fit_mode == 'robust_fit':
if sampling_mode == 'polar':
radial_parameters = _robust_joint_linear_fit(radial_design, t_samples)
parameters = {
'model': 'radial_linear',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters = robust_fit(x_samples, y_samples, t_samples)
if plot or save:
if calibration_mode == 'tof':
label = 't'
elif calibration_mode == 'mc':
label = 'mc'
model_x_data = np.array(x_samples)
model_y_data = np.array(y_samples)
X, Y = np.meshgrid(model_x_data, model_y_data)
Z = _predict_bowl_model(fit_mode, parameters, X.ravel(), Y.ravel()).reshape(X.shape)
fig, ax = plt.subplots(figsize=fig_size, subplot_kw=dict(projection="3d"), constrained_layout=True)
box = ax.get_position()
ax.set_position([box.x0 + 0.1, box.y0 + 0.1, box.width * 0.75, box.height * 0.75])
scat = ax.scatter(model_x_data, model_y_data, zs=1 / np.array(t_samples), color="forestgreen",
label=r"$%s_{wp}$" % label, s=3)
fig.add_axes(ax)
cmap = copy(plt.cm.plasma)
cmap.set_bad(cmap(0))
x_flat = X.flatten()
y_flat = Y.flatten()
z_flat = Z.flatten()
triangles = Triangulation(x_flat, y_flat)
# surf = ax.plot_surface(X, Y, 1 / Z, color='red', alpha=0.05, label='bowl')
surf = ax.plot_trisurf(
x_flat, y_flat, 1 / z_flat, triangles=triangles.triangles, cmap=cmap, alpha=0.6
)
ax.set_xlabel(r'$X_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_ylabel(r'$Y_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_zlabel(r"${C_B}$", fontsize=10, labelpad=5, color='red')
ax.zaxis.line.set_color('red')
# Change z-axis tick label color
for tick in ax.get_zaxis().get_ticklabels():
tick.set_color('red')
ax.view_init(elev=7, azim=-41)
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig,
directory=variables.result_path,
stem=f"bowl_corr_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
print('The parameters of the bowl correction are:', parameters)
return parameters
[docs]
def bowl_correction_main(dld_x, dld_y, dld_highVoltage, variables, det_diam, sample_size, fit_mode, calibration_mode,
index_fig, plot, save, maximum_cal_method='mean', maximum_sample_method='mean',
fig_size=(5, 5), fast_calibration=False, bin_size=0.01, peak_maximum=0, calibration_apply=True,
sampling_mode='polar'):
"""
Perform bowl correction on the input data and plot the results.
Args:
dld_x (numpy.ndarray): X positions.
dld_y (numpy.ndarray): Y positions.
dld_highVoltage (numpy.ndarray): High voltage values.
det_diam (float): Detector diameter.
sample_size (int): Sample size.
fit_mode (str): Fit mode ('curve_fit', 'ml_fit', or 'robust_fit').
calibration_mode (str): Calibration mode ('tof' or 'mc').
index_fig (int): Index figure.
plot (bool): Flag indicating whether to plot the results.
save (bool): Flag indicating whether to save the plots.
maximum_cal_method (str, optional): Maximum calculation method ('mean' or 'histogram').
maximum_sample_method (str, optional): Sample range maximum ('mean' or 'histogram').
fig_size (tuple, optional): Figure size.
fast_calibration (bool, optional): Flag indicating whether to perform fast calibration.
bin_size (float, optional): Size of the bin.
Returns:
None
"""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
ensure_choice(maximum_cal_method, field_name="maximum_cal_method", allowed=SAMPLE_METHODS)
ensure_choice(maximum_sample_method, field_name="maximum_sample_method", allowed=BOWL_SAMPLE_METHODS)
bin_size = ensure_positive(bin_size, field_name="bin_size")
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
dld_x = np.asarray(dld_x) * 10 # convert cm to mm
dld_y = np.asarray(dld_y) * 10 # convert cm to mm
dld_highVoltage = np.asarray(dld_highVoltage)
all_calibration_values = variables.get_calibration_array(calibration_mode)
ensure_matching_lengths(
dld_x,
dld_y,
dld_highVoltage,
all_calibration_values,
field_names=("dld_x", "dld_y", "dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
left_edge, right_edge = variables.get_calibration_peak_range(calibration_mode)
print('The left and right side of the main peak is:', left_edge, right_edge)
mask_temporal, dld_peak = _extract_peak_mask_and_values(variables, calibration_mode)
print('The number of ions is:', len(dld_peak))
if peak_maximum == 0:
maximum_location = _resolve_peak_location(
dld_peak,
method=maximum_cal_method,
bin_size=bin_size,
fast_calibration=fast_calibration,
)
else:
maximum_location = float(peak_maximum)
print('The maximum/mean of peak is located at:', maximum_location)
dld_x_peak = dld_x[mask_temporal]
dld_y_peak = dld_y[mask_temporal]
dld_highVoltage_peak = dld_highVoltage[mask_temporal]
mean_before = np.mean(dld_peak)
print('The mean of tof before bowl calibration is:', mean_before)
parameters = bowl_correction(dld_x_peak, dld_y_peak, dld_peak, variables, det_diam, maximum_location,
maximum_sample_method, sample_size=sample_size, calibration_mode=calibration_mode,
fit_mode=fit_mode, index_fig=index_fig, plot=plot, save=save, fig_size=fig_size,
bin_size=bin_size, sampling_mode=sampling_mode)
print('The fit result is:', parameters)
mask_fv = np.ones_like(dld_x, dtype=bool)
f_bowl = _predict_bowl_model(fit_mode, parameters, dld_x[mask_fv], dld_y[mask_fv])
calibration_mc_tof = np.copy(variables.dld_t_calib) if calibration_mode == 'tof' else np.copy(variables.mc_calib)
print("Maximum value of f_bowl:", np.max(f_bowl))
print("Minimum value of f_bowl:", np.min(f_bowl))
calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] / f_bowl
# calibration_mc_tof[mask_fv] = calibration_mc_tof[mask_fv] * f_bowl
mean_after = np.mean(calibration_mc_tof[mask_temporal])
print('The mean of tof/mc after bowl calibration is:', mean_after)
print('The difference between the mean of tof before and after bowl calibration is:',
mean_after - mean_before)
if plot or save:
# Plot how bowl correct tof/mc vs high voltage
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
mask = np.random.randint(0, len(dld_highVoltage_peak), 10000)
x = plt.scatter(dld_highVoltage_peak[mask] / 1000, dld_peak[mask], color="blue", label=r"$t$", s=1)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel("Voltage (kV)", fontsize=10)
plt.grid(alpha=0.3, linestyle='-.', linewidth=0.4)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
y = plt.scatter(dld_highVoltage_peak[mask] / 1000, dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1)
plt.legend(handles=[x, y], loc='upper right', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
# Plot how bowl correction correct tof/mc vs dld_x position
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
x = plt.scatter(dld_x_peak[mask], dld_peak[mask], color="blue", label=r"$t$", s=1, alpha=0.5)
y = plt.scatter(dld_x_peak[mask], dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1, alpha=0.5)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel(r"$X_{det}$ (mm)", fontsize=10)
plt.grid(color='aqua', alpha=0.3, linestyle='-.', linewidth=0.4)
plt.legend(handles=[x, y], loc='upper right', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_p_x_det_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
# Plot how bowl correction correct tof/mc vs dld_x position
fig1, ax1 = plt.subplots(figsize=fig_size, constrained_layout=True)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
x = plt.scatter(dld_y_peak[mask], dld_peak[mask], color="blue", label=r"$t$", s=1, alpha=0.5)
y = plt.scatter(dld_y_peak[mask], dld_t_plot, color="red", label=r"$t_{C_{B}}$", s=1, alpha=0.5)
if calibration_mode == 'tof':
ax1.set_ylabel("Time of Flight (ns)", fontsize=10)
elif calibration_mode == 'mc':
ax1.set_ylabel("mc (Da)", fontsize=10)
ax1.set_xlabel(r"$y_{det}$ (mm)", fontsize=10)
plt.grid(color='aqua', alpha=0.3, linestyle='-.', linewidth=0.4)
plt.legend(handles=[x, y], loc='upper right', markerscale=5., prop={'size': 10})
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig1,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_p_y_det_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
fig, ax = plt.subplots(figsize=fig_size, subplot_kw=dict(projection="3d"), constrained_layout=True)
# Adjust the subplot parameters to make the plot smaller
box = ax.get_position()
ax.set_position([box.x0 + 0.1, box.y0 + 0.1, box.width * 0.75, box.height * 0.75])
mask = np.random.randint(0, len(dld_highVoltage_peak), 500)
f_bowl_plot = _predict_bowl_model(fit_mode, parameters, dld_x_peak[mask], dld_y_peak[mask])
dld_t_plot = dld_peak[mask] / f_bowl_plot
scat_1 = ax.scatter(dld_x_peak[mask], dld_y_peak[mask], zs=dld_peak[mask], color="blue",
label=r"$t$", s=1)
scat_2 = ax.scatter(dld_x_peak[mask], dld_y_peak[mask], zs=dld_t_plot, color="red",
label=r"$t_{C_{B}}$", s=1)
plt.legend(handles=[scat_1, scat_2], loc='upper left', markerscale=5., prop={'size': 10})
ax.set_xlabel(r'$X_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_ylabel(r'$Y_{det}$ (mm)', fontsize=10, labelpad=10)
ax.set_zlabel(r"Time of Flight (ns)", fontsize=10, labelpad=5)
ax.view_init(elev=7, azim=-41)
if save:
# Enable rendering for text elements
rcParams['svg.fonttype'] = 'none'
save_figure(
fig,
directory=variables.result_path,
stem=f"peak_tof_bowl_corr_3d_{index_fig}",
formats=("svg", "png"),
dpi=600,
)
if plot:
plt.show()
if calibration_apply:
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
elif calibration_mode == 'mc':
variables.mc_calib = calibration_mc_tof
return f_bowl
def _auto_detect_peaks(calibration_array, n_peaks=3, prominence=100, distance=500, hist_bin_size=0.1):
"""Auto-detect the top N prominent peaks in the calibration spectrum."""
arr = np.asarray(calibration_array, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size < 50:
raise CalibrationInputError("Not enough valid ions for multi-peak calibration")
mc_min = float(np.percentile(arr, 0.1))
mc_max = float(np.percentile(arr, 99.9))
trimmed = arr[(arr >= mc_min) & (arr <= mc_max)]
if trimmed.size < 50:
raise CalibrationInputError("Not enough ions remain after trimming for multi-peak calibration")
hist_edges, _ = _build_histogram_bins(trimmed, max(hist_bin_size, 1e-6))
hist_y = np.histogram(trimmed, bins=hist_edges)[0]
hist_x = (hist_edges[:-1] + hist_edges[1:]) / 2
if hist_y.size >= 5:
kernel_size = min(9, hist_y.size if hist_y.size % 2 == 1 else hist_y.size - 1)
kernel_size = max(3, kernel_size)
kernel = np.ones(kernel_size, dtype=float) / float(kernel_size)
hist_y_eval = np.convolve(hist_y.astype(float), kernel, mode='same')
else:
hist_y_eval = hist_y.astype(float)
peaks_found = np.array([], dtype=int)
prominence_trials = [
max(1.0, float(prominence)),
max(1.0, float(prominence) * 0.5),
max(1.0, float(prominence) * 0.25),
max(1.0, float(np.max(hist_y_eval)) * 0.05),
]
for prominence_value in prominence_trials:
peaks_found, _ = find_peaks(
hist_y_eval,
prominence=prominence_value,
distance=max(1, int(distance)),
height=0,
)
if len(peaks_found) > 0:
break
if len(peaks_found) == 0:
peaks_found, _ = find_peaks(
hist_y_eval,
prominence=max(1.0, float(np.max(hist_y_eval)) * 0.02),
distance=max(1, int(distance) // 2),
height=0,
)
if len(peaks_found) == 0:
raise CalibrationInputError("No peaks found for multi-peak calibration")
prom = peak_prominences(hist_y_eval, peaks_found)
pw = peak_widths(hist_y_eval, peaks_found, rel_height=0.5)
sorted_idx = np.argsort(prom[0])[::-1]
top_n = min(n_peaks, len(sorted_idx))
results = []
for pi in sorted_idx[:top_n]:
peak_pos = float(hist_x[peaks_found[pi]])
left = float(np.interp(pw[2][pi], np.arange(len(hist_x)), hist_x))
right = float(np.interp(pw[3][pi], np.arange(len(hist_x)), hist_x))
width = max(right - left, float(hist_bin_size) * 3)
margin = max(float(hist_bin_size) * 2, width * 0.25)
x1 = max(mc_min, left - margin)
x2 = min(mc_max, right + margin)
if x2 <= x1:
continue
candidate = {
'position': peak_pos,
'x1': x1,
'x2': x2,
'prominence': float(prom[0][pi]),
'width': float(width),
}
if any(not (candidate['x2'] <= existing['x1'] or existing['x2'] <= candidate['x1']) for existing in results):
continue
results.append(candidate)
if not results:
raise CalibrationInputError("Peak windows could not be resolved for multi-peak calibration")
return results
[docs]
def auto_detect_reference_peaks(calibration_array, n_peaks=6, prominence=100, distance=500, hist_bin_size=0.1):
"""Public helper for stable multi-peak evaluation windows used by auto calibration."""
return _auto_detect_peaks(
calibration_array,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
hist_bin_size=hist_bin_size,
)
[docs]
def multi_peak_voltage_corr_main(
dld_highVoltage,
variables,
calibration_mode='mc',
model='robust_fit',
bin_size=0.01,
n_peaks=3,
prominence=100,
distance=500,
):
"""Voltage correction using multiple auto-detected peaks simultaneously."""
model = normalize_voltage_model(model)
calib_arr = variables.get_calibration_array(calibration_mode)
dld_hv = np.asarray(dld_highVoltage, dtype=float)
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
)
all_v_means = []
all_t_normalized = []
peak_info = []
for pk in detected:
x1, x2, peak_pos = pk['x1'], pk['x2'], pk['position']
mask = (calib_arr > x1) & (calib_arr < x2)
peak_ions = calib_arr[mask]
voltage_ions = dld_hv[mask]
if len(peak_ions) < 100:
continue
maximum_location = _resolve_peak_location(peak_ions, 'histogram', bin_size)
if maximum_location == 0:
continue
sample_size = max(1, int(len(voltage_ions) / 100))
for i in range(0, len(voltage_ions), sample_size):
chunk_v = voltage_ions[i:i + sample_size]
chunk_t = peak_ions[i:i + sample_size]
if len(chunk_t) < 5:
continue
mean_voltage = float(np.mean(chunk_v))
normalized_value = float(np.mean(chunk_t)) / maximum_location
if not np.isfinite(mean_voltage) or not np.isfinite(normalized_value) or normalized_value <= 0:
continue
all_v_means.append(mean_voltage)
all_t_normalized.append(normalized_value)
peak_info.append({
'position': peak_pos,
'x1': x1,
'x2': x2,
'n_ions': int(np.sum(mask)),
})
if len(all_v_means) < 3:
raise CalibrationInputError(
"Not enough data points from multi-peak detection for voltage correction"
)
v_arr = np.asarray(all_v_means)
t_arr = np.asarray(all_t_normalized)
if model == 'robust_fit':
fitresult = robust_voltage_fit(v_arr, t_arr)
elif model == 'curve_fit':
fitresult, _ = curve_fit(voltage_corr, v_arr, t_arr)
predicted_fit = _predict_voltage_model(model, fitresult, v_arr)
if not np.all(np.isfinite(np.asarray(predicted_fit, dtype=float))):
raise CalibrationInputError("Voltage fit returned invalid parameters")
f_v = np.clip(
_predict_voltage_model(model, fitresult, dld_hv),
np.finfo(float).eps,
None,
)
correction_factor = np.sqrt(f_v) if calibration_mode == 'tof' else f_v
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
calibration_mc_tof = source / correction_factor
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
else:
variables.mc_calib = calibration_mc_tof
return fitresult, peak_info
[docs]
def multi_peak_bowl_corr_main(
dld_x,
dld_y,
dld_highVoltage,
variables,
det_diam,
calibration_mode='mc',
fit_mode='robust_fit',
sample_size=9,
bin_size=0.01,
n_peaks=3,
prominence=100,
distance=500,
sampling_mode='polar',
):
"""Bowl correction using multiple auto-detected peaks simultaneously."""
ensure_choice(fit_mode, field_name="fit_mode", allowed=BOWL_FIT_MODES)
calib_arr = variables.get_calibration_array(calibration_mode)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
dld_x_mm = np.asarray(dld_x) * 10
dld_y_mm = np.asarray(dld_y) * 10
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
)
all_x_samples = []
all_y_samples = []
all_t_normalized = []
n_used = 0
for pk in detected:
x1, x2 = pk['x1'], pk['x2']
mask = (calib_arr > x1) & (calib_arr < x2)
peak_t = calib_arr[mask]
peak_x = dld_x_mm[mask]
peak_y = dld_y_mm[mask]
if len(peak_t) < 100:
continue
maximum_location = _resolve_peak_location(peak_t, 'histogram', bin_size)
if maximum_location == 0:
continue
peak_samples = _collect_spatial_samples(
peak_x,
peak_y,
peak_t,
maximum_location,
'mean',
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
)
if len(peak_samples['x']) == 0:
continue
all_x_samples.extend(peak_samples['x'].tolist())
all_y_samples.extend(peak_samples['y'].tolist())
all_t_normalized.extend(peak_samples['t'].tolist())
n_used += 1
if len(all_x_samples) < 5:
raise CalibrationInputError(
"Not enough spatial data from multi-peak detection for bowl correction"
)
x_arr = np.array(all_x_samples)
y_arr = np.array(all_y_samples)
t_arr = np.array(all_t_normalized)
if fit_mode == 'curve_fit':
if sampling_mode == 'polar':
radial_parameters, _ = curve_fit(_radial_bowl_corr, [x_arr, y_arr], t_arr)
parameters = {
'model': 'radial_curve_fit',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters, _ = curve_fit(bowl_corr, [x_arr, y_arr], t_arr)
elif fit_mode == 'ml_fit':
parameters = hybrid_calibration_model(x_arr, y_arr, t_arr)
elif fit_mode == 'robust_fit':
if sampling_mode == 'polar':
r2_arr = x_arr ** 2 + y_arr ** 2
radial_design = np.column_stack([
np.ones(len(x_arr), dtype=float),
r2_arr,
r2_arr ** 2,
x_arr,
y_arr,
])
radial_parameters = _robust_joint_linear_fit(radial_design, t_arr)
parameters = {
'model': 'radial_linear',
'parameters': [float(value) for value in radial_parameters],
'sampling_mode': sampling_mode,
}
else:
parameters = robust_fit(x_arr, y_arr, t_arr)
if isinstance(parameters, dict):
is_finite = np.all(np.isfinite(np.asarray(parameters.get('parameters', []), dtype=float)))
elif fit_mode == 'robust_fit':
trial_prediction = _predict_bowl_model(fit_mode, parameters, x_arr, y_arr)
is_finite = np.all(np.isfinite(np.asarray(trial_prediction, dtype=float)))
else:
is_finite = np.all(np.isfinite(np.asarray(parameters, dtype=float)))
if not is_finite:
raise CalibrationInputError("Bowl fit returned invalid parameters")
f_bowl = np.clip(
_predict_bowl_model(fit_mode, parameters, dld_x_mm, dld_y_mm),
np.finfo(float).eps,
None,
)
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
calibration_mc_tof = source / f_bowl
if calibration_mode == 'tof':
variables.dld_t_calib = calibration_mc_tof
else:
variables.mc_calib = calibration_mc_tof
return parameters, n_used
def _joint_feature_matrix(voltage_values, x_values, y_values, voltage_center, voltage_scale, spatial_scale):
"""Build a smooth joint voltage-plus-detector feature matrix."""
v = (np.asarray(voltage_values, dtype=float) - float(voltage_center)) / float(voltage_scale)
x = np.asarray(x_values, dtype=float) / float(spatial_scale)
y = np.asarray(y_values, dtype=float) / float(spatial_scale)
r2 = x ** 2 + y ** 2
return np.column_stack([
np.ones(len(v), dtype=float),
v,
v ** 2,
x,
y,
x ** 2,
y ** 2,
x * y,
r2,
v * r2,
v * x,
v * y,
])
def _robust_joint_linear_fit(feature_matrix, target):
"""Fit a smooth joint correction surface while rejecting large residual outliers."""
x_data = np.asarray(feature_matrix, dtype=float)
y_data = np.asarray(target, dtype=float)
params, *_ = np.linalg.lstsq(x_data, y_data, rcond=None)
for _ in range(3):
residual = y_data - x_data @ params
median = float(np.median(residual))
mad = float(np.median(np.abs(residual - median)))
scale = max(1.4826 * mad, np.finfo(float).eps)
good = np.abs(residual - median) <= 3.5 * scale
if np.count_nonzero(good) < x_data.shape[1]:
break
params, *_ = np.linalg.lstsq(x_data[good], y_data[good], rcond=None)
return params
[docs]
def joint_voltage_bowl_corr_main(
dld_x,
dld_y,
dld_highVoltage,
variables,
det_diam,
calibration_mode='mc',
sample_size=9,
bin_size=0.01,
n_peaks=4,
prominence=100,
distance=500,
sampling_mode='polar',
):
"""Fit a smooth combined voltage-plus-detector correction surface from several peaks."""
ensure_choice(calibration_mode, field_name="calibration_mode", allowed=["tof", "mc"])
calib_arr = variables.get_calibration_array(calibration_mode)
sampling_mode = _resolve_sampling_mode(sampling_mode, variables)
dld_x_mm = np.asarray(dld_x, dtype=float) * 10
dld_y_mm = np.asarray(dld_y, dtype=float) * 10
dld_highVoltage = np.asarray(dld_highVoltage, dtype=float)
ensure_matching_lengths(
dld_x_mm,
dld_y_mm,
dld_highVoltage,
calib_arr,
field_names=("dld_x", "dld_y", "dld_highVoltage", f"{calibration_mode}_calibration_values"),
)
sample_size = int(ensure_positive(sample_size, field_name="sample_size"))
bin_size = ensure_positive(bin_size, field_name="bin_size")
detected = _auto_detect_peaks(
calib_arr,
n_peaks=n_peaks,
prominence=prominence,
distance=distance,
hist_bin_size=bin_size,
)
all_x_samples = []
all_y_samples = []
all_v_samples = []
all_t_normalized = []
peak_info = []
for peak in detected:
mask = (calib_arr > peak['x1']) & (calib_arr < peak['x2'])
peak_t = calib_arr[mask]
peak_x = dld_x_mm[mask]
peak_y = dld_y_mm[mask]
peak_v = dld_highVoltage[mask]
if len(peak_t) < 100:
continue
maximum_location = _resolve_peak_location(peak_t, 'histogram', bin_size)
if maximum_location <= 0 or not np.isfinite(maximum_location):
continue
peak_samples = _collect_spatial_samples(
peak_x,
peak_y,
peak_t,
maximum_location,
'mean',
sample_size,
bin_size,
sampling_mode=sampling_mode,
det_diam=det_diam,
dld_v=peak_v,
)
n_cells = len(peak_samples['x'])
if n_cells == 0:
continue
all_x_samples.extend(peak_samples['x'].tolist())
all_y_samples.extend(peak_samples['y'].tolist())
all_v_samples.extend(peak_samples['v'].tolist())
all_t_normalized.extend(peak_samples['t'].tolist())
if n_cells > 0:
peak_info.append({
'position': peak['position'],
'x1': peak['x1'],
'x2': peak['x2'],
'n_ions': int(np.sum(mask)),
'n_cells': int(n_cells),
})
if len(all_t_normalized) < 20:
raise CalibrationInputError(
"Not enough samples from multi-peak detection for joint voltage/bowl refinement"
)
v_arr = np.asarray(all_v_samples, dtype=float)
x_arr = np.asarray(all_x_samples, dtype=float)
y_arr = np.asarray(all_y_samples, dtype=float)
t_arr = np.asarray(all_t_normalized, dtype=float)
voltage_center = float(np.median(v_arr))
voltage_scale = max(float(np.std(v_arr)), np.finfo(float).eps)
spatial_scale = max(
float(np.nanmax(np.sqrt(x_arr ** 2 + y_arr ** 2))) if len(x_arr) else 0.0,
float(det_diam) / 2.0,
1.0,
)
feature_matrix = _joint_feature_matrix(v_arr, x_arr, y_arr, voltage_center, voltage_scale, spatial_scale)
parameters = _robust_joint_linear_fit(feature_matrix, t_arr)
if not np.all(np.isfinite(parameters)):
raise CalibrationInputError("Joint voltage/bowl fit returned invalid parameters")
all_features = _joint_feature_matrix(
dld_highVoltage,
dld_x_mm,
dld_y_mm,
voltage_center,
voltage_scale,
spatial_scale,
)
correction = np.clip(all_features @ parameters, np.finfo(float).eps, None)
source = variables.dld_t_calib if calibration_mode == 'tof' else variables.mc_calib
corrected = source / correction
if calibration_mode == 'tof':
variables.dld_t_calib = corrected
else:
variables.mc_calib = corrected
model = {
'parameters': [float(value) for value in parameters],
'feature_names': ['bias', 'v', 'v2', 'x', 'y', 'x2', 'y2', 'xy', 'r2', 'v_r2', 'v_x', 'v_y'],
'voltage_center': voltage_center,
'voltage_scale': voltage_scale,
'spatial_scale': spatial_scale,
'peaks_used': len(peak_info),
}
return model, peak_info