"""Clustering helpers for calibrated APT datasets."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Sequence
import numpy as np
import plotly.graph_objects as go
from scipy.spatial import cKDTree
# Hex colors for cluster traces. build_cluster_scatter_traces picks
# DEFAULT_CLUSTER_COLORS[label_index % len(DEFAULT_CLUSTER_COLORS)], so the
# palette repeats once there are more clusters than entries.
DEFAULT_CLUSTER_COLORS = (
"#EF553B",
"#00CC96",
"#636EFA",
"#AB63FA",
"#FFA15A",
)
# Canonical clustering method identifiers. normalize_clustering_method lists
# these in its error message when a method name cannot be resolved.
SUPPORTED_CLUSTERING_METHODS = (
"min-max",
"maximum-separation",
"hdbscan",
"comp-seeded-support-hdbscan",
"composition-gmm-voxel",
"compspace-agnostic-seeded",
)
[docs]
@dataclass(frozen=True)
class MinMaxClusterResult:
    """Immutable outcome of one clustering pass over a selected ion population.

    Despite the name, this container is not specific to the min-max method:
    ``algorithm`` records which method produced the result.
    """

    # Per-ion cluster label; cluster ids are the integers 0..n_clusters-1.
    labels: np.ndarray
    # Boolean mask of the ions that took part in the clustering.
    selected_mask: np.ndarray
    # Integer positions of the selected ions.
    selected_indices: np.ndarray
    # One row per cluster; the row count defines ``n_clusters``.
    centers: np.ndarray
    # Labels that were used to select the ions.
    ion_names: tuple[str, ...]
    # Name under which the labels were stored.
    cluster_column: str
    # Identifier of the method that produced this result.
    algorithm: str = "min-max"
    # Parameters the method ran with, if recorded.
    parameters: dict[str, float | int | bool] | None = None

    @property
    def n_clusters(self) -> int:
        """Number of clusters, i.e. the number of rows in ``centers``."""
        return int(len(self.centers))

    @property
    def counts(self) -> tuple[int, ...]:
        """Number of labelled ions per cluster, ordered by cluster id."""
        sizes = []
        for cluster_id in range(len(self.centers)):
            members = self.labels == cluster_id
            sizes.append(int(np.count_nonzero(members)))
        return tuple(sizes)
[docs]
def parse_label_selection(selection: str | Iterable[str]) -> tuple[str, ...]:
    """Turn a label selection into a tuple of unique, trimmed labels.

    Args:
        selection: Either one comma-separated string or an iterable of items;
            each item is converted with ``str`` and stripped of whitespace.

    Returns:
        The non-empty labels in first-seen order, without duplicates.
    """
    raw_items = selection.split(",") if isinstance(selection, str) else selection
    # A dict keeps insertion order, which gives order-preserving de-duplication.
    seen: dict[str, None] = {}
    for raw in raw_items:
        cleaned = str(raw).strip()
        if cleaned:
            seen.setdefault(cleaned, None)
    return tuple(seen)
[docs]
def normalize_clustering_method(method: str) -> str:
    """Resolve a user-supplied method name to its canonical identifier.

    The name is trimmed and lower-cased, ``_``, ``+`` and spaces become ``-``,
    and runs of ``-`` collapse to a single one before the lookup.

    Args:
        method: Method name or one of its accepted aliases / misspellings.

    Returns:
        The canonical method identifier.

    Raises:
        ValueError: If the cleaned name matches no known spelling.
    """
    key = str(method).strip().lower()
    for separator in ("_", "+", " "):
        key = key.replace(separator, "-")
    while "--" in key:
        key = key.replace("--", "-")

    # Canonical identifier -> additional spellings accepted for it.
    accepted_spellings = {
        "min-max": ("minmax",),
        "composition-gmm-voxel": ("composition-gmm", "gmm-voxel"),
        "compspace-agnostic-seeded": (
            "comp-space-agnostic-seeded",
            "agnostic-seeded",
        ),
        "maximum-separation": (
            "max-separation",
            "maximumseparation",
            "maximum-seperation",
            "max-seperation",
            "mmax-separation",
            "mmax-seperation",
            "mmax-sepretion",
        ),
        "hdbscan": (),
        "comp-seeded-support-hdbscan": (
            "comp-seeded-hdbscan",
            "comp-seeded-support",
            "comp-seeded",
        ),
    }
    for canonical, extra in accepted_spellings.items():
        if key == canonical or key in extra:
            return canonical

    supported = ", ".join(SUPPORTED_CLUSTERING_METHODS)
    raise ValueError(f"Unsupported clustering method {method!r}. Choose one of: {supported}")
def _resolve_xyz(variables) -> np.ndarray:
x = np.asarray(getattr(variables, "x", np.zeros(0)))
y = np.asarray(getattr(variables, "y", np.zeros(0)))
z = np.asarray(getattr(variables, "z", np.zeros(0)))
if len(x) == 0 or len(y) == 0 or len(z) == 0:
raise ValueError("Reconstruction coordinates are empty. Run the reconstruction first.")
if not (len(x) == len(y) == len(z)):
raise ValueError("Reconstruction coordinates must have the same length.")
return np.column_stack((x, y, z))
def _resolve_mc(variables) -> np.ndarray:
mc = np.asarray(getattr(variables, "mc", np.zeros(0)))
if len(mc) == 0 and getattr(variables, "data", None) is not None and "mc (Da)" in variables.data.columns:
mc = variables.data["mc (Da)"].to_numpy()
if len(mc) == 0:
raise ValueError("Mass-to-charge data is empty. Load or extract calibrated data first.")
return mc
def _build_selection_mask(variables, ion_names: Sequence[str]) -> np.ndarray:
if getattr(variables, "range_data", None) is None or variables.range_data.empty:
raise ValueError("Range data is required for precipitate clustering.")
labels = {label.strip() for label in ion_names if str(label).strip()}
if not labels:
raise ValueError("Provide at least one ion or element label for clustering.")
mc = _resolve_mc(variables)
selection_mask = np.zeros(len(mc), dtype=bool)
matched_labels: set[str] = set()
for _, row in variables.range_data.iterrows():
row_ion = str(row.get("ion", "")).strip()
row_elements = row.get("element", [])
if not isinstance(row_elements, (list, tuple, np.ndarray)):
row_elements = [row_elements]
row_elements = {str(item).strip() for item in row_elements if str(item).strip()}
if row_ion in labels or labels.intersection(row_elements):
row_mask = (mc > float(row["mc_low"])) & (mc < float(row["mc_up"]))
selection_mask |= row_mask
if row_ion in labels:
matched_labels.add(row_ion)
matched_labels.update(labels.intersection(row_elements))
if not np.any(selection_mask):
joined = ", ".join(sorted(labels))
raise ValueError(f"No ions matched the requested cluster selection: {joined}")
if not matched_labels:
joined = ", ".join(sorted(labels))
raise ValueError(f"None of the requested labels were found in the range data: {joined}")
return selection_mask
def _write_cluster_labels(variables, cluster_column: str, labels: np.ndarray) -> None:
"""Persist clustering labels onto shared variables and dataframe when possible."""
if getattr(variables, "data", None) is not None and len(variables.data) == len(labels):
variables.data[cluster_column] = labels
setattr(variables, cluster_column, labels)
def _build_cluster_result(
    variables,
    *,
    ion_names: tuple[str, ...],
    selection_mask: np.ndarray,
    labels: np.ndarray,
    centers: np.ndarray,
    cluster_column: str,
    algorithm: str,
    parameters: dict[str, float | int | bool] | None = None,
) -> MinMaxClusterResult:
    """Scatter selected-ion labels into a full-length array and wrap the result.

    Positions outside *selection_mask* receive the label ``-1``. The
    full-length labels are also stored on *variables* via
    ``_write_cluster_labels``.

    Args:
        variables: Object that receives the labels under *cluster_column*.
        ion_names: Labels that defined the selection.
        selection_mask: Boolean mask of the ions that were clustered.
        labels: One label per selected ion, in mask order.
        centers: Cluster centers; reshaped to three columns, or replaced by
            an empty ``(0, 3)`` array when there are none.
        cluster_column: Name under which the labels are stored.
        algorithm: Identifier of the method that produced the labels.
        parameters: Parameters to record on the result.

    Returns:
        The populated ``MinMaxClusterResult``.
    """
    chosen = np.flatnonzero(selection_mask)
    expanded = np.full(len(selection_mask), -1, dtype=int)
    expanded[chosen] = labels
    _write_cluster_labels(variables, cluster_column, expanded)

    if len(centers):
        center_rows = np.asarray(centers, dtype=float).reshape((-1, 3))
    else:
        center_rows = np.empty((0, 3), dtype=float)

    return MinMaxClusterResult(
        labels=expanded,
        selected_mask=selection_mask,
        selected_indices=chosen,
        centers=center_rows,
        ion_names=ion_names,
        cluster_column=cluster_column,
        algorithm=algorithm,
        parameters=parameters,
    )
def _centers_from_labeled_points(points: np.ndarray, labels: np.ndarray) -> np.ndarray:
"""Compute cluster centers for non-negative labels."""
labels = np.asarray(labels, dtype=int)
valid = labels >= 0
if not np.any(valid):
return np.empty((0, 3), dtype=float)
centers = []
for label in sorted(np.unique(labels[valid]).tolist()):
centers.append(np.asarray(points[labels == label], dtype=float).mean(axis=0))
return np.asarray(centers, dtype=float)
def _drop_small_clusters(labels: np.ndarray, *, n_min: int) -> np.ndarray:
"""Mark clusters smaller than n_min as noise and compact surviving labels."""
labels = np.asarray(labels, dtype=int).copy()
if n_min <= 1:
return labels
valid = labels >= 0
if not np.any(valid):
return labels
unique, counts = np.unique(labels[valid], return_counts=True)
keep = {int(label) for label, count in zip(unique, counts) if int(count) >= int(n_min)}
labels = np.array([label if int(label) in keep else -1 for label in labels], dtype=int)
valid = labels >= 0
if not np.any(valid):
return labels
remap = {int(old): idx for idx, old in enumerate(sorted(np.unique(labels[valid]).tolist()))}
return np.array([remap[int(label)] if label >= 0 else -1 for label in labels], dtype=int)
[docs]
def min_max_clustering(points: np.ndarray, n_clusters: int = 2, max_iter: int = 50) -> tuple[np.ndarray, np.ndarray]:
    """Partition points with farthest-point seeding followed by centroid refinement.

    The first seed is the point farthest from the overall centroid. Each
    further seed is the point whose distance to its nearest existing seed is
    largest. Points are then repeatedly assigned to the nearest center and
    centers recomputed as cluster means, until neither changes or *max_iter*
    passes have run. A center whose cluster becomes empty keeps its position.

    Args:
        points: ``(N, 3)`` coordinates.
        n_clusters: Number of clusters, at least 2.
        max_iter: Maximum number of refinement passes.

    Returns:
        ``(labels, centers)`` with clusters numbered by ascending center x.

    Raises:
        ValueError: If *points* is not ``(N, 3)``, ``n_clusters < 2``, or
            there are fewer points than clusters.
    """
    points = np.asarray(points, dtype=float)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    if n_clusters < 2:
        raise ValueError("n_clusters must be at least 2")
    if len(points) < n_clusters:
        raise ValueError("Not enough points for the requested number of clusters")

    centroid = points.mean(axis=0)
    first_index = int(np.argmax(np.linalg.norm(points - centroid, axis=1)))
    seed_indices = [first_index]
    # Running distance of every point to its nearest seed; updating it per
    # new seed avoids recomputing the distances to all earlier seeds.
    nearest_seed = np.linalg.norm(points - points[first_index], axis=1)
    while len(seed_indices) < n_clusters:
        candidate_index = int(np.argmax(nearest_seed))
        seed_indices.append(candidate_index)
        to_candidate = np.linalg.norm(points - points[candidate_index], axis=1)
        nearest_seed = np.minimum(nearest_seed, to_candidate)
    centers = np.asarray(points[seed_indices], dtype=float)

    labels = np.zeros(len(points), dtype=int)
    for _ in range(max_iter):
        distances = np.stack([np.linalg.norm(points - center, axis=1) for center in centers], axis=1)
        new_labels = np.argmin(distances, axis=1)
        new_centers = centers.copy()
        for idx in range(n_clusters):
            cluster_points = points[new_labels == idx]
            if len(cluster_points) > 0:
                new_centers[idx] = cluster_points.mean(axis=0)
        converged = np.array_equal(new_labels, labels) and np.allclose(new_centers, centers)
        labels, centers = new_labels, new_centers
        if converged:
            break

    # Renumber clusters by ascending center x: rank[old_id] is the new id.
    order = np.argsort(centers[:, 0], kind="stable")
    rank = np.empty(len(order), dtype=int)
    rank[order] = np.arange(len(order))
    return rank[labels], centers[order]
[docs]
def estimate_maximum_separation_distance(
    points: np.ndarray,
    *,
    kth_neighbor: int = 3,
    percentile: float = 50.0,
) -> float:
    """Derive `d_max` as a percentile of the kth-nearest-neighbor distances.

    Args:
        points: ``(N, 3)`` coordinates with at least two rows.
        kth_neighbor: Which neighbor to measure (1 = nearest); capped at
            ``N - 1`` when fewer neighbors exist.
        percentile: Percentile in ``[0, 100]`` taken over those distances.

    Returns:
        The estimated distance as a positive finite float.

    Raises:
        ValueError: If an argument is out of range or the estimate is not a
            positive finite number.
    """
    cloud = np.asarray(points, dtype=float)
    if cloud.ndim != 2 or cloud.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    n_points = len(cloud)
    if n_points < 2:
        raise ValueError("At least two points are required to estimate d_max")
    kth_neighbor = int(kth_neighbor)
    if kth_neighbor < 1:
        raise ValueError("kth_neighbor must be at least 1")
    percentile = float(percentile)
    if not (0.0 <= percentile <= 100.0):
        raise ValueError("percentile must be between 0 and 100")

    # Column 0 of the query result is each point's distance to itself, so
    # the kth neighbor sits in column k and k + 1 columns are requested.
    column = min(kth_neighbor, n_points - 1)
    neighbor_distances, _ = cKDTree(cloud).query(cloud, k=column + 1)
    kth_column = np.asarray(neighbor_distances[:, column], dtype=float)
    estimate = float(np.percentile(kth_column, percentile))
    if estimate <= 0 or not np.isfinite(estimate):
        raise ValueError("Estimated d_max must be a positive finite number")
    return estimate
[docs]
def maximum_separation_clustering(
    points: np.ndarray,
    *,
    d_max: float,
    n_min: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Cluster points by connected components with maximum edge length `d_max`.

    Two points are linked when they lie within `d_max` of each other. Each
    connected group with at least *n_min* members becomes a cluster and all
    remaining points are labelled ``-1``.

    Args:
        points: ``(N, 3)`` coordinates, not empty.
        d_max: Linking distance, positive and finite.
        n_min: Minimum cluster size, at least 2.

    Returns:
        ``(labels, centers)``. Clusters are numbered by decreasing size, with
        ties broken by ascending center x, then y, then z. ``centers`` holds
        the mean position of each cluster and is ``(0, 3)`` when none exist.

    Raises:
        ValueError: If *points* is not a non-empty ``(N, 3)`` array, `d_max`
            is not positive and finite, or ``n_min < 2``.
    """
    # Local imports: only this function needs the sparse-graph routines.
    from scipy.sparse import coo_matrix
    from scipy.sparse.csgraph import connected_components

    points = np.asarray(points, dtype=float)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    if len(points) == 0:
        raise ValueError("points cannot be empty")
    d_max = float(d_max)
    if not np.isfinite(d_max) or d_max <= 0:
        raise ValueError("d_max must be a positive finite number")
    n_min = int(n_min)
    if n_min < 2:
        raise ValueError("n_min must be at least 2")

    n_points = len(points)
    labels = np.full(n_points, -1, dtype=int)
    no_centers = np.empty((0, 3), dtype=float)
    if n_points < n_min:
        return labels, no_centers

    tree = cKDTree(points)
    try:
        pairs = tree.query_pairs(d_max, output_type="ndarray")
    except TypeError:
        # Fallback for a query_pairs that rejects `output_type`.
        pairs = np.asarray(sorted(tree.query_pairs(d_max)), dtype=int)
    if pairs.size == 0:
        return labels, no_centers

    # Connected components of the "within d_max" graph, computed by SciPy.
    pairs = np.asarray(pairs, dtype=int).reshape(-1, 2)
    adjacency = coo_matrix(
        (np.ones(len(pairs), dtype=np.int8), (pairs[:, 0], pairs[:, 1])),
        shape=(n_points, n_points),
    )
    _, component = connected_components(adjacency, directed=False)

    sizes = np.bincount(component)
    kept = np.flatnonzero(sizes >= n_min)
    if kept.size == 0:
        return labels, no_centers

    # Group point indices by component with one stable sort, so the members
    # of component c are the slice by_component[bounds[c]:bounds[c + 1]],
    # still in their original index order.
    by_component = np.argsort(component, kind="stable")
    bounds = np.concatenate(([0], np.cumsum(sizes)))
    centers = np.asarray(
        [points[by_component[bounds[c]:bounds[c + 1]]].mean(axis=0) for c in kept.tolist()],
        dtype=float,
    )

    # The last lexsort key is the primary one: largest cluster first.
    order = np.lexsort((centers[:, 2], centers[:, 1], centers[:, 0], -sizes[kept]))
    final_id = np.full(len(sizes), -1, dtype=int)
    final_id[kept[order]] = np.arange(len(kept))
    return final_id[component], centers[order]
