Source code for pyccapt.calibration.core.tools

import matplotlib.pyplot as plt
import numpy as np
import pybaselines
import re
from adjustText import adjust_text

from pybaselines import Baseline
from scipy.optimize import curve_fit
from scipy.signal import find_peaks, peak_widths

from pyccapt.calibration.core import interactive_point_identification
from pyccapt.calibration.core.background import fit_background
from pyccapt.calibration.core.exceptions import CalibrationInputError
from pyccapt.calibration.core.validation import ensure_positive
from pyccapt.calibration.data_tools import data_loadcrop, plot_vline_draw, selectors_data
from pyccapt.calibration.path_utils import save_figure

def _normalize_range_colors(values):
    """Normalize stored range colors for matplotlib usage."""
    normalized = []
    for value in values:
        value = str(value).strip()
        if value and not value.startswith('#') and re.fullmatch(r'[A-Fa-f0-9]{6}', value):
            value = f'#{value}'
        normalized.append(value)
    return normalized

def _plain_range_label(value):
    """Convert stored ion/range labels into plain text safe for matplotlib."""
    text = str(value).strip()
    if not text:
        return text
    text = text.replace("$", "")
    text = re.sub(r"_\{([^}]*)\}", r"\1", text)
    text = re.sub(r"\^\{([^}]*)\}", r" \1", text)
    text = text.replace("{", "").replace("}", "")
    text = text.replace("^", "").strip()
    return text

def _resolve_range_display_labels(range_data):
    """Return plain-text labels for ranged overlays and legends."""
    for column in ("name", "ion_name", "ion"):
        if column in range_data.columns:
            labels = [_plain_range_label(value) for value in range_data[column].tolist()]
            if any(label for label in labels):
                return labels
    return [_plain_range_label(value) for value in range(len(range_data))]

