Source code for maldibatchkit.diagnostics.maldi

"""MALDI-specific batch diagnostics.

These functions operate directly on binned-intensity matrices (rows =
samples, columns = m/z bins) and produce tidy per-batch summaries.
"""

from __future__ import annotations

from typing import Any

import numpy as np
import numpy.typing as npt
import pandas as pd

from .._utils import ArrayLike, _resolve_mz_axis

__all__ = [
    "peak_position_drift",
    "per_batch_spectrum_count",
    "tic_cov_per_batch",
]


def _to_dataframe(X: ArrayLike) -> pd.DataFrame:
    if isinstance(X, pd.DataFrame):
        return X
    arr = np.asarray(X)
    return pd.DataFrame(arr)


[docs] def per_batch_spectrum_count(batch: ArrayLike) -> pd.Series: """Return the number of spectra per batch as a sorted Series.""" s = pd.Series(np.asarray(batch)).value_counts().sort_index() s.name = "n_spectra" s.index.name = "batch" return s
[docs] def tic_cov_per_batch(X: ArrayLike, batch: ArrayLike) -> pd.Series: """Per-batch coefficient of variation of the Total Ion Count. High CoV within a batch suggests substantial within-batch intensity fluctuation, which in MALDI-TOF tends to be driven by acquisition issues rather than biology. Parameters ---------- X : array-like of shape (n_samples, n_features) Binned intensities. batch : array-like of shape (n_samples,) Batch labels. Returns ------- pd.Series CoV (``std/mean``) of TIC for each batch, indexed by batch level. """ df = _to_dataframe(X) tic = df.sum(axis=1).to_numpy() b = np.asarray(batch) out: dict[Any, float] = {} for lvl in np.unique(b): vals = tic[b == lvl] mean = float(vals.mean()) std = float(vals.std(ddof=0)) out[lvl] = std / mean if mean > 0 else float("nan") s = pd.Series(out).sort_index() s.name = "tic_cov" s.index.name = "batch" return s
[docs] def peak_position_drift( X: ArrayLike, batch: ArrayLike, *, mz_values: ArrayLike | None = None, top_k: int = 50, ) -> pd.DataFrame: """Per-batch peak-position drift relative to a global reference. For each batch we compute its median spectrum, identify the top ``top_k`` peaks of the **global** median spectrum, and locate the nearest local maximum of the per-batch median to each of those global peaks. The returned table summarises the distribution of (signed) position shifts per batch. Parameters ---------- X : array-like of shape (n_samples, n_features) Binned intensities. batch : array-like of shape (n_samples,) Batch labels. mz_values : array-like of shape (n_features,), optional m/z values for the columns of ``X``. If None, integer column positions are used, and the returned ``mean_delta_mz`` column is in bin units. top_k : int, default=50 Number of global peaks to track. Returns ------- pd.DataFrame One row per batch, with columns ``mean_delta_mz``, ``median_delta_mz``, ``max_abs_delta_mz``. """ df = _to_dataframe(X).astype(float) b = np.asarray(batch) mz, _ = _resolve_mz_axis(df, mz_values) global_median = df.median(axis=0).to_numpy() # Local maxima of the global median peaks_idx = _local_maxima(global_median) if peaks_idx.size == 0: # Degenerate case: flat / monotonic spectrum return pd.DataFrame( { "mean_delta_mz": [], "median_delta_mz": [], "max_abs_delta_mz": [], }, ) peak_intensities = global_median[peaks_idx] order = np.argsort(-peak_intensities)[:top_k] tracked = peaks_idx[order] records = [] for lvl in np.unique(b): batch_median = df.loc[b == lvl].median(axis=0).to_numpy() batch_peaks = _local_maxima(batch_median) if batch_peaks.size == 0: records.append( { "batch": lvl, "mean_delta_mz": float("nan"), "median_delta_mz": float("nan"), "max_abs_delta_mz": float("nan"), } ) continue deltas = [] for p in tracked: nearest = batch_peaks[int(np.argmin(np.abs(batch_peaks - p)))] deltas.append(mz[nearest] - mz[p]) deltas_arr = np.asarray(deltas) records.append( { "batch": lvl, "mean_delta_mz": float(deltas_arr.mean()), "median_delta_mz": float(np.median(deltas_arr)), "max_abs_delta_mz": float(np.max(np.abs(deltas_arr))), } ) out = pd.DataFrame(records).set_index("batch") out = out.sort_index() return out
def _local_maxima(x: npt.NDArray[np.float64]) -> npt.NDArray[np.int64]: """Return indices of strict local maxima of a 1-D array.""" if x.size < 3: return np.asarray([], dtype=np.int64) greater_prev = x[1:-1] > x[:-2] greater_next = x[1:-1] > x[2:] return np.where(greater_prev & greater_next)[0] + 1