def hdbscan_clustering(
    points: np.ndarray,
    *,
    n_min: int,
    min_samples: int = 3,
    d_max: float | None = None,
    auto_d_max: bool = True,
    percentile: float = 50.0,
) -> tuple[np.ndarray, np.ndarray, dict[str, float | int | bool]]:
    """Cluster points using HDBSCAN when available, with DBSCAN fallback.

    Args:
        points: ``(N, 3)`` coordinates, not empty.
        n_min: Minimum cluster size, raised to 2 if smaller. Used as
            HDBSCAN's ``min_cluster_size`` and as DBSCAN's ``min_samples``.
        min_samples: HDBSCAN ``min_samples``, raised to 1 if smaller. In the
            fallback it selects the kth neighbor for the automatic radius.
        d_max: DBSCAN radius, used only in the fallback and only when
            *auto_d_max* is false.
        auto_d_max: Estimate the DBSCAN radius from the data instead.
        percentile: Percentile used for that estimate.

    Returns:
        ``(labels, centers, parameters)``. Clusters smaller than *n_min* are
        relabelled ``-1``. ``parameters["backend"]`` is ``True`` when HDBSCAN
        produced the labels and ``False`` when DBSCAN did; in the latter case
        the radius settings are recorded as well.

    Raises:
        ValueError: If *points* is not a non-empty ``(N, 3)`` array.
    """
    points = np.asarray(points, dtype=float)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    if len(points) == 0:
        raise ValueError("points cannot be empty")
    n_min = max(2, int(n_min))
    min_samples = max(1, int(min_samples))
    parameters: dict[str, float | int | bool] = {
        "n_min": int(n_min),
        "min_samples": int(min_samples),
    }

    # Only the optional-backend call is guarded, so a failure in the shared
    # post-processing below is not mistaken for a missing backend.
    try:
        import hdbscan as _hdbscan

        clusterer = _hdbscan.HDBSCAN(min_cluster_size=n_min, min_samples=min_samples)
        labels = np.asarray(clusterer.fit_predict(points), dtype=int)
        parameters["backend"] = True
    except Exception:
        # Fallback for environments without hdbscan: density clustering with DBSCAN.
        # NOTE(review): the broad catch also falls back when HDBSCAN itself
        # fails; presumably deliberate best-effort - confirm before narrowing.
        from sklearn.cluster import DBSCAN

        if auto_d_max or d_max is None:
            eps = estimate_maximum_separation_distance(
                points,
                kth_neighbor=max(1, min_samples),
                percentile=float(percentile),
            )
        else:
            eps = float(d_max)
        labels = np.asarray(DBSCAN(eps=eps, min_samples=n_min).fit_predict(points), dtype=int)
        parameters.update(
            {"backend": False, "d_max": float(eps), "auto_d_max": bool(auto_d_max), "percentile": float(percentile)}
        )

    labels = _drop_small_clusters(labels, n_min=n_min)
    centers = _centers_from_labeled_points(points, labels)
    return labels, centers, parameters
