Source code for pyccapt.calibration.reconstructions.sdm

from copy import copy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib import colors
from matplotlib import cm
from scipy.signal import find_peaks

from pyccapt.calibration.data_tools.merge_range import merge_by_range
from pyccapt.calibration.reconstructions.io_utils import save_matplotlib_figure

[docs] def sdm( particles, bin_size, variables=None, roi=[0, 0, 0.5], z_cut=True, normalize=False, plot_mode='bar', plot=False, save=False, figure_size=(6, 6), figname='sdm', histogram_type='1d', axes=None, i_composition=None, j_composition=None, plot_roi=False, theta_x=0, phi_y=0, log=False, frac=1.0, range_sequence=[], range_mc=[], range_detx=[], range_dety=[], range_x=[], range_y=[], range_z=[], range_vol=[], ): """ Computes 1D or 2D histograms for a set of particle coordinates. Parameters ---------- particles : (N, 3) np.array Set of particle coordinates for which to compute the SDM. bin_size : float Bin size for each histogram. variables : variables object normalize : bool, optional Option to normalize the histograms. If True, the histogram values are normalized. roi : list, optional Region of interest for the SDM. Default is [0, 0, 1]. z_cut : bool, optional Cut the z distances over 1 nm plot_mode : str, optional The plot mode for the histograms. Options are 'bar' or 'line'. plot : bool, optional Option to plot the histograms. If True, the histograms are plotted. save : bool, optional Option to save the histograms. If True, the histograms are saved. figure_size : (float, float), optional The size of the figure in inches. figname : str, optional The name of the figure. histogram_type : str, optional Type of histogram. Options are '1D' or '2D' or '3D'. axes : list or None, optional Specifies the axes for 1D or 2D histograms. For '1d', provide a list like ['x'], ['y'], or ['z']. For '2d', provide a list like ['x', 'y'], ['y', 'z'], or ['x', 'z'] or ['x', 'y', 'z']. i_composition : list, optional Composition of the first element in the SDM. j_composition : list, optional Composition of the second element in the SDM. plot_roi : bool, optional Option to plot the region of interest. If True, the region of interest is plotted. theta_x : float, optional Rotation angle around the x-axis. phi_y : float, optional Rotation angle around the y-axis. log : bool, optional Option to plot the SDM in log scale. If True, the SDM is plotted in log scale. frac : float, optional Fraction of the second element in the SDM. range_sequence : list, optional Sequence range for the SDM. range_mc : list, optional Mass-to-charge range for the SDM. range_detx : list, optional Detector x-coordinate range for the SDM. range_dety : list, optional Detector y-coordinate range for the SDM. range_x : list, optional X-coordinate range for the SDM. range_y : list, optional Y-coordinate range for the SDM. range_z : list, optional Z-coordinate range for the SDM. range_vol : list, optional Volume range for the SDM. Returns ------- histograms : list of np.array List of 1D or 2D histograms based on user preferences. edges : list of np.array Bin edges for each histogram. """ if range_sequence or range_mc or range_detx or range_dety or range_x or range_y or range_z or range_vol: if range_sequence: if range_sequence: mask_sequence = np.zeros(len(particles), dtype=bool) if range_sequence[0] < 1 and range_sequence[1] < 1: mask_sequence[int(len(particles) * range_sequence[0]) : int(len(particles) * range_sequence[1])] = True else: mask_sequence[range_sequence[0] : range_sequence[1]] = True else: mask_sequence = np.zeros(len(particles), dtype=bool) mask_sequence[: int(len(particles) * range_sequence)] = True else: mask_sequence = np.ones(len(particles), dtype=bool) if range_detx and range_dety: mask_det_x = (variables.dld_x_det < range_detx[1]) & (variables.dld_x_det > range_detx[0]) mask_det_y = (variables.dld_y_det < range_dety[1]) & (variables.dld_y_det > range_dety[0]) mask_det = mask_det_x & mask_det_y else: mask_det = np.ones(len(particles), dtype=bool) if range_mc: mask_mc = (variables.mc <= range_mc[1]) & (variables.mc >= range_mc[0]) else: mask_mc = np.ones(len(particles), dtype=bool) if range_x and range_y and range_z: mask_x = (variables.x < range_x[1]) & (variables.x > range_x[0]) mask_y = (variables.y < range_y[1]) & (variables.y > range_y[0]) mask_z = (variables.z < range_z[1]) & (variables.z > range_z[0]) mask_3d = mask_x & mask_y & mask_z else: mask_3d = np.ones(len(particles), dtype=bool) if range_vol: mask_vol = (variables.dld_high_voltage < range_vol[1]) & (variables.dld_high_voltage > range_vol[0]) else: mask_vol = np.ones(len(particles), dtype=bool) mask = mask_sequence & mask_det & mask_mc & mask_3d & mask_vol if variables is not None: variables.mask = mask print('The number of data sequence:', len(mask_sequence[mask_sequence == True])) print('The number of data mc:', len(mask_mc[mask_mc == True])) print('The number of data det:', len(mask_det[mask_det == True])) print('The number of data 3d:', len(mask_3d[mask_3d == True])) print('The number of data vol:', len(mask_vol[mask_vol == True])) else: mask = np.ones(len(particles), dtype=bool) len_true_mask = len(mask[mask == True]) if frac < 1: # set axis limits based on fraction of x and y data based on fraction true_indices = np.where(mask)[0] num_set_to_flase = int(len(true_indices) * (1 - frac)) indices_to_set_false = np.random.choice(true_indices, num_set_to_flase, replace=False) mask[indices_to_set_false] = False print('The number of data cropping due to fraction:', len_true_mask - len(mask[mask == True])) if variables is not None: dist_temp = np.sqrt((variables.dld_x_det - roi[0]) ** 2 + (variables.dld_y_det - roi[1]) ** 2) mask_roi = dist_temp <= roi[2] else: print('Variables is None. ROI selection needs detector coordinates. Only use radius') dist_temp = np.sqrt((particles[:, 0]) ** 2 + (particles[:, 1]) ** 2) mask_roi = dist_temp <= roi[2] print('The number of data ROI:', len(mask_roi[mask_roi == True])) mask = mask & mask_roi print('The number of data after cropping:', len(mask[mask == True])) if variables is not None and i_composition and j_composition: if i_composition and isinstance(i_composition, list): if 'name' in variables.data.columns: data = variables.data else: if variables.range_data is None: raise ValueError('Range data is not provided') data = merge_by_range(variables.data, variables.range_data, full=True) mask_i_comp = np.zeros(len(particles), dtype=bool) # Create a mask from the composition list of variables.data for comp in i_composition: mask_i_comp = mask_i_comp | data['name'].apply(lambda x: comp in str(x) if not pd.isna(x) else False) else: raise ValueError('No list of i composition is provided') if j_composition and isinstance(j_composition, list): if 'name' in variables.data.columns: pass else: if variables.range_data is None: raise ValueError('Range data is not provided') data = merge_by_range(variables.data, variables.range_data, full=True) mask_j_comp = np.zeros(len(particles), dtype=bool) # Create a mask from the composition list of variables.data for comp in j_composition: mask_j_comp = mask_j_comp | data['name'].apply(lambda x: comp in str(x) if not pd.isna(x) else False) else: raise ValueError('No list of j composition is provided') if variables is not None: if i_composition and isinstance(i_composition, list) and j_composition and isinstance(j_composition, list): mask_i = mask & mask_i_comp mask_j = mask & mask_j_comp else: mask_i = mask mask_j = mask else: mask_i = mask mask_j = mask particles_backup = particles.copy() theta = np.radians(theta_x) phi = np.radians(phi_y) print('The number of ions in is:', len(particles[mask])) print('The number of ions in i composition is:', len(particles[mask_i])) print('The number of ions in j composition is:', len(particles[mask_j])) # Calculate relative positions based on user choices dx, dy, dz = None, None, None histograms = [] if theta_x == 0 and phi_y == 0: if 'x' in axes and 'y' in axes and 'z' in axes: dx = np.subtract.outer(particles[:, 0][mask_i], particles[:, 0][mask_j]) dy = np.subtract.outer(particles[:, 1][mask_i], particles[:, 1][mask_j]) dz = np.subtract.outer(particles[:, 2][mask_i], particles[:, 2][mask_j]) elif 'x' in axes and 'y' in axes: dx = np.subtract.outer(particles[:, 0][mask_i], particles[:, 0][mask_j]) dy = np.subtract.outer(particles[:, 1][mask_i], particles[:, 1][mask_j]) elif 'x' in axes and 'z' in axes: dx = np.subtract.outer(particles[:, 0][mask_i], particles[:, 0][mask_j]) dz = np.subtract.outer(particles[:, 2][mask_i], particles[:, 2][mask_j]) elif 'y' in axes and 'z' in axes: dy = np.subtract.outer(particles[:, 1][mask_i], particles[:, 1][mask_j]) dz = np.subtract.outer(particles[:, 2][mask_i], particles[:, 2][mask_j]) elif 'x' in axes: dx = np.subtract.outer(particles[:, 0][mask_i], particles[:, 0][mask_j]) elif 'y' in axes: dy = np.subtract.outer(particles[:, 1][mask_i], particles[:, 1][mask_j]) elif 'z' in axes: dz = np.subtract.outer(particles[:, 2][mask_i], particles[:, 2][mask_j]) else: particles_i_masked = particles[mask_i] particles_j_masked = particles[mask_j] shift = np.empty((len(particles_i_masked), len(particles_j_masked), 3), dtype=np.result_type(particles, particles)) for i in range(len(particles_i_masked)): delta = particles_i_masked[i, :] - particles[mask_j] shift[i, :, 0] = ( np.cos(theta) * delta[:, 0] + np.sin(theta) * np.sin(phi) * delta[:, 1] + np.sin(theta) * np.cos(phi) * delta[:, 2] ) shift[i, :, 1] = np.cos(phi) * delta[:, 1] - np.sin(phi) * delta[:, 2] shift[i, :, 2] = ( -np.sin(theta) * delta[:, 0] + np.cos(theta) * np.sin(phi) * delta[:, 1] + np.cos(theta) * np.cos(phi) * delta[:, 2] ) if 'x' in axes: dx = shift[:, :, 0] if 'y' in axes: dy = shift[:, :, 1] if 'z' in axes: dz = shift[:, :, 2] edges_list = [] def _symmetric_edges(*arrays): finite_arrays = [np.asarray(arr, dtype=float).ravel() for arr in arrays if arr is not None] finite_arrays = [arr[np.isfinite(arr)] for arr in finite_arrays if arr.size > 0] if not finite_arrays: raise ValueError('No valid SDM distances are available to build histogram edges') max_abs = max(float(np.max(np.abs(arr))) for arr in finite_arrays) max_abs = max(max_abs, float(bin_size)) return np.arange(-max_abs, max_abs + bin_size, bin_size) if 'x' in axes and 'y' in axes and 'z' in axes: dx = dx.flatten() dy = dy.flatten() dz = dz.flatten() mask = ((dx <= 1) & (dx >= -1)) & ((dy <= 1) & (dy >= -1)) if z_cut: mask = mask & ((dz <= 1) & (dz >= -1)) dx = dx[mask] dy = dy[mask] dz = dz[mask] edges = _symmetric_edges(dx, dy, dz) elif 'x' in axes and 'y' in axes: mask = ((dx <= 1) & (dx >= -1)) & ((dy <= 1) & (dy >= -1)) dx = dx[mask] dy = dy[mask] edges = _symmetric_edges(dx, dy) elif 'y' in axes and 'z' in axes: dy = dy.flatten() dz = dz.flatten() mask = (dy <= 1) & (dy >= -1) if z_cut: mask = mask & ((dz <= 1) & (dz >= -1)) dz = dz[mask] dy = dy[mask] edges = _symmetric_edges(dy, dz) elif 'x' in axes and 'z' in axes: dx = dx.flatten() dz = dz.flatten() mask = (dx <= 1) & (dx >= -1) if z_cut: mask = mask & ((dz <= 1) & (dz >= -1)) dx = dx[mask] dz = dz[mask] edges = _symmetric_edges(dx, dz) elif 'x' in axes: dx = dx.flatten() mask = (dx <= 1) & (dx >= -1) dx = dx[mask] edges = _symmetric_edges(dx) elif 'y' in axes: dy = dy.flatten() mask = (dy <= 1) & (dy >= -1) dy = dy[mask] edges = _symmetric_edges(dy) elif 'z' in axes: dz = dz.flatten() if z_cut: mask = (dz <= 1) & (dz >= -1) dz = dz[mask] edges = _symmetric_edges(dz) if histogram_type == '1D': if 'x' in axes: dx = dx[dx != 0] histo_dx, bins_dx = np.histogram(dx, bins=edges) histograms.append(histo_dx) edges_list.append(bins_dx) elif 'y' in axes: dy = dy[dy != 0] histo_dy, bins_dy = np.histogram(dy, bins=edges) histograms.append(histo_dy) edges_list.append(bins_dy) elif 'z' in axes: dz = dz[dz != 0] histo_dz, bins_dz = np.histogram(dz, bins=edges) histograms.append(histo_dz) edges_list.append(bins_dz) else: raise ValueError("Invalid axes for 1D histogram. Choose from ['x'], ['y'], or ['z'].") if normalize: histograms[-1] = histograms[-1] / np.max(histograms[-1]) if histogram_type == '2D': if 'x' in axes and 'y' in axes: mask = ~((dx == 0) & (dy == 0)) dx = dx[mask] dy = dy[mask] hist2d, x_edges, y_edges = np.histogram2d(dx, dy, bins=[edges, edges]) extent = [x_edges[0], x_edges[-1], y_edges[0], y_edges[-1]] histograms.append(hist2d) edges_list.extend([x_edges, y_edges]) elif 'y' in axes and 'z' in axes: mask = ~((dy == 0) & (dz == 0)) dy = dy[mask] dz = dz[mask] hist2d, x_edges, y_edges = np.histogram2d(dy, dz, bins=[edges, edges]) extent = [x_edges[0], x_edges[-1], y_edges[0], y_edges[-1]] histograms.append(hist2d) edges_list.extend([x_edges, y_edges]) elif 'x' in axes and 'z' in axes: mask = ~((dx == 0) & (dz == 0)) dx = dx[mask] dz = dz[mask] hist2d, x_edges, y_edges = np.histogram2d(dx, dz, bins=[edges, edges]) extent = [x_edges[0], x_edges[-1], y_edges[0], y_edges[-1]] histograms.append(hist2d) edges_list.extend([x_edges, y_edges]) else: raise ValueError("Invalid axes for 2D histogram. Choose from ['x', 'y'], ['y', 'z'], or ['x', 'z'].") if normalize: histograms[-1] = histograms[-1] / np.max(histograms[-1]) if histogram_type == '3D': if 'x' in axes and 'y' in axes and 'z' in axes: mask = ~((dx == 0) & (dy == 0) & (dz == 0)) dx = dx[mask] dy = dy[mask] dz = dz[mask] hist3d, edges = np.histogramdd((dx, dy, dz), bins=[edges, edges, edges]) histograms.append(hist3d) edges_list.extend([edges]) if normalize: histograms[-1] = histograms[-1] / np.max(histograms[-1]) if plot or save: # Plot histograms if histogram_type == '1D': fig, ax = plt.subplots(figsize=figure_size) for i, hist in enumerate(histograms): if plot_mode == 'bar': ax.bar(edges[:-1], hist, width=bin_size, align='edge') ax.set_ylabel('Counts') ax.set_xlabel(f'{axes[i]} (nm)') elif plot_mode == 'line': ax.plot(edges[:-1], hist) ax.set_ylabel('Counts') ax.set_xlabel(f'{axes[i]} (nm)') if log: ax.set_yscale('log') try: # Detect peaks peaks, _ = find_peaks(hist, height=0) # Calculate distances between peaks distances = np.diff(peaks) # Horizontal distances (indices) for i in range(len(peaks) - 1): dx = edges[:-1] # Get coordinates for the peaks x1, x2 = dx[peaks[i]], dx[peaks[i + 1]] y1, y2 = hist[peaks[i]], hist[peaks[i + 1]] # Draw dashed line yy = max(y1, y2) plt.annotate('', xy=(x2, yy), xytext=(x1, yy), arrowprops=dict(arrowstyle='<->', color='blue', lw=1.5)) # Annotate with distance plt.text((x1 + x2) / 2, max(y1, y2) + 0.1, f'{x2 - x1:.2f}', color='blue', ha='center', va='bottom') except Exception as e: print('error:', e) print('No peaks found in the histogram') elif histogram_type == '2D': if figure_size[0] - figure_size[1] > 2: print('The figure size is not appropriate for 2D histogram') figure_size = (5, 4) print('The figure size is changed to:', figure_size) fig, ax = plt.subplots(figsize=figure_size) img = ax.imshow(histograms[-1].T, origin='lower', extent=extent, aspect="auto") ax.set_ylabel(f'{axes[1]} (nm)') ax.set_xlabel(f'{axes[0]} (nm)') cmap = copy(plt.cm.plasma) cmap.set_bad(cmap(0)) if log: pcm = ax.pcolormesh(x_edges, y_edges, histograms[-1].T, cmap=cmap, norm=colors.LogNorm(), rasterized=True) else: pcm = ax.pcolormesh(x_edges, y_edges, histograms[-1].T, cmap=cmap, rasterized=True) cbar = fig.colorbar(pcm, ax=ax, pad=0) cbar.set_label('Counts', fontsize=10) elif histogram_type == '3D': print('3D histogram is not supported yet.') if save and variables is not None: save_matplotlib_figure(fig, variables, stem=f"sdm_{figname}", formats=("png", "pdf"), dpi=600) if plot: plt.show() if plot_roi: x = particles_backup[:, 0] y = particles_backup[:, 1] edges_d = np.arange(min(np.min(x), np.min(y)), max(np.max(x), np.max(y)), 0.5) FDM, xedges, yedges = np.histogram2d(x, y, bins=(edges_d, edges_d)) if normalize: FDM = FDM / np.max(FDM) cmap_instance = copy(cm.get_cmap('plasma')) cmap_instance.set_bad(cmap_instance(0)) fig1, ax1 = plt.subplots(figsize=(5, 4)) if log and not normalize: FDM = np.log1p(FDM) pcm = ax1.pcolormesh(xedges, yedges, FDM.T, cmap=cmap_instance, norm=colors.LogNorm(), rasterized=True) else: pcm = ax1.pcolormesh(xedges, yedges, FDM.T, cmap=cmap_instance, rasterized=True) cbar = fig1.colorbar(pcm, ax=ax1, pad=0) cbar.set_label('Event Counts', fontsize=10) plt.xlabel('x (nm)') plt.ylabel('y (nm)') # draw a circle with radius roi[2] circle = plt.Circle((roi[0], roi[1]), roi[2], color='white', fill=False, linewidth=1.5) ax1.add_artist(circle) if save and variables is not None: save_matplotlib_figure( fig1, variables, stem=f"disparity_roi_{figname}", formats=("png", "pdf"), dpi=600, ) if plot: plt.show() return histograms, edges_list
# def sdm_background(res, limit): # """ # Performs iterative smoothing of a 1D array with a convergence condition. # # Parameters: # - res (np.ndarray): 2D array where the first column represents data points. # - limit (float): Convergence threshold for the deviation percentage. # # Returns: # - np0 (np.ndarray): The original input data after smoothing. # - dev (np.ndarray): Array of deviation values over each iteration. # """ # # Initialize arrays and variables # np0 = res[:, 0] # Original data points # np1 = np.zeros_like(np0) # Array for the new smoothed values # dev = np.zeros((1000, 3)) # Pre-allocate deviation array (size can be adjusted) # # # Find the index of the maximum value in the first column # id_center = np.argmax(np0) # # # Initialize deviation # dev[0, 0] = 0 # dev[0, 1] = np.inf # dev[0, 2] = np.inf # # i = 0 # while dev[i, 2] >= limit: # # Smoothing step: update np1 based on the average of neighboring values # for j in range(1, len(np0) - 1): # np1[j] = min(np0[j], (np0[j + 1] + np0[j - 1]) / 2) # # # Compute the deviation in the current iteration # dev[i + 1, 0] = i + 1 # dev[i + 1, 1] = res[id_center, 0] - np.max(np1) # Deviation from max value # dev[i + 1, 2] = np.abs(dev[i + 1, 1] - dev[i, 1]) * 100 / res[id_center, 0] # Relative change in deviation # # # Update the np0 array for the next iteration # np0 = np1.copy() # np1.fill(0) # Reset np1 for the next iteration # # i += 1 # # # Truncate dev array to the actual number of iterations # dev = dev[:i + 1, :] # # return np0, dev