[docs] def hist_plot( mc_tof, variables, bin, label, range_data=None, adjust_label=False, ranging=False, hist_color_range=False, log=True, mode='count', percent=50, peaks_find=True, peaks_find_plot=False, plot=False, prominence=50, distance=None, h_line=False, selector='None', fast_hist=True, fig_name=None, text_loc='right', fig_size=(9, 5), background={'calculation': False}, ): """ Generate a histogram plot with optional peak_x finding and background calculation. Args: mc_tof (array-like): Input array of time-of-flight values. bin (float): Bin width for the histogram. label (str): Label type ('mc' or 'tof'). range_data (optional, array-like): Range data. adjust_label (bool): Flag to adjust overlapping peak_x labels. ranging (bool): Flag to enable ranging. hist_color_range (bool): Flag to enable histogram color ranging. log (bool): Flag to enable logarithmic y-axis scale. mode (str): Mode for histogram calculation ('count' or 'normalised'). percent (int): Percentage value for peak_x width calculation. peaks_find (bool): Flag to enable peak_x finding. peaks_find_plot (bool): Flag to plot peak_x finding results. plot (bool): Flag to enable plotting. prominence (float): Minimum prominence value for peak_x finding. distance (optional, float): Minimum horizontal distance between peaks for peak_x finding. h_line (bool): Flag to draw horizontal lines for peak_x width. selector (str): Selector mode for interactive selection ('None', 'rect', or 'peak_x'). fast_hist (bool): Flag to enable fast histogram calculation. fig_name (optional, str): Name of the figure file to save. text_loc (str): Location of the text annotation ('left' or 'right'). fig_size (tuple): Size of the figure. background (dict): Background calculation options. Returns: tuple: Tuple containing x_peaks, y_peaks, peaks_widths, and mask. Raises: ValueError: If an invalid mode or selector is provided. """ mc_tof = np.asarray(mc_tof) if mc_tof.size == 0: raise CalibrationInputError("mc_tof cannot be empty") bin = ensure_positive(bin, field_name="bin") bins = np.linspace(np.min(mc_tof), np.max(mc_tof), round(np.max(mc_tof) / bin)) if fast_hist: steps = 'stepfilled' else: steps = 'bar' if mode == 'count': y, x = np.histogram(mc_tof, bins=bins) # y = np.log(y) elif mode == 'normalised': # calculate as counts/(Da * totalCts) so that mass spectra with different # count numbers are comparable mc_tof = (mc_tof / bin) / len(mc_tof) # y, x = np.histogram(mc_tof, bins=bins) y, x = np.histogram(mc_tof, bins=bins) # y = np.log(y) # med = median(y); else: raise CalibrationInputError("Unsupported histogram mode: %r" % mode) try: if peaks_find: peaks, properties = find_peaks(y, prominence=prominence, distance=distance, height=0) index_peak_max = np.argmax(properties['peak_heights']) # find peak_x width peak_widths_p = peak_widths(y, peaks, rel_height=(percent / 100), prominence_data=None) except ValueError: print('Peak finding failed.') peaks_find = False if plot: fig1, ax1 = plt.subplots(figsize=fig_size) if ranging and hist_color_range: colors = _normalize_range_colors(range_data['color'].tolist()) mc_low = range_data['mc_low'].tolist() mc_up = range_data['mc_up'].tolist() labels = _resolve_range_display_labels(range_data) mask_all = np.full(len(mc_tof), False) # Loop runs ``len(labels) + 1`` times: one pass per ranged # species, then a final pass that draws the leftover (unranged) # events as ``mc_tof[~mask_all]``. The previous code compared # ``i`` to a non-existent ``len(ion)`` and recomputed # ``mask_all`` from a stale ``mask`` — both bugs (the former # raised NameError; the latter was a no-op since every per-label # iteration already ORed its mask into ``mask_all``). for i in range(len(labels) + 1): if i < len(labels): mask = np.logical_and((mc_tof < mc_up[i]), mc_tof > mc_low[i]) mask_all = np.logical_or(mask_all, mask) name_element = 'unranged' if labels[i] == 'unranged' else labels[i] y, x, _ = plt.hist( mc_tof[mask], bins=bins, log=log, histtype=steps, color=colors[i], label=name_element, ) else: # Final iteration: i == len(labels). Plot the events not # captured by any range as a slate-gray "unranged" baseline. y, x, _ = plt.hist( mc_tof[~mask_all], bins=bins, log=log, histtype=steps, color='slategray', ) else: y, x, _ = plt.hist(mc_tof, bins=bins, log=log, histtype=steps, color='slategray') # calculate the background if background['calculation']: if background['mode'] == 'aspls': baseline_fitter = Baseline(x_data=bins[:-1]) fit_1, params_1 = baseline_fitter.aspls(y, lam=5e10, tol=1e-1, max_iter=100) if background['mode'] == 'fabc': fit_2, params_2 = pybaselines.classification.fabc( y, lam=background['lam'], num_std=background['num_std'], pad_kwargs='edges' ) if background['mode'] == 'dietrich': fit_2, params_2 = pybaselines.classification.dietrich(y, num_std=background['num_std']) if background['mode'] == 'cwt_br': fit_2, params_2 = pybaselines.classification.cwt_br( y, poly_order=background['poly_order'], num_std=background['num_std'], tol=background['tol'] ) if background['mode'] == 'selective_mask_t': p = np.poly1d(np.polyfit(background['non_mask'][:, 0], background['non_mask'][:, 1], 5)) baseline_handle = ax1.plot(x, p(x), '--') if background['mode'] == 'selective_mask_mc': fitresult, _ = curve_fit(fit_background, background['non_mask'][:, 0], background['non_mask'][:, 1]) yy = fit_background(x, *fitresult) ax1.plot(x, yy, '--') if background['plot_no_back']: mask_2 = params_2['mask'] mask_f = np.full((len(mc_tof)), False) for i in range(len(mask_2)): if mask_2[i]: step_loc = np.min(mc_tof) + bin * i mask_t = np.logical_and((mc_tof < step_loc + bin), (mc_tof > step_loc)) mask_f = np.logical_or(mask_f, mask_t) background_ppm = (len(mask_f[mask_f == True]) * 1e6 / len(mask_f)) / np.max(mc_tof) if background['plot_no_back']: if background['plot']: ax1.plot(bins[:-1], fit_2, label='class', color='r') ax3 = ax1.twiny() ax3.axis("off") ax3.plot(fit_1, label='aspls', color='black') mask_2 = params_2['mask'] if background['patch']: ax1.plot(bins[:-1][mask_2], y[mask_2], 'o', color='orange')[0] if peaks_find: ax1.set_ylabel("Event Counts", fontsize=14) if label == 'mc': ax1.set_xlabel("Mass/Charge [Da]", fontsize=14) elif label == 'tof': ax1.set_xlabel("Time of Flight [ns]", fontsize=14) print("The peak_x index for MRP calculation is:", index_peak_max) if label == 'mc': mrp = '{:.2f}'.format( x[peaks[index_peak_max]] / (x[int(peak_widths_p[3][index_peak_max])] - x[int(peak_widths_p[2][index_peak_max])]) ) if background['calculation'] and background['plot_no_back']: txt = 'bin width: %s Da\nnum atoms: %.2f$e^6$\nbackG: %s ppm/Da\nMRP(FWHM): %s' % ( bin, len(mc_tof) / 1000000, int(background_ppm), mrp, ) else: # annotation with range stats upperLim = 4.5 # Da lowerLim = 3.5 # Da mask = np.logical_and((x >= lowerLim), (x <= upperLim)) BG4 = np.sum(y[np.array(mask[:-1])]) / (upperLim - lowerLim) BG4 = BG4 / len(mc_tof) * 1e6 txt = 'bin width: %s Da\nnum atoms: %.2f$e^6$\nBG@4: %s ppm/Da\nMRP(FWHM): %s' % ( bin, (len(mc_tof) / 1000000), int(BG4), mrp, ) elif label == 'tof': mrp = '{:.2f}'.format( x[peaks[index_peak_max]] / (x[int(peak_widths_p[3][index_peak_max])] - x[int(peak_widths_p[2][index_peak_max])]) ) if background['calculation'] and background['plot_no_back']: txt = 'bin width: %s ns\nnum atoms: %.2f$e^6$\nbackG: %s ppm/ns\nMRP(FWHM): %s' % ( bin, len(mc_tof) / 1000000, int(background_ppm), mrp, ) else: # annotation with range stats upperLim = 50.5 # ns lowerLim = 49.5 # ns mask = np.logical_and((x >= lowerLim), (x <= upperLim)) BG50 = np.sum(y[np.array(mask[:-1])]) / (upperLim - lowerLim) BG50 = BG50 / len(mc_tof) * 1e6 txt = 'bin width: %s ns\nnum atoms: %.2f$e^6$ \nBG@50: %s ppm/ns\nMRP(FWHM): %s' % ( bin, len(mc_tof) / 1000000, int(BG50), mrp, ) props = dict(boxstyle='round', facecolor='wheat', alpha=1) if text_loc == 'left': ax1.text( 0.01, 0.95, txt, va='top', ma='left', transform=ax1.transAxes, bbox=props, fontsize=10, alpha=1, horizontalalignment='left', verticalalignment='top', ) elif text_loc == 'right': ax1.text( 0.98, 0.95, txt, va='top', ma='left', transform=ax1.transAxes, bbox=props, fontsize=10, alpha=1, horizontalalignment='right', verticalalignment='top', ) ax1.tick_params(axis='both', which='major', labelsize=10) ax1.tick_params(axis='both', which='minor', labelsize=10) annotes = [] texts = [] if peaks_find_plot: if ranging: ion = range_data['ion'].tolist() x_peak_loc = range_data['mc'].tolist() y_peak_loc = range_data['peak_count'].tolist() for i in range(len(ion)): texts.append(plt.text(x_peak_loc[i], y_peak_loc[i], r'%s' % ion[i], color='black', size=10, alpha=1)) annotes.append(str(i + 1)) else: for i in range(len(peaks)): if selector == 'range': if i in variables.peaks_x_selected: texts.append( plt.text( x[peaks][i], y[peaks][i], '%s' % '{:.2f}'.format(x[peaks][i]), color='black', size=10, alpha=1, ) ) else: texts.append( plt.text( x[peaks][i], y[peaks][i], '%s' % '{:.2f}'.format(x[peaks][i]), color='black', size=10, alpha=1, ) ) if h_line: for i in range(len(variables.h_line_pos)): if ( variables.h_line_pos[i] < np.max(mc_tof) + 10 and variables.h_line_pos[i] > np.max(mc_tof) - 10 ): plt.axvline(x=variables.h_line_pos[i], color='b', linestyle='--', linewidth=2) annotes.append(str(i + 1)) if adjust_label: adjust_text(texts, arrowprops=dict(arrowstyle='-', color='red', lw=0.5)) if selector == 'rect': # Connect and initialize rectangle box selector data_loadcrop.rectangle_box_selector(ax1, variables) plt.connect('key_press_event', selectors_data.toggle_selector(variables)) elif selector == 'peak_x': # connect peak_x selector af = interactive_point_identification.AnnoteFinder(x[peaks], y[peaks], annotes, variables, ax=ax1) fig1.canvas.mpl_connect('button_press_event', lambda event: af.annotates_plotter(event)) zoom_manager = plot_vline_draw.HorizontalZoom(ax1, fig1) fig1.canvas.mpl_connect('key_press_event', lambda event: zoom_manager.on_key_press(event)) fig1.canvas.mpl_connect('key_release_event', lambda event: zoom_manager.on_key_release(event)) fig1.canvas.mpl_connect('scroll_event', lambda event: zoom_manager.on_scroll(event)) elif selector == 'range': # connect range selector line_manager = plot_vline_draw.VerticalLineManager(variables, ax1, fig1, [], []) fig1.canvas.mpl_connect('button_press_event', lambda event: line_manager.on_press(event)) fig1.canvas.mpl_connect('button_release_event', lambda event: line_manager.on_release(event)) fig1.canvas.mpl_connect('motion_notify_event', lambda event: line_manager.on_motion(event)) fig1.canvas.mpl_connect('key_press_event', lambda event: line_manager.on_key_press(event)) fig1.canvas.mpl_connect('scroll_event', lambda event: line_manager.on_scroll(event)) fig1.canvas.mpl_connect('key_release_event', lambda event: line_manager.on_key_release(event)) else: if selector == 'range': # connect range selector line_manager = plot_vline_draw.VerticalLineManager(variables, ax1, fig1, [], []) texts = [] for i in range(len(variables.peak_x)): if i in variables.peaks_x_selected: texts.append( plt.text( variables.peak_x[i], variables.peak_y[i], '%s' % '{:.2f}'.format(variables.peak_x[i]), color='black', size=10, alpha=1, ) ) plt.tight_layout() if fig_name is not None: if label == 'mc': save_figure(fig1, directory=variables.result_path, stem=f"mc_{fig_name}", formats=("pdf", "png"), dpi=300) elif label == 'tof': save_figure( fig1, directory=variables.result_path, stem=f"tof_{fig_name}", formats=("pdf", "png"), dpi=300, ) if ranging and hist_color_range: plt.legend(loc='center right') plt.show() if peaks_find: peak_widths_f = [] for i in range(len(peaks)): peak_widths_f.append([y[int(peak_widths_p[2][i])], x[int(peak_widths_p[2][i])], x[int(peak_widths_p[3][i])]]) if background['calculation'] and background['plot_no_back']: x_peaks = x[peaks] y_peaks = y[peaks] peaks_widths = peak_widths_f mask = mask_f else: x_peaks = x[peaks] y_peaks = y[peaks] peaks_widths = peak_widths_f mask = None index_max_ini = np.argmax(y_peaks) variables.max_peak = x_peaks[index_max_ini] variables.peak_x = x_peaks variables.peak_y = y_peaks else: x_peaks = None y_peaks = None peaks_widths = None mask = None return x_peaks, y_peaks, peaks_widths, mask
[docs] def mc_hist_plot(variables, bin_size, mode, prominence, distance, percent, selector, plot, figname, lim, peaks_find_plot): """ Plot the mass spectrum or tof spectrum. It is helper function for tutorials. Args: variables (object): Variables object. bin_size (float): Bin size for the histogram. mode (str): 'mc' for mass spectrum or 'tof' for tof spectrum. prominence (float): Prominence for the peak_x finding. distance (float): Distance for the peak_x finding. percent (float): Percent for the peak_x finding. selector (str): Selector for the peak_x finding. plot (bool): Plot the histogram. figname (str): Figure name. lim (float): Limit for the histogram. peaks_find_plot (bool): Plot the peaks. Returns: None """ mode_map = { 'mc': (variables.mc_calib, 'mc'), 'mc_c': (variables.mc_uc, 'mc'), 'tof': (variables.dld_t_calib, 'tof'), 'tof_c': (variables.dld_t_c, 'tof'), } if mode not in mode_map: raise CalibrationInputError(f"Unsupported mode: {mode!r}") hist, label = mode_map[mode] if selector == 'peak_x': variables.peaks_x_selected = [] peaks_ini, peaks_y_ini, peak_widths_p_ini, _ = hist_plot( hist[hist < lim], variables, bin_size, label=label, distance=distance, percent=percent, prominence=prominence, selector=selector, plot=plot, fig_name=figname, peaks_find_plot=peaks_find_plot, ) if peaks_ini is not None: index_max_ini = np.argmax(peaks_y_ini) mrp = peaks_ini[index_max_ini] / (peak_widths_p_ini[index_max_ini][2] - peak_widths_p_ini[index_max_ini][1]) print('Mass resolving power for the highest peak_x at peak_x index %a (MRP --> m/m_2-m_1):' % index_max_ini, mrp) for i in range(len(peaks_ini)): print( 'Peaks ', i, 'is at location and height: ({:.2f}, {:.2f})'.format(peaks_ini[i], peaks_y_ini[i]), 'peak_x window sides ({:.1f}-maximum) are: ({:.2f}, {:.2f})'.format( percent, peak_widths_p_ini[i][1], peak_widths_p_ini[i][2] ), '-> {:.2f}'.format(peak_widths_p_ini[i][2] - peak_widths_p_ini[i][1]), )