def composition_gmm_voxel_clustering(
    points: np.ndarray,
    *,
    n_clusters: int,
    voxel_size: float = 1.0,
    n_min: int = 2,
) -> tuple[np.ndarray, np.ndarray, dict[str, float | int | bool]]:
    """Cluster points by fitting a Gaussian mixture over voxelized spatial features.

    Points are binned into cubic voxels of edge *voxel_size*. Each occupied
    voxel is described by its center and its point count divided by the
    largest count. A Gaussian mixture labels the voxels and every point
    inherits the label of its voxel.

    Args:
        points: ``(N, 3)`` coordinates, not empty.
        n_clusters: Number of mixture components, raised to 2 if smaller and
            capped at the number of occupied voxels.
        voxel_size: Voxel edge length, positive and finite.
        n_min: Minimum cluster size, raised to 2 if smaller; smaller clusters
            are relabelled ``-1``.

    Returns:
        ``(labels, centers, params)`` where ``centers`` are the mean point
        positions per surviving cluster.

    Raises:
        ValueError: If *points* is not a non-empty ``(N, 3)`` array or
            *voxel_size* is not positive and finite.
    """
    points = np.asarray(points, dtype=float)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    n_clusters = max(2, int(n_clusters))
    voxel_size = float(voxel_size)
    if not np.isfinite(voxel_size) or voxel_size <= 0:
        raise ValueError("voxel_size must be a positive finite number")
    if len(points) == 0:
        # Same explicit error as the other clustering functions, instead of
        # an opaque failure further down.
        raise ValueError("points cannot be empty")
    n_min = max(2, int(n_min))

    voxel_index = np.floor(points / voxel_size).astype(int)
    unique_voxels, inverse, counts = np.unique(voxel_index, axis=0, return_inverse=True, return_counts=True)
    # Some NumPy releases return the inverse with an extra dimension when
    # `axis` is given; flatten it so the per-point labels stay 1-D.
    inverse = np.asarray(inverse).reshape(-1)
    voxel_centers = (unique_voxels.astype(float) + 0.5) * voxel_size
    # Include normalized occupancy as a weak composition-like signal per voxel.
    occupancy = counts.astype(float) / max(float(np.max(counts)), 1.0)
    features = np.column_stack((voxel_centers, occupancy))

    from sklearn.mixture import GaussianMixture

    n_components = min(n_clusters, len(features))
    gmm = GaussianMixture(n_components=n_components, covariance_type="full", random_state=0)
    voxel_labels = np.asarray(gmm.fit_predict(features), dtype=int)
    point_labels = _drop_small_clusters(voxel_labels[inverse], n_min=n_min)
    centers = _centers_from_labeled_points(points, point_labels)
    params = {"n_clusters": int(n_clusters), "voxel_size": float(voxel_size), "n_min": int(n_min)}
    return point_labels, centers, params
def _seed_points_from_selection(
    variables,
    seed_labels: Sequence[str] | str,
) -> np.ndarray:
    """Return the coordinates of the ions selected by *seed_labels*.

    Args:
        variables: Object providing coordinates, mass-to-charge values and
            range data.
        seed_labels: Labels in any form ``parse_label_selection`` accepts.

    Returns:
        The selected rows of the coordinate array, or an empty ``(0, 3)``
        array when no label remains after parsing.
    """
    parsed = parse_label_selection(seed_labels)
    if len(parsed) == 0:
        return np.empty((0, 3), dtype=float)
    coordinates = _resolve_xyz(variables)
    chosen = _build_selection_mask(variables, parsed)
    return coordinates[chosen]
def compspace_agnostic_seeded_clustering(
    points: np.ndarray,
    *,
    n_clusters: int,
    seed_points: np.ndarray | None = None,
    n_min: int = 2,
) -> tuple[np.ndarray, np.ndarray, dict[str, float | int | bool]]:
    """Run k-means on the point coordinates, optionally starting from seed positions.

    Args:
        points: ``(N, 3)`` coordinates.
        n_clusters: Number of clusters, raised to 2 if smaller.
        seed_points: Optional ``(M, 3)`` initial centers. Input that is not
            two-dimensional, or is non-empty without three columns, is
            ignored. With at least *n_clusters* seeds the first *n_clusters*
            are used. With fewer, the points farthest from the overall
            centroid fill the gap.
        n_min: Minimum cluster size, raised to 2 if smaller; smaller clusters
            are relabelled ``-1``.

    Returns:
        ``(labels, centers, params)``; ``params["seeded"]`` reports whether
        any seed was used.

    Raises:
        ValueError: If *points* is not ``(N, 3)``.
    """
    points = np.asarray(points, dtype=float)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("points must be a (N, 3) array")
    n_clusters = max(2, int(n_clusters))
    from sklearn.cluster import KMeans

    no_seeds = np.empty((0, 3), dtype=float)
    seeds = no_seeds if seed_points is None else np.asarray(seed_points, dtype=float)
    if seeds.ndim != 2 or (seeds.size and seeds.shape[1] != 3):
        seeds = no_seeds

    n_seeds = len(seeds)
    seeded = n_seeds > 0
    n_init: int | str
    if not seeded:
        init = "k-means++"
        n_init = "auto"
    else:
        # Explicit initial centers make restarts pointless: run once.
        n_init = 1
        if n_seeds >= n_clusters:
            init = seeds[:n_clusters]
        else:
            missing = n_clusters - n_seeds
            # Add farthest points for missing seeds.
            radial = np.linalg.norm(points - points.mean(axis=0), axis=1)
            farthest = points[np.argsort(radial)[-missing:]]
            init = np.vstack((seeds, farthest))

    kmeans = KMeans(n_clusters=n_clusters, init=init, n_init=n_init, random_state=0)
    assigned = np.asarray(kmeans.fit_predict(points), dtype=int)
    effective_n_min = max(2, int(n_min))
    assigned = _drop_small_clusters(assigned, n_min=effective_n_min)
    centers = _centers_from_labeled_points(points, assigned)
    params = {"n_clusters": int(n_clusters), "n_min": int(effective_n_min), "seeded": bool(seeded)}
    return assigned, centers, params
[docs]
def segment_ions_by_min_max(
    variables,
    ion_names: Sequence[str] | str,
    *,
    n_clusters: int = 2,
    cluster_column: str = "cluster_minmax",
) -> MinMaxClusterResult:
    """Split the selected ion population into `n_clusters` segments with min-max clustering.

    Args:
        variables: Object providing coordinates, mass-to-charge values and
            range data; it also receives the labels under *cluster_column*.
        ion_names: Ion or element labels, as a sequence or one
            comma-separated string.
        n_clusters: Number of segments to produce.
        cluster_column: Name under which the labels are stored.

    Returns:
        The clustering result with full-length labels.
    """
    selected_names = parse_label_selection(ion_names)
    coordinates = _resolve_xyz(variables)
    chosen = _build_selection_mask(variables, selected_names)
    point_labels, cluster_centers = min_max_clustering(coordinates[chosen], n_clusters=n_clusters)
    recorded = {"n_clusters": int(n_clusters)}
    return _build_cluster_result(
        variables,
        ion_names=selected_names,
        selection_mask=chosen,
        labels=point_labels,
        centers=cluster_centers,
        cluster_column=cluster_column,
        algorithm="min-max",
        parameters=recorded,
    )
[docs]
def segment_ions_by_maximum_separation(
    variables,
    ion_names: Sequence[str] | str,
    *,
    d_max: float | None = None,
    n_min: int = 25,
    auto_d_max: bool = True,
    kth_neighbor: int = 3,
    percentile: float = 50.0,
    cluster_column: str = "cluster_maxsep",
) -> MinMaxClusterResult:
    """Cluster the selected ion population with the maximum-separation rule.

    Args:
        variables: Object providing coordinates, mass-to-charge values and
            range data; it also receives the labels under *cluster_column*.
        ion_names: Ion or element labels, as a sequence or one
            comma-separated string.
        d_max: Linking distance; used only when *auto_d_max* is false and a
            value is given.
        n_min: Minimum cluster size.
        auto_d_max: Estimate the linking distance from the selected points.
        kth_neighbor: Neighbor rank used for that estimate.
        percentile: Percentile used for that estimate.
        cluster_column: Name under which the labels are stored.

    Returns:
        The clustering result; its parameters record the linking distance
        that was actually used.
    """
    selected_names = parse_label_selection(ion_names)
    coordinates = _resolve_xyz(variables)
    chosen = _build_selection_mask(variables, selected_names)
    chosen_points = coordinates[chosen]

    use_given_distance = not auto_d_max and d_max is not None
    if use_given_distance:
        linking_distance = float(d_max)
    else:
        linking_distance = estimate_maximum_separation_distance(
            chosen_points,
            kth_neighbor=kth_neighbor,
            percentile=percentile,
        )

    point_labels, cluster_centers = maximum_separation_clustering(
        chosen_points,
        d_max=linking_distance,
        n_min=n_min,
    )
    recorded = {
        "d_max": float(linking_distance),
        "n_min": int(n_min),
        "kth_neighbor": int(kth_neighbor),
        "percentile": float(percentile),
        "auto_d_max": bool(auto_d_max),
    }
    return _build_cluster_result(
        variables,
        ion_names=selected_names,
        selection_mask=chosen,
        labels=point_labels,
        centers=cluster_centers,
        cluster_column=cluster_column,
        algorithm="maximum-separation",
        parameters=recorded,
    )
[docs]
def segment_ions(
    variables,
    ion_names: Sequence[str] | str,
    *,
    method: str = "min-max",
    n_clusters: int = 2,
    d_max: float | None = None,
    n_min: int = 25,
    auto_d_max: bool = True,
    kth_neighbor: int = 3,
    percentile: float = 50.0,
    voxel_size: float = 1.0,
    seed_labels: Sequence[str] | str | None = None,
    cluster_column: str | None = None,
) -> MinMaxClusterResult:
    """Cluster a selected ion population with the requested algorithm.

    Args:
        variables: Object providing coordinates, mass-to-charge values and
            range data; it also receives the resulting labels.
        ion_names: Ion or element labels, as a sequence or one
            comma-separated string.
        method: Method name or alias, resolved by
            ``normalize_clustering_method``.
        n_clusters: Cluster count for the min-max, GMM-voxel and
            agnostic-seeded methods.
        d_max: Distance parameter for maximum-separation and the HDBSCAN
            variants.
        n_min: Minimum cluster size for every method except min-max.
        auto_d_max: Estimate the distance parameter from the data.
        kth_neighbor: Neighbor rank for distance estimation; also passed as
            ``min_samples`` to the HDBSCAN variants.
        percentile: Percentile for distance estimation.
        voxel_size: Voxel edge length for the GMM-voxel method.
        seed_labels: Labels selecting seed ions for the seeded methods.
        cluster_column: Name under which labels are stored; a falsy value
            selects a method-specific default.

    Returns:
        The clustering result produced by the selected method.

    Raises:
        ValueError: If the method is not supported; the helpers and
            clustering functions called here raise it for invalid input too.
    """
    normalized_method = normalize_clustering_method(method)

    # The two simplest methods have dedicated entry points that do their own
    # selection handling, so they are delegated to before anything else.
    if normalized_method == "min-max":
        return segment_ions_by_min_max(
            variables,
            ion_names,
            n_clusters=n_clusters,
            cluster_column=cluster_column or "cluster_minmax",
        )
    if normalized_method == "maximum-separation":
        return segment_ions_by_maximum_separation(
            variables,
            ion_names,
            d_max=d_max,
            n_min=n_min,
            auto_d_max=auto_d_max,
            kth_neighbor=kth_neighbor,
            percentile=percentile,
            cluster_column=cluster_column or "cluster_maxsep",
        )

    ion_names_tuple = parse_label_selection(ion_names)
    xyz = _resolve_xyz(variables)
    selection_mask = _build_selection_mask(variables, ion_names_tuple)
    selected_points = xyz[selection_mask]

    if normalized_method == "hdbscan":
        labels, centers, params = hdbscan_clustering(
            selected_points,
            n_min=n_min,
            min_samples=kth_neighbor,
            d_max=d_max,
            auto_d_max=auto_d_max,
            percentile=percentile,
        )
        algorithm = "hdbscan"
        default_column = "cluster_hdbscan"
    elif normalized_method == "comp-seeded-support-hdbscan":
        seed_points = _seed_points_from_selection(variables, seed_labels or ())
        seeded_support = len(seed_points) >= 2
        d_max_seed = d_max
        auto_from_seed = auto_d_max
        if seeded_support and (auto_d_max or d_max is None):
            # With enough seed ions, take the distance from the seed
            # population and switch automatic estimation off so that this
            # value is the one passed on.
            d_max_seed = estimate_maximum_separation_distance(
                seed_points,
                kth_neighbor=max(1, int(kth_neighbor)),
                percentile=float(percentile),
            )
            auto_from_seed = False
        labels, centers, params = hdbscan_clustering(
            selected_points,
            n_min=n_min,
            min_samples=kth_neighbor,
            d_max=d_max_seed,
            auto_d_max=auto_from_seed,
            percentile=percentile,
        )
        params.update({"seeded_support": bool(seeded_support)})
        algorithm = "comp-seeded-support-hdbscan"
        default_column = "cluster_comp_seed_hdbscan"
    elif normalized_method == "composition-gmm-voxel":
        labels, centers, params = composition_gmm_voxel_clustering(
            selected_points,
            n_clusters=n_clusters,
            voxel_size=voxel_size,
            n_min=n_min,
        )
        algorithm = "composition-gmm-voxel"
        default_column = "cluster_comp_gmm_voxel"
    else:
        # Any remaining method is handled as "compspace-agnostic-seeded".
        seed_points = _seed_points_from_selection(variables, seed_labels or ())
        labels, centers, params = compspace_agnostic_seeded_clustering(
            selected_points,
            n_clusters=n_clusters,
            seed_points=seed_points,
            n_min=n_min,
        )
        algorithm = "compspace-agnostic-seeded"
        default_column = "cluster_comp_seeded"

    return _build_cluster_result(
        variables,
        ion_names=ion_names_tuple,
        selection_mask=selection_mask,
        labels=labels,
        centers=centers,
        cluster_column=cluster_column or default_column,
        algorithm=algorithm,
        parameters=params,
    )
[docs]
def build_cluster_scatter_traces(
    variables,
    cluster_result: MinMaxClusterResult,
    *,
    opacity: float = 0.9,
    marker_size: float = 2.5,
    valid_mask: np.ndarray | None = None,
) -> list[go.Scatter3d]:
    """Build one Plotly 3D scatter trace per non-empty cluster.

    Args:
        variables: Object providing the ``x``, ``y`` and ``z`` coordinates.
        cluster_result: Clustering result supplying labels and cluster count.
        opacity: Marker opacity.
        marker_size: Marker size.
        valid_mask: Optional boolean mask that further restricts which points
            are drawn; it must have the same shape as the label array.

    Returns:
        Traces in cluster order. Clusters with no point left after masking
        are skipped; the displayed number is the cluster id plus one.

    Raises:
        ValueError: If *valid_mask* does not match the label array shape.
    """
    if valid_mask is None:
        mask_limit = np.ones(len(cluster_result.labels), dtype=bool)
    else:
        mask_limit = np.asarray(valid_mask, dtype=bool)
        if mask_limit.shape != cluster_result.labels.shape:
            raise ValueError("valid_mask must match the cluster label array length")

    # Convert the coordinates once rather than once per cluster.
    x_all = np.asarray(variables.x)
    y_all = np.asarray(variables.y)
    z_all = np.asarray(variables.z)

    traces: list[go.Scatter3d] = []
    for label_index in range(cluster_result.n_clusters):
        mask = (cluster_result.labels == label_index) & mask_limit
        if not np.any(mask):
            continue
        cluster_size = int(np.count_nonzero(mask))
        traces.append(
            go.Scatter3d(
                x=x_all[mask],
                y=y_all[mask],
                z=z_all[mask],
                mode="markers",
                name=f"Cluster {label_index + 1} (n={cluster_size})",
                showlegend=True,
                legendgroup="clusters",
                legendrank=10 + label_index,
                marker=dict(
                    size=marker_size,
                    # The palette repeats when there are more clusters than colors.
                    color=DEFAULT_CLUSTER_COLORS[label_index % len(DEFAULT_CLUSTER_COLORS)],
                    opacity=opacity,
                ),
            )
        )
    return traces
[docs]
def build_cluster_context_trace(
    variables,
    *,
    mask: np.ndarray,
    name: str,
    color: str = "rgba(160,160,160,0.55)",
    opacity: float = 0.12,
    marker_size: float = 1.0,
    showlegend: bool = True,
) -> go.Scatter3d | None:
    """Build a single faint marker trace for the points selected by *mask*.

    Args:
        variables: Object providing the ``x``, ``y`` and ``z`` coordinates.
        mask: Boolean selection of the points to draw.
        name: Legend name of the trace.
        color: Marker color.
        opacity: Marker opacity.
        marker_size: Marker size.
        showlegend: Whether the trace appears in the legend.

    Returns:
        The trace, or ``None`` when *mask* is empty or selects nothing.
    """
    selected = np.asarray(mask, dtype=bool)
    if len(selected) == 0 or not np.any(selected):
        return None

    coordinates = {axis: np.asarray(getattr(variables, axis))[selected] for axis in ("x", "y", "z")}
    marker_style = {"size": marker_size, "color": color, "opacity": opacity}
    return go.Scatter3d(
        mode="markers",
        name=name,
        showlegend=showlegend,
        legendgroup="cluster-context",
        legendrank=1,
        marker=marker_style,
        **coordinates,
    )
# Names exported by `from <module> import *`.
# NOTE(review): hdbscan_clustering, composition_gmm_voxel_clustering and
# compspace_agnostic_seeded_clustering have public-looking names but are not
# listed here - confirm that leaving them out is intentional.
__all__ = [
"MinMaxClusterResult",
"SUPPORTED_CLUSTERING_METHODS",
"build_cluster_context_trace",
"build_cluster_scatter_traces",
"estimate_maximum_separation_distance",
"min_max_clustering",
"maximum_separation_clustering",
"normalize_clustering_method",
"parse_label_selection",
"segment_ions",
"segment_ions_by_maximum_separation",
"segment_ions_by_min_max",
]