# -*- coding: utf-8 -*-
r"""Peak-calling helpers for ROCCO-style segmentation from Consenrich tracks."""
from __future__ import annotations
import json
import logging
import math
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy import signal, stats
from . import cconsenrich
from . import core
logger = logging.getLogger(__name__)
_TINY = float(np.finfo(np.float64).tiny)
_ROCCO_BUDGET_MIN = 0.001
_ROCCO_BUDGET_MAX = 0.10
_ROCCO_NULL_QUANTILE = 0.80
_ROCCO_THRESHOLD_Z_DEFAULT = 2.0
_ROCCO_NUM_BOOTSTRAP_DEFAULT = 128
_ROCCO_BUDGET_Z_GRID = (1.5, 2.0, 2.5, 3.0)
_ROCCO_MAX_ITER_DEFAULT = 60
_NESTED_ROCCO_ITERS_DEFAULT = 3
_NESTED_ROCCO_JACCARD_DEFAULT = 0.999
_NESTED_ROCCO_MIN_PARENT_STEPS = 5
_NESTED_ROCCO_MIN_CHILD_STEPS = _NESTED_ROCCO_MIN_PARENT_STEPS
_NESTED_ROCCO_BUDGET_SCALE_DEFAULT = 0.5
_NESTED_ROCCO_SUBPROBLEM_MAX_ITER = 5
_EXPORT_MEDIAN_SIGNAL_LOCAL_UNCERTAINTY_MULTIPLIER = 2.5
def _asFloatVector(name: str, values) -> np.ndarray:
arr = np.asarray(values, dtype=np.float64)
if arr.ndim != 1:
raise ValueError(f"`{name}` must be one-dimensional")
if arr.size == 0:
raise ValueError(f"`{name}` must be non-empty")
if not np.all(np.isfinite(arr)):
raise ValueError(f"`{name}` contains non-finite values")
return arr
def _validateExportFilterUncertaintyMultiplier(value: float) -> float:
value_ = float(value)
if not np.isfinite(value_) or value_ < 0.0:
raise ValueError(
"`exportFilterUncertaintyMultiplier` must be finite and non-negative"
)
return value_
def _thresholdZKey(thresholdZ: float) -> str:
z = float(thresholdZ)
return f"{z:.6f}".rstrip("0").rstrip(".")
def _resolveThresholdZGrid(
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
thresholdZGrid: Iterable[float] | None = None,
) -> List[float]:
values: List[float] = []
seen: set[str] = set()
for value in [
thresholdZ,
*(_ROCCO_BUDGET_Z_GRID if thresholdZGrid is None else thresholdZGrid),
]:
value_ = float(max(float(value), 0.0))
key = _thresholdZKey(value_)
if key not in seen:
seen.add(key)
values.append(value_)
values.sort()
return values
def _halfSampleMode(sortedValues: np.ndarray) -> float:
vals = np.asarray(sortedValues, dtype=np.float64)
vals = vals[np.isfinite(vals)]
n = int(vals.size)
if n == 0:
return 0.0
if n == 1:
return float(vals[0])
if n == 2:
return float(np.mean(vals))
if n == 3:
leftWidth = float(vals[1] - vals[0])
rightWidth = float(vals[2] - vals[1])
return (
float(np.mean(vals[:2]))
if leftWidth <= rightWidth
else float(np.mean(vals[1:]))
)
window = int(math.ceil(n / 2))
bestStart = 0
bestWidth = float(vals[window - 1] - vals[0])
for start in range(1, n - window + 1):
width = float(vals[start + window - 1] - vals[start])
if width < bestWidth:
bestWidth = width
bestStart = start
return _halfSampleMode(vals[bestStart : bestStart + window])
def studentizedScoreTrack(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
tau0: float = 1.0,
returnDetails: bool = False,
) -> np.ndarray | Tuple[np.ndarray, Dict[str, float | str]]:
r"""Build a studentized score track from Consenrich outputs."""
state_ = _asFloatVector("state", state)
n = int(state_.size)
tau0_ = float(max(tau0, 0.0))
seMode = "identity"
if uncertainty is None:
effectiveSE = np.ones(n, dtype=np.float64)
else:
effectiveSE = _asFloatVector("uncertainty", uncertainty)
if effectiveSE.size != n:
raise ValueError("`uncertainty` must match `state` length")
seMode = "uncertainty"
effectiveSE = np.maximum(effectiveSE, 0.0)
denom = np.sqrt(effectiveSE * effectiveSE + tau0_ * tau0_)
denom = np.maximum(denom, 1.0e-6)
scoreTrack = state_ / denom
if not returnDetails:
return scoreTrack
details: Dict[str, float | str] = {
"se_mode": str(seMode),
"tau0": float(tau0_),
"state_abs_median": float(np.median(np.abs(state_))),
"uncertainty_median": float(np.median(effectiveSE)),
"uncertainty_min": float(np.min(effectiveSE)),
"uncertainty_max": float(np.max(effectiveSE)),
"score_denom_median": float(np.median(denom)),
"score_denom_min": float(np.min(denom)),
"score_denom_max": float(np.max(denom)),
}
return scoreTrack, details
def shrinkageScoreTrack(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
nullCenter: float = 0.0,
tau0: float = 1.0,
returnDetails: bool = False,
) -> np.ndarray | Tuple[np.ndarray, Dict[str, float | str]]:
r"""Build a posterior-mean-style shrinkage score track for ROCCO."""
state_ = _asFloatVector("state", state)
n = int(state_.size)
tau0_ = float(max(tau0, 0.0))
nullCenter_ = float(nullCenter)
priorVariance = float(tau0_ * tau0_)
seMode = "identity"
if uncertainty is None:
effectiveSE = np.zeros(n, dtype=np.float64)
shrinkWeights = np.ones(n, dtype=np.float64)
else:
effectiveSE = _asFloatVector("uncertainty", uncertainty)
if effectiveSE.size != n:
raise ValueError("`uncertainty` must match `state` length")
effectiveSE = np.maximum(effectiveSE, 0.0)
seMode = "uncertainty"
if priorVariance <= 0.0:
shrinkWeights = np.zeros(n, dtype=np.float64)
else:
shrinkWeights = priorVariance / np.maximum(
priorVariance + effectiveSE * effectiveSE,
1.0e-12,
)
centeredState = state_ - nullCenter_
scoreTrack = shrinkWeights * centeredState
if not returnDetails:
return scoreTrack
details: Dict[str, float | str] = {
"score_mode": "posterior_mean_shrinkage",
"se_mode": str(seMode),
"tau0": float(tau0_),
"prior_variance": float(priorVariance),
"null_center_input": float(nullCenter_),
"centered_state_abs_median": float(np.median(np.abs(centeredState))),
"uncertainty_median": float(np.median(effectiveSE)),
"uncertainty_min": float(np.min(effectiveSE)),
"uncertainty_max": float(np.max(effectiveSE)),
"shrink_weight_median": float(np.median(shrinkWeights)),
"shrink_weight_min": float(np.min(shrinkWeights)),
"shrink_weight_max": float(np.max(shrinkWeights)),
}
return scoreTrack, details
def consenrichStateScoreTrack(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
returnDetails: bool = False,
) -> np.ndarray | Tuple[np.ndarray, Dict[str, float | str | bool]]:
r"""Use the fitted Consenrich state directly as the ROCCO score track."""
state_ = _asFloatVector("state", state)
seMode = "none"
uncertaintyAvailable = False
if uncertainty is not None:
uncertainty_ = _asFloatVector("uncertainty", uncertainty)
if uncertainty_.size != state_.size:
raise ValueError("`uncertainty` must match `state` length")
seMode = "ignored"
uncertaintyAvailable = True
if not returnDetails:
return state_
details: Dict[str, float | str | bool] = {
"score_mode": "consenrich_state",
"se_mode": str(seMode),
"uncertainty_available": bool(uncertaintyAvailable),
"uncertainty_used": False,
"state_median": float(np.median(state_)),
"state_abs_median": float(np.median(np.abs(state_))),
"state_min": float(np.min(state_)),
"state_max": float(np.max(state_)),
}
return state_, details
def _selectRobustNullSupport(
values: np.ndarray,
bulkQuantile: float = 0.60,
) -> Tuple[np.ndarray, Dict[str, float | str | int]]:
z = _asFloatVector("values", values)
n = int(z.size)
bulkQuantile_ = float(np.clip(bulkQuantile, 0.05, 0.95))
minSupport = max(16, int(math.ceil(0.05 * n)))
cutoff = float(
np.quantile(
z,
bulkQuantile_,
method="interpolated_inverted_cdf",
)
)
lowerBulk = z[z <= cutoff]
bulkSource = "lower_bulk"
if lowerBulk.size < minSupport:
lowerBulk = z
bulkSource = "full_track"
bulkSorted = np.sort(np.asarray(lowerBulk, dtype=np.float64))
provisionalCenter = float(
_halfSampleMode(bulkSorted) if bulkSorted.size >= 4 else np.median(bulkSorted)
)
centerMethod = (
f"{bulkSource}_half_sample_mode"
if bulkSorted.size >= 4
else f"{bulkSource}_median"
)
bulkResiduals = bulkSorted - provisionalCenter
bulkMad = 1.4826 * float(
np.median(np.abs(bulkResiduals - np.median(bulkResiduals)))
)
bulkIqr = (
float(stats.iqr(bulkResiduals, rng=(25, 75))) / 1.349
if bulkResiduals.size >= 4
else 0.0
)
bulkStd = float(np.std(bulkResiduals, ddof=1)) if bulkResiduals.size >= 2 else 0.0
supportScale = float(max(bulkMad, bulkIqr, bulkStd, 1.0e-6))
supportRadius = float(
max(
2.5 * supportScale,
(
float(
np.quantile(
np.abs(bulkResiduals),
0.50,
method="interpolated_inverted_cdf",
)
)
if bulkResiduals.size >= 4
else supportScale
),
1.0e-6,
)
)
support = z[np.abs(z - provisionalCenter) <= supportRadius]
if support.size >= minSupport:
method = "mode_centered_central_support"
else:
order = np.argsort(np.abs(z - provisionalCenter))
support = z[np.asarray(order[:minSupport], dtype=np.int64)]
method = "mode_centered_nearest_support"
details: Dict[str, float | str | int] = {
"null_method": str(method),
"support_size": int(support.size),
"lower_bulk_size": int(lowerBulk.size),
"bulk_quantile": float(bulkQuantile_),
"bulk_cutoff": float(cutoff),
"provisional_center": float(provisionalCenter),
"center_method": str(centerMethod),
"support_radius": float(supportRadius),
"support_scale": float(supportScale),
}
return np.asarray(support, dtype=np.float64), details
def estimateROCCONull(
scoreTrack: npt.ArrayLike,
bulkQuantile: float = 0.60,
) -> Tuple[float, float, Dict[str, float | str]]:
r"""Estimate a robust null center and scale from mode-centered central support."""
z = _asFloatVector("scoreTrack", scoreTrack)
support, supportMeta = _selectRobustNullSupport(
z,
bulkQuantile=bulkQuantile,
)
nullCenter = float(supportMeta["provisional_center"])
centeredSupport = support - nullCenter
scaleBase = centeredSupport
scaleMethod = "central_support_mad"
scaleMad = 1.4826 * float(np.median(np.abs(scaleBase - np.median(scaleBase))))
scaleIqr = (
float(stats.iqr(scaleBase, rng=(25, 75))) / 1.349
if scaleBase.size >= 4
else 0.0
)
scaleStd = float(np.std(scaleBase, ddof=1)) if scaleBase.size >= 2 else 0.0
nullScale = float(max(scaleMad, scaleIqr, scaleStd, 1.0e-6))
details: Dict[str, float | str] = {
"null_method": str(supportMeta["null_method"]),
"scale_method": str(scaleMethod),
"support_size": int(supportMeta["support_size"]),
"lower_bulk_size": int(supportMeta["lower_bulk_size"]),
"bulk_quantile": float(supportMeta["bulk_quantile"]),
"bulk_cutoff": float(supportMeta["bulk_cutoff"]),
"provisional_center": float(supportMeta["provisional_center"]),
"center_method": str(supportMeta["center_method"]),
"support_radius": float(supportMeta["support_radius"]),
"support_scale": float(supportMeta["support_scale"]),
"null_center": float(nullCenter),
"null_scale": float(nullScale),
"null_center_shift_from_zero": float(nullCenter),
"support_fraction": float(supportMeta["support_size"] / max(int(z.size), 1)),
}
return nullCenter, nullScale, details
def _estimateTemplateScale(template: np.ndarray) -> float:
centeredTemplate = _asFloatVector("template", template)
scaleMad = 1.4826 * float(
np.median(np.abs(centeredTemplate - np.median(centeredTemplate)))
)
scaleIqr = (
float(stats.iqr(centeredTemplate, rng=(25, 75))) / 1.349
if centeredTemplate.size >= 4
else 0.0
)
scaleStd = (
float(np.std(centeredTemplate, ddof=1)) if centeredTemplate.size >= 2 else 0.0
)
return float(max(scaleMad, scaleIqr, scaleStd, 1.0e-6))
def _calibrateStationaryNullDWB(
scoreTrack: np.ndarray,
template: np.ndarray,
nullCenter: float,
nullScale: float,
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
thresholdZGrid: Iterable[float] | None = None,
dependenceSpan: int | None = None,
numBootstrap: int = _ROCCO_NUM_BOOTSTRAP_DEFAULT,
kernel: str = "bartlett",
randomSeed: int = 0,
calibrationQuantile: float = _ROCCO_NULL_QUANTILE,
pooledNullFloor: Dict[str, Any] | None = None,
templateMeta: Dict[str, Any] | None = None,
coreMeta: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
scoreTrack_ = _asFloatVector("scoreTrack", scoreTrack)
template_ = _asFloatVector("template", template)
zGrid = _resolveThresholdZGrid(
thresholdZ=thresholdZ,
thresholdZGrid=thresholdZGrid,
)
dependenceSpanDetails = _resolveRoccoDependenceSpanDetails(
scoreTrack_,
dependenceSpan=dependenceSpan,
)
dependenceSpan_ = int(dependenceSpanDetails["point"])
numBootstrap_ = max(int(numBootstrap), 8)
calibrationQuantile_ = float(np.clip(calibrationQuantile, 0.50, 0.999))
kernel_ = str(kernel)
panelId = (
f"stationary_null_dwb:{int(randomSeed)}:{int(dependenceSpan_)}:"
f"{int(numBootstrap_)}:{kernel_}:{int(template_.size)}"
)
rng = np.random.default_rng(int(randomSeed))
upperTailOffsets: Dict[str, np.ndarray] = {
_thresholdZKey(z): np.empty(numBootstrap_, dtype=np.float64) for z in zGrid
}
for b in range(numBootstrap_):
multipliers = _generateDWBMultipliers(
template_.size,
dependenceSpan_,
rng,
kernel=kernel_,
)
draw = np.asarray(template_, dtype=np.float64) * np.asarray(
multipliers, dtype=np.float64
)
draw -= float(np.mean(draw))
for z in zGrid:
tailAlpha = float(stats.norm.sf(float(max(z, 0.0))))
tailQuantile = 1.0 - tailAlpha if float(z) > 0.0 else 0.5
upperTailOffsets[_thresholdZKey(z)][b] = float(
np.quantile(
draw,
tailQuantile,
method="interpolated_inverted_cdf",
)
)
thresholdViews: Dict[str, Dict[str, Any]] = {}
thresholdMetrics: Dict[str, Dict[str, Any]] = {}
nullOccByKey: Dict[str, np.ndarray] = {
_thresholdZKey(z): np.empty(numBootstrap_, dtype=np.float64) for z in zGrid
}
nullSoftByKey: Dict[str, np.ndarray] = {
_thresholdZKey(z): np.empty(numBootstrap_, dtype=np.float64) for z in zGrid
}
coreMeta_ = {} if coreMeta is None else dict(coreMeta)
templateMeta_ = {} if templateMeta is None else dict(templateMeta)
for z in zGrid:
key = _thresholdZKey(z)
tailAlpha = float(stats.norm.sf(float(max(z, 0.0))))
empiricalUpper = float(
np.quantile(
upperTailOffsets[key],
calibrationQuantile_,
method="interpolated_inverted_cdf",
)
)
pooledView = _resolvePooledNullFloorView(pooledNullFloor, z)
pooledThresholdFloor = 0.0
pooledScaleFloor = 0.0
pooledSource = "none"
if pooledView is not None:
pooledThresholdFloor = float(
max(pooledView.get("threshold_offset_floor", 0.0), 0.0)
)
pooledScaleFloor = float(max(pooledView.get("null_scale_floor", 0.0), 0.0))
pooledSource = str(pooledView.get("source", "external_floor"))
z_ = float(max(z, 0.0))
thresholdOffset = float(max(empiricalUpper, pooledThresholdFloor, 0.0))
empiricalScale = (
float(max(float(nullScale), thresholdOffset / float(z), 1.0e-6))
if float(z) > 0.0
else float(max(float(nullScale), thresholdOffset, 1.0e-6))
)
threshold = float(nullCenter + thresholdOffset)
nullScale_ = float(max(empiricalScale, pooledScaleFloor, 1.0e-6))
observedExcess = np.clip(
(scoreTrack_ - threshold) / max(nullScale_, _TINY),
0.0,
None,
)
nullMeta = {
**templateMeta_,
"null_method": "stationary_null_dwb",
"null_calibration_method": "stationary_null_dwb",
"core_null_method": str(coreMeta_.get("null_method", "unknown")),
"core_scale_method": str(coreMeta_.get("scale_method", "unknown")),
"core_null_scale": float(nullScale),
"threshold_offset_local": float(empiricalUpper),
"threshold_offset_floor": float(pooledThresholdFloor),
"threshold_offset": float(thresholdOffset),
"threshold_offset_local_empirical": float(empiricalUpper),
"null_scale_local": float(empiricalScale),
"null_scale_floor": float(pooledScaleFloor),
"null_scale": float(nullScale_),
"pooled_floor_source": str(pooledSource),
"pooled_floor_applied": bool(
(pooledThresholdFloor > empiricalUpper + 1.0e-12)
or (pooledScaleFloor > empiricalScale + 1.0e-12)
),
"threshold": float(threshold),
"threshold_z": float(z_),
"null_center": float(nullCenter),
"tail_method": "stationary_null_dwb",
"tail_alpha": float(tailAlpha),
"bootstrap_quantile": float(calibrationQuantile_),
"num_bootstrap": int(numBootstrap_),
"dependence_span": int(dependenceSpan_),
"dwb_bandwidth": int(dependenceSpan_),
"dwb_bandwidth_lower": int(dependenceSpanDetails["lower"]),
"dwb_bandwidth_upper": int(dependenceSpanDetails["upper"]),
"dwb_bandwidth_method": str(dependenceSpanDetails["method"]),
"dependence_span_lower": int(dependenceSpanDetails["lower"]),
"dependence_span_upper": int(dependenceSpanDetails["upper"]),
"dependence_span_method": str(dependenceSpanDetails["method"]),
"context_span_lower": int(dependenceSpanDetails["lower"]),
"context_span_upper": int(dependenceSpanDetails["upper"]),
"context_span_method": str(dependenceSpanDetails["method"]),
"kernel": str(kernel_),
"dwb_panel_id": str(panelId),
"dwb_panel_reused": True,
"bootstrap_upper_tail_offset": float(empiricalUpper),
"upper_tail_offset_mean": float(np.mean(upperTailOffsets[key])),
"upper_tail_offset_sd": (
float(np.std(upperTailOffsets[key], ddof=1))
if numBootstrap_ > 1
else 0.0
),
"threshold_offset": float(thresholdOffset),
"empirical_null_scale": float(empiricalScale),
}
thresholdViews[key] = {
"threshold_z": float(z_),
"null_center": float(nullCenter),
"null_scale": float(nullScale_),
"threshold": float(threshold),
"null_meta": dict(nullMeta),
"template": np.asarray(template_, dtype=np.float64),
"template_meta": dict(templateMeta_),
}
thresholdMetrics[key] = {
"threshold_z": float(z_),
"threshold": float(threshold),
"null_center": float(nullCenter),
"null_scale": float(nullScale_),
"observed_tail_occupancy": float(np.mean(scoreTrack_ > threshold)),
"observed_soft_tail": float(np.mean(observedExcess)),
"budget_occupancy_raw": 0.0,
"budget_soft_raw": 0.0,
"null_meta": dict(nullMeta),
"template_meta": dict(templateMeta_),
"dwb_panel_id": str(panelId),
}
rng = np.random.default_rng(int(randomSeed))
for b in range(numBootstrap_):
multipliers = _generateDWBMultipliers(
template_.size,
dependenceSpan_,
rng,
kernel=kernel_,
)
draw = np.asarray(template_, dtype=np.float64) * np.asarray(
multipliers, dtype=np.float64
)
draw -= float(np.mean(draw))
for z in zGrid:
key = _thresholdZKey(z)
view = thresholdViews[key]
thresholdOffset = float(view["threshold"]) - float(nullCenter)
nullScale_ = float(view["null_scale"])
nullOccByKey[key][b] = float(np.mean(draw > thresholdOffset))
nullSoftByKey[key][b] = float(
np.mean(
np.clip(
(draw - thresholdOffset) / max(nullScale_, _TINY),
0.0,
None,
)
)
)
for z in zGrid:
key = _thresholdZKey(z)
metrics = thresholdMetrics[key]
nullOcc = np.asarray(nullOccByKey[key], dtype=np.float64)
nullSoft = np.asarray(nullSoftByKey[key], dtype=np.float64)
nullOccCal = float(
np.quantile(
nullOcc, calibrationQuantile_, method="interpolated_inverted_cdf"
)
)
nullSoftCal = float(
np.quantile(
nullSoft, calibrationQuantile_, method="interpolated_inverted_cdf"
)
)
metrics.update(
{
"null_tail_occupancy": float(np.mean(nullOcc)),
"null_tail_occupancy_calibrated": float(nullOccCal),
"null_tail_occupancy_sd": (
float(np.std(nullOcc, ddof=1)) if numBootstrap_ > 1 else 0.0
),
"null_soft_tail": float(np.mean(nullSoft)),
"null_soft_tail_calibrated": float(nullSoftCal),
"null_soft_tail_sd": (
float(np.std(nullSoft, ddof=1)) if numBootstrap_ > 1 else 0.0
),
"budget_occupancy_raw": float(
np.clip(
float(metrics["observed_tail_occupancy"]) - nullOccCal, 0.0, 1.0
)
),
"budget_soft_raw": float(
np.clip(
float(metrics["observed_soft_tail"]) - nullSoftCal, 0.0, 1.0
)
),
"null_quantile": float(calibrationQuantile_),
}
)
return {
"method": "stationary_null_dwb",
"null_calibration_method": "stationary_null_dwb",
"kernel": str(kernel_),
"num_bootstrap": int(numBootstrap_),
"null_quantile": float(calibrationQuantile_),
"dependence_span": int(dependenceSpan_),
"dwb_bandwidth": int(dependenceSpan_),
"dwb_bandwidth_lower": int(dependenceSpanDetails["lower"]),
"dwb_bandwidth_upper": int(dependenceSpanDetails["upper"]),
"dwb_bandwidth_method": str(dependenceSpanDetails["method"]),
"dependence_span_lower": int(dependenceSpanDetails["lower"]),
"dependence_span_upper": int(dependenceSpanDetails["upper"]),
"dependence_span_method": str(dependenceSpanDetails["method"]),
"context_span_lower": int(dependenceSpanDetails["lower"]),
"context_span_upper": int(dependenceSpanDetails["upper"]),
"context_span_method": str(dependenceSpanDetails["method"]),
"dwb_panel_id": str(panelId),
"dwb_panel_reused": True,
"threshold_views": thresholdViews,
"threshold_metrics": thresholdMetrics,
"budget_z_grid": [float(z) for z in zGrid],
"primary_key": _thresholdZKey(float(thresholdZ)),
}
def _estimateEmpiricalMirroredNullForROCCO(
scoreTrack: np.ndarray,
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
bulkQuantile: float = 0.60,
pooledNullFloor: Dict[str, Any] | None = None,
) -> Tuple[float, float, float, Dict[str, Any]]:
nullCenter, coreNullScale, coreMeta = estimateROCCONull(
scoreTrack,
bulkQuantile=bulkQuantile,
)
template, templateMeta = _prepareNullResidualTemplate(
scoreTrack,
nullCenter,
coreNullScale,
bulkQuantile=bulkQuantile,
)
calibration = _calibrateStationaryNullDWB(
scoreTrack,
template,
nullCenter,
coreNullScale,
thresholdZ=thresholdZ,
thresholdZGrid=[thresholdZ],
randomSeed=0,
calibrationQuantile=_ROCCO_NULL_QUANTILE,
pooledNullFloor=pooledNullFloor,
templateMeta=templateMeta,
coreMeta=coreMeta,
)
key = _thresholdZKey(float(thresholdZ))
view = dict(calibration["threshold_views"][key])
metrics = dict(calibration["threshold_metrics"][key])
details: Dict[str, Any] = {
**dict(view["null_meta"]),
**dict(metrics),
}
nullScale = float(view["null_scale"])
threshold = float(view["threshold"])
return nullCenter, nullScale, threshold, details
def _isAutosomeName(chromosome: str) -> bool:
chrom = str(chromosome).strip().lower()
if chrom.startswith("chr"):
chrom = chrom[3:]
if not chrom.isdigit():
return False
chromNum = int(chrom)
return 1 <= chromNum <= 22
def _prepareROCCOBaseScore(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
tau0: float = 1.0,
bulkQuantile: float = 0.60,
) -> Dict[str, Any]:
state_ = _asFloatVector("state", state)
stateNullCenter, stateNullScale, stateNullMeta = estimateROCCONull(
state_,
bulkQuantile=bulkQuantile,
)
scoreTrack, scoreMeta = consenrichStateScoreTrack(
state_,
uncertainty=uncertainty,
returnDetails=True,
)
scoreMeta["tau0"] = float(max(float(tau0), 0.0))
scoreMeta["tau0_used"] = False
return {
"score_track": np.asarray(scoreTrack, dtype=np.float64),
"state_null_center": float(stateNullCenter),
"state_null_scale": float(stateNullScale),
"score_meta": dict(scoreMeta),
"state_null_meta": dict(stateNullMeta),
}
def _estimateAutosomalNullFloorForROCCO(
basePreparedByChrom: Dict[str, Dict[str, Any]],
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
bulkQuantile: float = 0.60,
thresholdZGrid: Iterable[float] | None = None,
) -> Dict[str, Any]:
if len(basePreparedByChrom) == 0:
return {
"source": "none",
"threshold_offset_floor": 0.0,
"null_scale_floor": 0.0,
"chromosome_count": 0,
"tail_support_size": 0,
"threshold_views": {},
"budget_z_grid": [],
}
autosomeChroms = [chrom for chrom in basePreparedByChrom if _isAutosomeName(chrom)]
selectedChroms = (
autosomeChroms if len(autosomeChroms) > 0 else list(basePreparedByChrom)
)
pooledTemplateParts: List[np.ndarray] = []
for chrom in selectedChroms:
scoreTrack = np.asarray(
basePreparedByChrom[chrom]["score_track"], dtype=np.float64
)
nullCenter, coreNullScale, _ = estimateROCCONull(
scoreTrack,
bulkQuantile=bulkQuantile,
)
template, _ = _prepareNullResidualTemplate(
scoreTrack,
nullCenter,
coreNullScale,
bulkQuantile=bulkQuantile,
)
pooledTemplateParts.append(np.asarray(template, dtype=np.float64))
pooledTemplate = np.concatenate(pooledTemplateParts).astype(np.float64, copy=False)
pooledScale = _estimateTemplateScale(pooledTemplate)
zGrid = _resolveThresholdZGrid(
thresholdZ=thresholdZ,
thresholdZGrid=thresholdZGrid,
)
calibration = _calibrateStationaryNullDWB(
pooledTemplate,
pooledTemplate,
0.0,
pooledScale,
thresholdZ=thresholdZ,
thresholdZGrid=zGrid,
randomSeed=0,
calibrationQuantile=_ROCCO_NULL_QUANTILE,
templateMeta={"tail_support_size": int(pooledTemplate.size)},
coreMeta={
"null_method": "pooled_template",
"scale_method": "pooled_template_scale",
},
)
thresholdViews: Dict[str, Dict[str, Any]] = {}
primaryKey = _thresholdZKey(float(thresholdZ))
primaryMeta: Dict[str, Any] | None = None
for z in zGrid:
pooledMeta = dict(
calibration["threshold_views"][_thresholdZKey(z)]["null_meta"]
)
thresholdOffsetFloor = float(pooledMeta["threshold_offset"])
scaleFloor = float(pooledMeta["null_scale"])
meta = {
"threshold_z": float(z),
"threshold_offset_floor": float(thresholdOffsetFloor),
"null_scale_floor": float(scaleFloor),
**pooledMeta,
}
thresholdViews[_thresholdZKey(z)] = meta
if _thresholdZKey(z) == primaryKey:
primaryMeta = meta
if primaryMeta is None:
primaryMeta = thresholdViews[_thresholdZKey(zGrid[0])]
return {
"source": (
"autosomal_pool" if len(autosomeChroms) > 0 else "all_chromosomes_pool"
),
"chromosome_count": int(len(selectedChroms)),
"tail_support_size": int(pooledTemplate.size),
"threshold_offset_floor": float(primaryMeta["threshold_offset_floor"]),
"null_scale_floor": float(primaryMeta["null_scale_floor"]),
"threshold_z": float(primaryMeta["threshold_z"]),
"threshold_views": thresholdViews,
"budget_z_grid": [float(z) for z in zGrid],
"chromosomes": [str(chrom) for chrom in selectedChroms],
**primaryMeta,
}
def _resolvePooledNullFloorView(
pooledNullFloor: Dict[str, Any] | None,
thresholdZ: float,
) -> Dict[str, Any] | None:
if pooledNullFloor is None:
return None
thresholdViews = pooledNullFloor.get("threshold_views")
if isinstance(thresholdViews, dict):
match = thresholdViews.get(_thresholdZKey(thresholdZ))
if isinstance(match, dict):
return dict(match)
return {
"source": str(pooledNullFloor.get("source", "external_floor")),
"threshold_offset_floor": float(
max(pooledNullFloor.get("threshold_offset_floor", 0.0), 0.0)
),
"null_scale_floor": float(
max(pooledNullFloor.get("null_scale_floor", 0.0), 0.0)
),
}
def _resolveRoccoDependenceSpanDetails(
values: np.ndarray,
dependenceSpan: int | None = None,
) -> Dict[str, int | str]:
if dependenceSpan is not None:
span = max(int(dependenceSpan), 2)
return {
"point": int(span),
"lower": int(span),
"upper": int(span),
"method": "fixed",
}
n = int(values.size)
try:
if n >= 100:
positiveVals = np.clip(values, 0.0, None)
contextSize, contextLower, contextUpper = core.getContextSize(
positiveVals,
minSpan=3,
maxSpan=min(64, max(12, n // 8)),
)
point = max(int(contextSize), 2)
lower = max(min(int(contextLower), point), 2)
upper = max(int(contextUpper), point)
return {
"point": int(point),
"lower": int(lower),
"upper": int(upper),
"method": "getContextSize",
}
except Exception as ex:
logger.info("getContextSize fallback for ROCCO budget span: %s", ex)
fallback = max(min(int(round(np.sqrt(n))), 64), 4)
return {
"point": int(fallback),
"lower": int(fallback),
"upper": int(fallback),
"method": "sqrt_fallback",
}
def _prepareNullResidualTemplate(
scoreTrack: np.ndarray,
nullCenter: float,
nullScale: float,
bulkQuantile: float = 0.60,
) -> Tuple[np.ndarray, Dict[str, float]]:
centered = np.asarray(scoreTrack, dtype=np.float64) - float(nullCenter)
support, supportMeta = _selectRobustNullSupport(
np.asarray(scoreTrack, dtype=np.float64),
bulkQuantile=bulkQuantile,
)
bulkVals = np.asarray(support, dtype=np.float64) - float(nullCenter)
if bulkVals.size < 4:
bulkVals = centered
clipAbs = float(
max(
(
np.quantile(
np.abs(bulkVals),
0.95,
method="interpolated_inverted_cdf",
)
if bulkVals.size > 0
else 0.0
),
nullScale,
1.0e-6,
)
)
template = np.clip(centered, -clipAbs, clipAbs).astype(np.float64, copy=False)
template = template - float(np.mean(template))
templateStd = float(np.std(template, ddof=1)) if template.size >= 2 else 0.0
if np.isfinite(templateStd) and templateStd > _TINY:
template = template * (float(nullScale) / templateStd)
else:
template = np.full_like(template, 0.0, dtype=np.float64)
return template, {
"clip_abs": float(clipAbs),
"template_std": float(np.std(template, ddof=1)) if template.size >= 2 else 0.0,
"template_mean": float(np.mean(template)),
"template_support_radius": float(supportMeta["support_radius"]),
"template_support_size": float(supportMeta["support_size"]),
}
def _prepareROCCOScoreAndNull(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
tau0: float = 1.0,
bulkQuantile: float = 0.60,
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
numBootstrap: int = _ROCCO_NUM_BOOTSTRAP_DEFAULT,
dependenceSpan: int | None = None,
kernel: str = "bartlett",
randomSeed: int = 0,
nullQuantile: float = _ROCCO_NULL_QUANTILE,
pooledNullFloor: Dict[str, Any] | None = None,
thresholdZGrid: Iterable[float] | None = None,
) -> Dict[str, Any]:
r"""Prepare the direct Consenrich score track and robust null for ROCCO budgeting."""
prepared = _prepareROCCOBaseScore(
state,
uncertainty=uncertainty,
tau0=tau0,
bulkQuantile=bulkQuantile,
)
scoreTrack = np.asarray(prepared["score_track"], dtype=np.float64)
zGrid = _resolveThresholdZGrid(
thresholdZ=thresholdZ,
thresholdZGrid=thresholdZGrid,
)
nullCenter, coreNullScale, coreMeta = estimateROCCONull(
scoreTrack,
bulkQuantile=bulkQuantile,
)
template, templateMeta = _prepareNullResidualTemplate(
scoreTrack,
nullCenter,
coreNullScale,
bulkQuantile=bulkQuantile,
)
calibration = _calibrateStationaryNullDWB(
scoreTrack,
template,
nullCenter,
coreNullScale,
thresholdZ=thresholdZ,
thresholdZGrid=zGrid,
dependenceSpan=dependenceSpan,
numBootstrap=numBootstrap,
kernel=kernel,
randomSeed=randomSeed,
calibrationQuantile=nullQuantile,
pooledNullFloor=pooledNullFloor,
templateMeta=templateMeta,
coreMeta=coreMeta,
)
thresholdViews = dict(calibration["threshold_views"])
primaryKey = str(calibration["primary_key"])
primaryView = thresholdViews.get(primaryKey)
if primaryView is None:
primaryView = thresholdViews[_thresholdZKey(zGrid[0])]
return {
**prepared,
"score_track": scoreTrack,
"null_center": float(primaryView["null_center"]),
"null_scale": float(primaryView["null_scale"]),
"threshold": float(primaryView["threshold"]),
"null_meta": dict(primaryView["null_meta"]),
"template": np.asarray(primaryView["template"], dtype=np.float64),
"template_meta": dict(primaryView["template_meta"]),
"threshold_views": thresholdViews,
"threshold_metrics": dict(calibration["threshold_metrics"]),
"budget_z_grid": [float(z) for z in zGrid],
"dwb_calibration": {
"method": str(calibration["method"]),
"null_calibration_method": str(calibration["null_calibration_method"]),
"kernel": str(calibration["kernel"]),
"num_bootstrap": int(calibration["num_bootstrap"]),
"null_quantile": float(calibration["null_quantile"]),
"dependence_span": int(calibration["dependence_span"]),
"dwb_bandwidth": int(calibration["dwb_bandwidth"]),
"dwb_bandwidth_lower": int(calibration["dwb_bandwidth_lower"]),
"dwb_bandwidth_upper": int(calibration["dwb_bandwidth_upper"]),
"dwb_bandwidth_method": str(calibration["dwb_bandwidth_method"]),
"dependence_span_lower": int(calibration["dependence_span_lower"]),
"dependence_span_upper": int(calibration["dependence_span_upper"]),
"dependence_span_method": str(calibration["dependence_span_method"]),
"context_span_lower": int(calibration["context_span_lower"]),
"context_span_upper": int(calibration["context_span_upper"]),
"context_span_method": str(calibration["context_span_method"]),
"dwb_panel_id": str(calibration["dwb_panel_id"]),
"dwb_panel_reused": bool(calibration["dwb_panel_reused"]),
"random_seed": int(randomSeed),
"primary_key": str(calibration["primary_key"]),
"threshold_z_grid": [float(z) for z in calibration["budget_z_grid"]],
},
"pooled_null_floor": None if pooledNullFloor is None else dict(pooledNullFloor),
}
def _bartlettKernel(x: np.ndarray) -> np.ndarray:
ax = np.abs(x)
return np.where(ax <= 1.0, 1.0 - ax, 0.0)
def _parzenKernel(x: np.ndarray) -> np.ndarray:
ax = np.abs(x)
out = np.zeros_like(ax, dtype=np.float64)
mask1 = ax <= 0.5
mask2 = (ax > 0.5) & (ax <= 1.0)
out[mask1] = 1.0 - 6.0 * ax[mask1] ** 2 + 6.0 * ax[mask1] ** 3
out[mask2] = 2.0 * (1.0 - ax[mask2]) ** 3
return out
def _quadraticSpectralKernel(x: np.ndarray) -> np.ndarray:
ax = np.abs(x)
out = np.zeros_like(ax, dtype=np.float64)
zeroMask = ax < 1.0e-12
out[zeroMask] = 1.0
nz = ~zeroMask
if np.any(nz):
y = (6.0 * np.pi * ax[nz]) / 5.0
out[nz] = (25.0 / (12.0 * np.pi * np.pi * ax[nz] * ax[nz])) * (
(np.sin(y) / np.maximum(y, 1.0e-12)) - np.cos(y)
)
return out
def _kernelValues(name: str, lags: np.ndarray, bandwidth: int) -> np.ndarray:
kernelName = str(name).strip().lower().replace("-", "_")
x = np.abs(lags.astype(np.float64, copy=False)) / max(float(bandwidth), 1.0)
if kernelName in {"bartlett", "triangle", "triangular"}:
return _bartlettKernel(x)
if kernelName in {"parzen"}:
return _parzenKernel(x)
if kernelName in {"qs", "quadratic_spectral", "quadraticspectral"}:
return _quadraticSpectralKernel(x)
raise ValueError(f"Unknown DWB kernel: {name}")
def _generateDWBMultipliers(
n: int,
bandwidth: int,
rng: np.random.Generator,
kernel: str = "bartlett",
) -> np.ndarray:
bandwidth_ = max(int(bandwidth), 2)
kernelName = str(kernel).strip().lower().replace("-", "_")
maxLag = (
bandwidth_
if kernelName not in {"qs", "quadratic_spectral", "quadraticspectral"}
else max(8 * bandwidth_, 32)
)
lags = np.arange(-maxLag, maxLag + 1, dtype=np.int64)
weights = _kernelValues(kernel, lags, bandwidth_)
weights = np.asarray(weights, dtype=np.float64)
weights /= math.sqrt(max(float(np.sum(weights * weights)), _TINY))
noise = rng.standard_normal(int(n + 2 * maxLag))
multipliers = np.convolve(noise, weights, mode="valid")[:n]
multipliers = np.asarray(multipliers, dtype=np.float64)
multipliers -= float(np.mean(multipliers))
sd = float(np.std(multipliers, ddof=1)) if n >= 2 else 0.0
if not np.isfinite(sd) or sd <= _TINY:
return np.ones(n, dtype=np.float64)
return multipliers / sd
def _estimateEffectiveSampleSize(
values: np.ndarray,
maxLag: int,
) -> Tuple[float, float, int]:
x = np.asarray(values, dtype=np.float64)
n = int(x.size)
if n < 2:
return float(n), 1.0, 0
x = x - float(np.mean(x))
var = float(np.dot(x, x) / max(n, 1))
if not np.isfinite(var) or var <= _TINY:
return float(n), 1.0, 0
maxLag_ = max(1, min(int(maxLag), n - 1))
tau = 1.0
lagsUsed = 0
for lag in range(1, maxLag_ + 1):
cov = float(np.dot(x[:-lag], x[lag:]) / max(n - lag, 1))
rho = cov / var
if (not np.isfinite(rho)) or rho <= 0.0:
break
tau += 2.0 * rho
lagsUsed = lag
tau = float(max(tau, 1.0))
return float(n / tau), tau, lagsUsed
def _preparedStationaryNullDWBMatches(
prepared: Dict[str, Any],
*,
scoreTrack: np.ndarray,
numBootstrap: int,
dependenceSpan: int | None,
kernel: str,
thresholdZ: float,
thresholdZGrid: Iterable[float] | None,
randomSeed: int,
nullQuantile: float,
) -> bool:
calibration = prepared.get("dwb_calibration")
if not isinstance(calibration, dict):
return False
expectedGrid = _resolveThresholdZGrid(
thresholdZ=thresholdZ,
thresholdZGrid=(
prepared.get("budget_z_grid") if thresholdZGrid is None else thresholdZGrid
),
)
expectedSpan = _resolveRoccoDependenceSpanDetails(
scoreTrack,
dependenceSpan=dependenceSpan,
)
return (
str(calibration.get("method")) == "stationary_null_dwb"
and int(calibration.get("num_bootstrap", -1)) == max(int(numBootstrap), 8)
and int(calibration.get("dependence_span", -1)) == int(expectedSpan["point"])
and str(calibration.get("kernel", "")).strip().lower().replace("-", "_")
== str(kernel).strip().lower().replace("-", "_")
and int(calibration.get("random_seed", -1)) == int(randomSeed)
and float(calibration.get("null_quantile", -1.0))
== float(np.clip(nullQuantile, 0.50, 0.999))
and str(calibration.get("primary_key", ""))
== _thresholdZKey(float(max(thresholdZ, 0.0)))
and list(calibration.get("threshold_z_grid", []))
== [float(z) for z in expectedGrid]
)
def _estimateBudgetForPreparedROCCOScore(
prepared: Dict[str, Any],
statistic: str = "integrated",
numBootstrap: int = _ROCCO_NUM_BOOTSTRAP_DEFAULT,
dependenceSpan: int | None = None,
kernel: str = "bartlett",
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
bulkQuantile: float = 0.60,
randomSeed: int = 0,
tau0: float = 1.0,
nullQuantile: float = _ROCCO_NULL_QUANTILE,
budgetMin: float = _ROCCO_BUDGET_MIN,
budgetMax: float = _ROCCO_BUDGET_MAX,
thresholdZGrid: Iterable[float] | None = None,
returnDetails: bool = False,
) -> float | Tuple[float, Dict[str, Any]]:
r"""Estimate a ROCCO budget from a shared stationary-null DWB calibration."""
scoreTrack = np.asarray(prepared["score_track"], dtype=np.float64)
scoreMeta = dict(prepared["score_meta"])
stateNullMeta = dict(prepared["state_null_meta"])
zGrid = _resolveThresholdZGrid(
thresholdZ=thresholdZ,
thresholdZGrid=(
prepared.get("budget_z_grid") if thresholdZGrid is None else thresholdZGrid
),
)
numBootstrap_ = max(int(numBootstrap), 16)
nullQuantile_ = float(np.clip(nullQuantile, 0.50, 0.999))
budgetMin_ = float(max(budgetMin, 0.0))
budgetMax_ = float(max(budgetMax, budgetMin_))
primaryKey = _thresholdZKey(float(max(thresholdZ, 0.0)))
statistic_ = str(statistic).strip().lower().replace("-", "_")
if statistic_ in {
"integrated",
"integrated_excess",
"integrated_excess_tail",
"excess",
}:
selectedStatistic = "integrated_excess_tail"
elif statistic_ in {
"occupancy",
"tail_fraction",
"fraction",
"tail",
"tail_occupancy",
}:
selectedStatistic = "tail_occupancy"
elif statistic_ in {"soft", "soft_occupancy", "soft_tail"}:
selectedStatistic = "soft"
else:
raise ValueError(f"Unknown budget statistic: {statistic}")
if _preparedStationaryNullDWBMatches(
prepared,
scoreTrack=scoreTrack,
numBootstrap=numBootstrap,
dependenceSpan=dependenceSpan,
kernel=kernel,
thresholdZ=thresholdZ,
thresholdZGrid=thresholdZGrid,
randomSeed=randomSeed,
nullQuantile=nullQuantile,
):
thresholdViews = dict(prepared.get("threshold_views", {}))
thresholdMetrics = dict(prepared.get("threshold_metrics", {}))
calibrationMeta = dict(prepared.get("dwb_calibration", {}))
else:
calibration = _calibrateStationaryNullDWB(
scoreTrack,
np.asarray(prepared["template"], dtype=np.float64),
float(prepared["null_center"]),
float(prepared["null_meta"].get("core_null_scale", prepared["null_scale"])),
thresholdZ=thresholdZ,
thresholdZGrid=zGrid,
dependenceSpan=dependenceSpan,
numBootstrap=numBootstrap,
kernel=kernel,
randomSeed=randomSeed,
calibrationQuantile=nullQuantile,
pooledNullFloor=prepared.get("pooled_null_floor"),
templateMeta=dict(prepared.get("template_meta", {})),
coreMeta=dict(prepared.get("state_null_meta", {})),
)
thresholdViews = dict(calibration["threshold_views"])
thresholdMetrics = dict(calibration["threshold_metrics"])
calibrationMeta = {
"method": str(calibration["method"]),
"null_calibration_method": str(calibration["null_calibration_method"]),
"kernel": str(calibration["kernel"]),
"num_bootstrap": int(calibration["num_bootstrap"]),
"null_quantile": float(calibration["null_quantile"]),
"dependence_span": int(calibration["dependence_span"]),
"dwb_bandwidth": int(calibration["dwb_bandwidth"]),
"dwb_bandwidth_lower": int(calibration["dwb_bandwidth_lower"]),
"dwb_bandwidth_upper": int(calibration["dwb_bandwidth_upper"]),
"dwb_bandwidth_method": str(calibration["dwb_bandwidth_method"]),
"dependence_span_lower": int(calibration["dependence_span_lower"]),
"dependence_span_upper": int(calibration["dependence_span_upper"]),
"dependence_span_method": str(calibration["dependence_span_method"]),
"context_span_lower": int(calibration["context_span_lower"]),
"context_span_upper": int(calibration["context_span_upper"]),
"context_span_method": str(calibration["context_span_method"]),
"dwb_panel_id": str(calibration["dwb_panel_id"]),
"dwb_panel_reused": bool(calibration["dwb_panel_reused"]),
"random_seed": int(randomSeed),
"primary_key": str(calibration["primary_key"]),
"threshold_z_grid": [float(z) for z in calibration["budget_z_grid"]],
}
primaryView = thresholdViews.get(primaryKey)
if primaryView is None:
primaryView = thresholdViews[_thresholdZKey(zGrid[0])]
primaryKey = _thresholdZKey(zGrid[0])
primaryMetrics = dict(thresholdMetrics[primaryKey])
integratedSeriesParts: List[np.ndarray] = []
for z in zGrid:
view = thresholdViews[_thresholdZKey(z)]
integratedSeriesParts.append(
np.clip(
(scoreTrack - float(view["threshold"]))
/ max(float(view["null_scale"]), _TINY),
0.0,
None,
)
)
budgetIntegratedRaw = float(
np.mean(
[float(metrics["budget_soft_raw"]) for metrics in thresholdMetrics.values()]
)
)
primaryBudgetOccupancyRaw = float(primaryMetrics["budget_occupancy_raw"])
primaryBudgetSoftRaw = float(primaryMetrics["budget_soft_raw"])
budgetOccupancy = float(np.clip(primaryBudgetOccupancyRaw, budgetMin_, budgetMax_))
budgetSoft = float(np.clip(primaryBudgetSoftRaw, budgetMin_, budgetMax_))
budgetIntegrated = float(np.clip(budgetIntegratedRaw, budgetMin_, budgetMax_))
if selectedStatistic == "tail_occupancy":
budget = budgetOccupancy
budgetRaw = primaryBudgetOccupancyRaw
essSeries = (scoreTrack > float(primaryView["threshold"])).astype(np.float64)
elif selectedStatistic == "soft":
budget = budgetSoft
budgetRaw = primaryBudgetSoftRaw
essSeries = np.clip(
(scoreTrack - float(primaryView["threshold"]))
/ max(float(primaryView["null_scale"]), _TINY),
0.0,
None,
)
else:
budget = budgetIntegrated
budgetRaw = budgetIntegratedRaw
essSeries = (
np.mean(np.vstack(integratedSeriesParts), axis=0)
if len(integratedSeriesParts) > 0
else np.zeros_like(scoreTrack, dtype=np.float64)
)
effectiveTotalCount, tauInt, essLagsUsed = _estimateEffectiveSampleSize(
essSeries,
maxLag=max(
4 * int(calibrationMeta["dependence_span"]),
int(calibrationMeta["dependence_span"]),
),
)
if not returnDetails:
return budget
budgetModel = {
"integrated_excess_tail": "dwb_integrated_excess_tail",
"tail_occupancy": "dwb_tail_occupancy",
"soft": "dwb_soft_tail",
}[selectedStatistic]
details: Dict[str, Any] = {
"method": "stationary_null_dwb",
"null_calibration_method": "stationary_null_dwb",
"statistic": str(selectedStatistic),
"kernel": str(calibrationMeta["kernel"]),
"num_bootstrap": int(calibrationMeta["num_bootstrap"]),
"null_quantile": float(nullQuantile_),
"dependence_span": int(calibrationMeta["dependence_span"]),
"dwb_bandwidth": int(calibrationMeta["dwb_bandwidth"]),
"dwb_bandwidth_lower": int(calibrationMeta["dwb_bandwidth_lower"]),
"dwb_bandwidth_upper": int(calibrationMeta["dwb_bandwidth_upper"]),
"dwb_bandwidth_method": str(calibrationMeta["dwb_bandwidth_method"]),
"dependence_span_lower": int(calibrationMeta["dependence_span_lower"]),
"dependence_span_upper": int(calibrationMeta["dependence_span_upper"]),
"dependence_span_method": str(calibrationMeta["dependence_span_method"]),
"context_span_lower": int(calibrationMeta["context_span_lower"]),
"context_span_upper": int(calibrationMeta["context_span_upper"]),
"context_span_method": str(calibrationMeta["context_span_method"]),
"dwb_panel_id": str(calibrationMeta["dwb_panel_id"]),
"dwb_panel_reused": bool(calibrationMeta["dwb_panel_reused"]),
"tau0": float(max(tau0, 0.0)),
"threshold": float(primaryView["threshold"]),
"threshold_z": float(thresholdZ),
"threshold_z_grid": [float(z) for z in zGrid],
"bulk_quantile": float(bulkQuantile),
"budget_model": str(budgetModel),
"null_center": float(primaryView["null_center"]),
"null_scale": float(primaryView["null_scale"]),
"state_null_center": float(prepared["state_null_center"]),
"state_null_scale": float(prepared["state_null_scale"]),
"pooled_null_floor_source": str(
prepared.get("pooled_null_floor", {}).get("source", "none")
if prepared.get("pooled_null_floor") is not None
else "none"
),
"observed_tail_occupancy": float(primaryMetrics["observed_tail_occupancy"]),
"null_tail_occupancy": float(primaryMetrics["null_tail_occupancy"]),
"null_tail_occupancy_calibrated": float(
primaryMetrics["null_tail_occupancy_calibrated"]
),
"null_tail_occupancy_sd": float(primaryMetrics["null_tail_occupancy_sd"]),
"observed_soft_tail": float(primaryMetrics["observed_soft_tail"]),
"null_soft_tail": float(primaryMetrics["null_soft_tail"]),
"null_soft_tail_calibrated": float(primaryMetrics["null_soft_tail_calibrated"]),
"null_soft_tail_sd": float(primaryMetrics["null_soft_tail_sd"]),
"budget_min": float(budgetMin_),
"budget_max": float(budgetMax_),
"budget_raw": float(budgetRaw),
"budget_clipped": bool(abs(budget - budgetRaw) > 1.0e-12),
"budget_occupancy_raw": float(primaryBudgetOccupancyRaw),
"budget_soft_raw": float(primaryBudgetSoftRaw),
"budget_integrated_raw": float(budgetIntegratedRaw),
"budget_occupancy": float(budgetOccupancy),
"budget_soft": float(budgetSoft),
"budget_integrated": float(budgetIntegrated),
"effective_count": float(budget * effectiveTotalCount),
"effective_total_count": float(effectiveTotalCount),
"autocorrelation_time": float(tauInt),
"ess_lags_used": int(essLagsUsed),
"threshold_metrics": thresholdMetrics,
}
details.update(scoreMeta)
details.update(stateNullMeta)
details.update(dict(primaryView["null_meta"]))
details.update(dict(primaryView["template_meta"]))
return budget, details
[docs]
def getROCCOBudget(
state: npt.ArrayLike,
uncertainty: npt.ArrayLike | None = None,
tau0: float = 1.0,
statistic: str = "integrated",
numBootstrap: int = _ROCCO_NUM_BOOTSTRAP_DEFAULT,
dependenceSpan: int | None = None,
kernel: str = "bartlett",
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
bulkQuantile: float = 0.60,
randomSeed: int = 0,
nullQuantile: float = _ROCCO_NULL_QUANTILE,
pooledNullFloor: Dict[str, Any] | None = None,
returnDetails: bool = False,
) -> float | Tuple[float, Dict[str, Any]]:
r"""Estimate a chromosome 'budget' from the fitted Consenrich state."""
prepared = _prepareROCCOScoreAndNull(
state,
uncertainty=uncertainty,
tau0=tau0,
bulkQuantile=bulkQuantile,
thresholdZ=thresholdZ,
numBootstrap=numBootstrap,
dependenceSpan=dependenceSpan,
kernel=kernel,
randomSeed=randomSeed,
nullQuantile=nullQuantile,
pooledNullFloor=pooledNullFloor,
)
result = _estimateBudgetForPreparedROCCOScore(
prepared,
statistic=statistic,
numBootstrap=numBootstrap,
dependenceSpan=dependenceSpan,
kernel=kernel,
thresholdZ=thresholdZ,
bulkQuantile=bulkQuantile,
randomSeed=randomSeed,
tau0=tau0,
nullQuantile=nullQuantile,
returnDetails=returnDetails,
)
return result
def shrinkROCCOBudgets(
effectiveCounts: Dict[str, float],
effectiveTotals: Dict[str, float],
posteriorQuantile: float | None = None,
minPriorConcentration: float = 8.0,
minBudget: float = 0.0,
maxBudget: float = 0.5,
) -> Tuple[Dict[str, float], Dict[str, float]]:
r"""Apply a simple beta-binomial EB shrinkage to chromosome-wise budget estimates."""
chroms = sorted(set(effectiveCounts) & set(effectiveTotals))
if len(chroms) == 0:
raise ValueError("No overlapping chromosome keys found.")
successes = np.asarray(
[max(float(effectiveCounts[c]), 0.0) for c in chroms],
dtype=np.float64,
)
totals = np.asarray(
[max(float(effectiveTotals[c]), 1.0) for c in chroms],
dtype=np.float64,
)
successes = np.minimum(successes, totals)
rawBudgets = successes / np.maximum(totals, 1.0)
pooledRate = float(np.sum(successes) / np.sum(totals))
if pooledRate <= 1.0e-12 and float(np.sum(successes)) <= 1.0e-12:
shrunkZero = {chrom: 0.0 for chrom in chroms}
return shrunkZero, {
"genome_wide_budget": 0.0,
"alpha_hat": 0.0,
"beta_hat": 1.0,
"prior_concentration": float(max(minPriorConcentration, 2.0)),
"min_prior_concentration": float(max(minPriorConcentration, 2.0)),
"posterior_estimator": "degenerate_zero",
"posterior_quantile": None,
"min_budget": float(max(minBudget, 0.0)),
"max_budget": float(max(maxBudget, max(minBudget, 0.0))),
}
if len(chroms) == 1:
shrunkSingle = {
chroms[0]: float(
np.clip(
rawBudgets[0],
max(minBudget, 0.0),
max(maxBudget, max(minBudget, 0.0)),
)
)
}
return shrunkSingle, {
"genome_wide_budget": float(pooledRate),
"alpha_hat": float(max(pooledRate, 1.0e-6)),
"beta_hat": float(max(1.0 - pooledRate, 1.0e-6)),
"prior_concentration": 0.0,
"prior_concentration_raw": 0.0,
"prior_concentration_cap": 0.0,
"prior_concentration_capped": False,
"min_prior_concentration": float(max(minPriorConcentration, 2.0)),
"posterior_estimator": "none_single_chromosome",
"posterior_quantile": (
None
if posteriorQuantile is None
else float(np.clip(posteriorQuantile, 1.0e-4, 0.9999))
),
"min_budget": float(max(minBudget, 0.0)),
"max_budget": float(max(maxBudget, max(minBudget, 0.0))),
}
else:
observedVar = float(np.var(rawBudgets, ddof=1))
theoreticalMinVar = float(np.mean(pooledRate * (1.0 - pooledRate) / totals))
excessVar = max(observedVar - theoreticalMinVar, 1.0e-8)
concentrationRaw = max(
(pooledRate * (1.0 - pooledRate) / excessVar) - 1.0,
float(max(minPriorConcentration, 2.0)),
)
concentrationCap = float(
max(
float(max(minPriorConcentration, 2.0)),
float(np.median(np.sqrt(np.maximum(totals, 1.0)))),
)
)
concentration = min(concentrationRaw, concentrationCap)
alphaHat = max(pooledRate * concentration, 1.0e-3)
betaHat = max((1.0 - pooledRate) * concentration, 1.0e-3)
q: float | None
if posteriorQuantile is None:
q = None
else:
q = float(np.clip(posteriorQuantile, 1.0e-4, 0.9999))
minBudget_ = float(max(minBudget, 0.0))
maxBudget_ = float(max(maxBudget, minBudget_))
shrunk: Dict[str, float] = {}
for idx, chrom in enumerate(chroms):
if q is None:
posterior = float(
(successes[idx] + alphaHat) / max(totals[idx] + alphaHat + betaHat, 1.0)
)
else:
posterior = float(
stats.beta.ppf(
q,
successes[idx] + alphaHat,
max(totals[idx] - successes[idx], 0.0) + betaHat,
)
)
if not np.isfinite(posterior):
posterior = pooledRate
shrunk[chrom] = float(np.clip(posterior, minBudget_, maxBudget_))
meta = {
"genome_wide_budget": float(pooledRate),
"alpha_hat": float(alphaHat),
"beta_hat": float(betaHat),
"prior_concentration": float(concentration),
"prior_concentration_raw": float(concentrationRaw if len(chroms) > 1 else 0.0),
"prior_concentration_cap": float(concentrationCap if len(chroms) > 1 else 0.0),
"prior_concentration_capped": bool(
len(chroms) > 1 and concentration < concentrationRaw - 1.0e-12
),
"min_prior_concentration": float(max(minPriorConcentration, 2.0)),
"posterior_estimator": "mean" if q is None else "quantile",
"posterior_quantile": None if q is None else float(q),
"min_budget": float(minBudget_),
"max_budget": float(maxBudget_),
}
return shrunk, meta
def estimateROCCOGamma(
scoreTrack: npt.ArrayLike,
dependenceSpan: int | None = None,
gammaSpan: int | None = None,
gamma: float | None = 0.5,
gammaScale: float = 0.5,
clipMin: float = 0.5,
clipMax: float | None = 50.0,
nullCenter: float | None = None,
threshold: float | None = None,
returnDetails: bool = False,
) -> float | Tuple[float, Dict[str, float | str]]:
r"""Estimate a constant ROCCO boundary penalty from score scale and context size."""
if gamma is None:
gamma_ = 0.5
if not returnDetails:
return gamma_
return gamma_, {"method": "fixed", "gamma": float(gamma_)}
gamma_ = float(gamma)
if not np.isfinite(gamma_):
raise ValueError("`gamma` must be finite")
if gamma_ >= 0.0:
if not returnDetails:
return gamma_
return gamma_, {"method": "fixed", "gamma": float(gamma_)}
scores = _asFloatVector("scoreTrack", scoreTrack)
dependenceSpanDetails = _resolveRoccoDependenceSpanDetails(
scores,
dependenceSpan=dependenceSpan,
)
dependenceSpan_ = int(dependenceSpanDetails["point"])
gammaSpan_ = (
max(int(gammaSpan), 2)
if gammaSpan is not None
else int(dependenceSpanDetails["lower"])
)
referenceLevel = 0.0
referenceMethod = "zero"
if threshold is not None and np.isfinite(float(threshold)):
referenceLevel = float(threshold)
referenceMethod = "threshold"
elif nullCenter is not None and np.isfinite(float(nullCenter)):
referenceLevel = float(nullCenter)
referenceMethod = "null_center"
positiveScores = scores - referenceLevel
positiveScores = positiveScores[positiveScores > 0.0]
positiveScale = (
float(
np.median(
positiveScores,
)
)
if positiveScores.size > 0
else 1.0
)
gammaRaw = float(max(float(gammaScale), 0.0) * float(gammaSpan_) * positiveScale)
gamma_ = float(max(gammaRaw, float(max(clipMin, 0.0))))
if clipMax is not None:
gamma_ = float(min(gamma_, float(max(clipMax, clipMin))))
if not returnDetails:
return gamma_
details = {
"method": "dependence_span_lower_score_scale",
"dependence_span": int(dependenceSpan_),
"gamma_span": int(gammaSpan_),
"dependence_span_lower": int(dependenceSpanDetails["lower"]),
"dependence_span_upper": int(dependenceSpanDetails["upper"]),
"dependence_span_method": str(dependenceSpanDetails["method"]),
"context_span_lower": int(dependenceSpanDetails["lower"]),
"context_span_upper": int(dependenceSpanDetails["upper"]),
"context_span_method": str(dependenceSpanDetails["method"]),
"reference_method": str(referenceMethod),
"reference_level": float(referenceLevel),
"positive_score_median": float(positiveScale),
"gamma_scale": float(gammaScale),
"gamma_raw": float(gammaRaw),
"gamma": float(gamma_),
}
if clipMax is not None:
details["gamma_clip_max"] = float(max(clipMax, clipMin))
details["gamma_clip_min"] = float(max(clipMin, 0.0))
return gamma_, details
def solveChromROCCO(
scores: npt.ArrayLike,
budget: float | None = None,
gamma: float = 0.5,
selectionPenalty: float | None = None,
maxIter: int = _ROCCO_MAX_ITER_DEFAULT,
returnDetails: bool = False,
) -> Tuple[np.ndarray, float] | Tuple[np.ndarray, float, Dict[str, Any]]:
scores_ = _asFloatVector("scores", scores)
gamma_ = float(gamma)
if not np.isfinite(gamma_) or gamma_ < 0.0:
raise ValueError("`gamma` must be finite and non-negative")
solution, objective, penalizedObjective, selectedCount, selectionPenalty_ = (
cconsenrich.csolveChromROCCOExact(
scores_,
budget=budget,
gamma=gamma_,
selectionPenalty=selectionPenalty,
maxIter=int(maxIter),
)
)
budgetFallbackUsed = False
budgetTargetCount: int | None = None
if budget is not None and selectionPenalty is None:
budget_ = float(budget)
if np.isfinite(budget_):
budgetTargetCount = int(
min(max(math.floor(scores_.size * budget_), 0), scores_.size)
)
if (
int(selectedCount) == 0
and budgetTargetCount is not None
and budgetTargetCount > 0
):
fallbackMask, fallbackUsed = _bestContiguousBudgetFallbackMask(
scores_,
budgetTargetCount,
0.0,
gamma_,
maxRelativeRange=0.05,
)
if fallbackUsed:
solution = fallbackMask.astype(np.uint8)
selectedCount = int(np.sum(solution))
objective = _roccoObjectiveForSolution(
scores_,
np.asarray(solution, dtype=np.uint8),
gamma_,
)
penalizedObjective = float(objective) - float(
selectionPenalty_
) * float(selectedCount)
budgetFallbackUsed = True
if not returnDetails:
return np.asarray(solution, dtype=np.uint8), float(objective)
details = {
"penalized_objective": float(penalizedObjective),
"selected_count": int(selectedCount),
"selected_fraction": float(selectedCount / max(scores_.size, 1)),
"selection_penalty": float(selectionPenalty_),
"gamma": float(gamma_),
"max_iter": int(maxIter),
"budget_target_count": budgetTargetCount,
"budget_fallback_window": bool(budgetFallbackUsed),
}
return np.asarray(solution, dtype=np.uint8), float(objective), details
def _selectedRunBounds(mask: np.ndarray) -> List[Tuple[int, int]]:
mask_ = np.asarray(mask, dtype=bool)
runs: List[Tuple[int, int]] = []
n = int(mask_.size)
i = 0
while i < n:
if not bool(mask_[i]):
i += 1
continue
start = i
while i + 1 < n and bool(mask_[i + 1]):
i += 1
runs.append((int(start), int(i)))
i += 1
return runs
def _maskJaccard(a: np.ndarray, b: np.ndarray) -> float:
a_ = np.asarray(a, dtype=bool)
b_ = np.asarray(b, dtype=bool)
union = int(np.sum(a_ | b_))
if union == 0:
return 1.0
return float(np.sum(a_ & b_) / union)
def _roccoObjectiveForSolution(
scores: np.ndarray,
solution: np.ndarray,
gamma: float,
) -> float:
scores_ = np.asarray(scores, dtype=np.float64)
solution_ = np.asarray(solution, dtype=np.uint8)
if scores_.size != solution_.size:
raise ValueError("`scores` and `solution` must have the same length")
objective = float(np.sum(scores_[solution_ > 0]))
if solution_.size > 1:
objective -= float(max(float(gamma), 0.0)) * float(
np.sum(np.diff(solution_) != 0)
)
return objective
def _selectedRunLengthBP(
start: int,
end: int,
intervals: np.ndarray | None,
ends: np.ndarray | None,
) -> int:
if intervals is None or ends is None:
return int(end - start + 1)
return int(max(int(ends[end]) - int(intervals[start]), 0))
def _minimumChildBinsForRegion(
start: int,
end: int,
intervals: np.ndarray | None,
ends: np.ndarray | None,
minRegionBP: int | None,
minRegionBins: int,
) -> int:
regionBins = int(max(int(end) - int(start) + 1, 1))
minBins = int(max(int(minRegionBins), 1))
if minRegionBP is not None and intervals is not None and ends is not None:
widths = np.asarray(
ends[start : end + 1] - intervals[start : end + 1],
dtype=np.int64,
)
widths = widths[widths > 0]
if widths.size > 0:
stepBP = int(max(int(np.median(widths)), 1))
minBins = int(max(1, math.ceil(float(minRegionBP) / float(stepBP))))
return int(min(regionBins, max(minBins, 1)))
def _nestedBudgetTargetCount(
regionBins: int,
budgetScale: float,
minChildBins: int,
) -> int:
regionBins_ = int(max(int(regionBins), 1))
scaled = int(math.floor(float(regionBins_) * float(np.clip(budgetScale, 0.0, 1.0))))
return int(min(regionBins_, max(int(minChildBins), scaled, 1)))
def _positiveScoreScale(scores: np.ndarray) -> float:
scores_ = np.asarray(scores, dtype=np.float64)
positive = scores_[scores_ > 0.0]
if positive.size > 0:
scale = float(np.median(positive))
else:
scale = float(np.median(np.abs(scores_)))
if (not np.isfinite(scale)) or scale <= 0.0:
scale = 0.0
return scale
def _nestedSoftSelectionPenalty(
scores: np.ndarray,
selectionPenalty: float,
budgetScale: float,
) -> Tuple[float, Dict[str, float]]:
budgetScale_ = float(np.clip(float(budgetScale), 0.0, 1.0))
basePenalty = float(max(float(selectionPenalty), 0.0))
positiveScale = _positiveScoreScale(scores)
positive = np.asarray(scores, dtype=np.float64)
positive = positive[positive > 0.0]
positiveSpread = 0.0
if positive.size > 1:
positiveSpread = float(
np.quantile(positive, 0.75) - np.quantile(positive, 0.25)
)
if (not np.isfinite(positiveSpread)) or positiveSpread < 0.0:
positiveSpread = 0.0
extraPenalty = float((1.0 - budgetScale_) * positiveSpread)
penalty = float(basePenalty + extraPenalty)
return penalty, {
"base_penalty": float(basePenalty),
"extra_penalty": float(extraPenalty),
"positive_score_scale": float(positiveScale),
"positive_score_spread": float(positiveSpread),
"budget_scale": float(budgetScale_),
}
def _solveMinRunPenalizedChainROCCO(
scores: np.ndarray,
gamma: float,
selectionPenalty: float,
minRunBins: int,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
scores_ = np.asarray(scores, dtype=np.float64)
if scores_.ndim != 1 or scores_.size == 0:
raise ValueError("`scores` must be a non-empty one-dimensional array")
if not np.all(np.isfinite(scores_)):
raise ValueError("`scores` contains non-finite values")
gamma_ = float(max(float(gamma), 0.0))
penalty_ = float(selectionPenalty)
if not np.isfinite(penalty_):
raise ValueError("`selectionPenalty` must be finite")
n = int(scores_.size)
minRunBins_ = int(min(max(int(minRunBins), 1), n))
numStates = int(minRunBins_ + 1)
negInf = -math.inf
eps = 1.0e-12
largeCount = n + 1
prevValues = np.full(numStates, negInf, dtype=np.float64)
prevCounts = np.full(numStates, largeCount, dtype=np.int64)
prevValues[0] = 0.0
prevCounts[0] = 0
back = np.full((n, numStates), -1, dtype=np.int16)
def _better(
value: float,
count: int,
bestValue: float,
bestCount: int,
) -> bool:
if value > bestValue + eps:
return True
if abs(value - bestValue) <= eps and count < bestCount:
return True
return False
for i in range(n):
adjustedScore = float(scores_[i] - penalty_)
newValues = np.full(numStates, negInf, dtype=np.float64)
newCounts = np.full(numStates, largeCount, dtype=np.int64)
transitionCost = 0.0 if i == 0 else gamma_
if np.isfinite(prevValues[0]):
newValues[0] = prevValues[0]
newCounts[0] = prevCounts[0]
back[i, 0] = 0
candidateValue = prevValues[0] - transitionCost + adjustedScore
candidateCount = int(prevCounts[0] + 1)
if _better(candidateValue, candidateCount, newValues[1], newCounts[1]):
newValues[1] = candidateValue
newCounts[1] = candidateCount
back[i, 1] = 0
if np.isfinite(prevValues[minRunBins_]):
candidateValue = prevValues[minRunBins_] - transitionCost
candidateCount = int(prevCounts[minRunBins_])
if _better(candidateValue, candidateCount, newValues[0], newCounts[0]):
newValues[0] = candidateValue
newCounts[0] = candidateCount
back[i, 0] = minRunBins_
for state in range(1, minRunBins_):
if not np.isfinite(prevValues[state]):
continue
nextState = int(state + 1)
candidateValue = prevValues[state] + adjustedScore
candidateCount = int(prevCounts[state] + 1)
if _better(
candidateValue,
candidateCount,
newValues[nextState],
newCounts[nextState],
):
newValues[nextState] = candidateValue
newCounts[nextState] = candidateCount
back[i, nextState] = state
if np.isfinite(prevValues[minRunBins_]):
candidateValue = prevValues[minRunBins_] + adjustedScore
candidateCount = int(prevCounts[minRunBins_] + 1)
if _better(
candidateValue,
candidateCount,
newValues[minRunBins_],
newCounts[minRunBins_],
):
newValues[minRunBins_] = candidateValue
newCounts[minRunBins_] = candidateCount
back[i, minRunBins_] = minRunBins_
prevValues = newValues
prevCounts = newCounts
bestState = 0
bestValue = float(prevValues[0])
bestCount = int(prevCounts[0])
if _better(
float(prevValues[minRunBins_]),
int(prevCounts[minRunBins_]),
bestValue,
bestCount,
):
bestState = int(minRunBins_)
bestValue = float(prevValues[minRunBins_])
bestCount = int(prevCounts[minRunBins_])
mask = np.zeros(n, dtype=bool)
state = int(bestState)
for i in range(n - 1, -1, -1):
if state > 0:
mask[i] = True
prevState = int(back[i, state])
if prevState < 0:
break
state = prevState
solution = mask.astype(np.uint8)
objective = _roccoObjectiveForSolution(scores_, solution, gamma_)
penalizedObjective = float(objective - penalty_ * float(bestCount))
runs = _selectedRunBounds(mask)
return (
mask,
float(objective),
{
"mode": "min_run_penalty",
"penalized_objective": float(penalizedObjective),
"selected_count": int(bestCount),
"selected_fraction": float(bestCount / max(n, 1)),
"selection_penalty": float(penalty_),
"gamma": float(gamma_),
"min_run_bins": int(minRunBins_),
"num_runs": int(len(runs)),
},
)
def _anchorFallbackWindowMask(
scores: np.ndarray,
anchorIndex: int,
minRunBins: int,
) -> np.ndarray:
scores_ = np.asarray(scores, dtype=np.float64)
n = int(scores_.size)
mask = np.zeros(n, dtype=bool)
if n <= 0:
return mask
anchor = int(np.clip(int(anchorIndex), 0, n - 1))
width = int(min(max(int(minRunBins), 1), n))
leftMin = int(max(0, anchor - width + 1))
leftMax = int(min(anchor, n - width))
bestStart = leftMin
bestScore = -math.inf
cumsum = np.concatenate(([0.0], np.cumsum(scores_, dtype=np.float64)))
for start in range(leftMin, leftMax + 1):
score = float(cumsum[start + width] - cumsum[start])
if score > bestScore:
bestScore = score
bestStart = int(start)
mask[bestStart : bestStart + width] = True
return mask
def _solveAnchoredMinRunPenalizedChainROCCO(
scores: np.ndarray,
gamma: float,
selectionPenalty: float,
minRunBins: int,
anchorIndex: int,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
scores_ = np.asarray(scores, dtype=np.float64)
if scores_.ndim != 1 or scores_.size == 0:
raise ValueError("`scores` must be a non-empty one-dimensional array")
if not np.all(np.isfinite(scores_)):
raise ValueError("`scores` contains non-finite values")
gamma_ = float(max(float(gamma), 0.0))
penalty_ = float(selectionPenalty)
if not np.isfinite(penalty_):
raise ValueError("`selectionPenalty` must be finite")
n = int(scores_.size)
anchor = int(np.clip(int(anchorIndex), 0, n - 1))
minRunBins_ = int(min(max(int(minRunBins), 1), n))
numStates = int(minRunBins_ + 1)
negInf = -math.inf
eps = 1.0e-12
largeCount = n + 1
prevValues = np.full((2, numStates), negInf, dtype=np.float64)
prevCounts = np.full((2, numStates), largeCount, dtype=np.int64)
prevValues[0, 0] = 0.0
prevCounts[0, 0] = 0
backState = np.full((n, 2, numStates), -1, dtype=np.int16)
backSeen = np.full((n, 2, numStates), -1, dtype=np.int8)
def _better(
value: float,
count: int,
bestValue: float,
bestCount: int,
) -> bool:
if value > bestValue + eps:
return True
if abs(value - bestValue) <= eps and count < bestCount:
return True
return False
def _update(
values: np.ndarray,
counts: np.ndarray,
newSeen: int,
newState: int,
value: float,
count: int,
prevSeen: int,
prevState: int,
i: int,
) -> None:
if _better(
float(value),
int(count),
float(values[newSeen, newState]),
int(counts[newSeen, newState]),
):
values[newSeen, newState] = float(value)
counts[newSeen, newState] = int(count)
backSeen[i, newSeen, newState] = int(prevSeen)
backState[i, newSeen, newState] = int(prevState)
for i in range(n):
adjustedScore = float(scores_[i] - penalty_)
newValues = np.full((2, numStates), negInf, dtype=np.float64)
newCounts = np.full((2, numStates), largeCount, dtype=np.int64)
transitionCost = 0.0 if i == 0 else gamma_
forceOn = bool(i == anchor)
for seen in (0, 1):
# Choose x_i = 0. A run may end only after reaching minRunBins_.
if not forceOn:
if np.isfinite(prevValues[seen, 0]):
_update(
newValues,
newCounts,
seen,
0,
float(prevValues[seen, 0]),
int(prevCounts[seen, 0]),
seen,
0,
i,
)
if np.isfinite(prevValues[seen, minRunBins_]):
_update(
newValues,
newCounts,
seen,
0,
float(prevValues[seen, minRunBins_] - transitionCost),
int(prevCounts[seen, minRunBins_]),
seen,
minRunBins_,
i,
)
# Choose x_i = 1.
newSeen = int(seen or forceOn)
if np.isfinite(prevValues[seen, 0]):
_update(
newValues,
newCounts,
newSeen,
1,
float(prevValues[seen, 0] - transitionCost + adjustedScore),
int(prevCounts[seen, 0] + 1),
seen,
0,
i,
)
for state in range(1, minRunBins_):
if not np.isfinite(prevValues[seen, state]):
continue
_update(
newValues,
newCounts,
newSeen,
state + 1,
float(prevValues[seen, state] + adjustedScore),
int(prevCounts[seen, state] + 1),
seen,
state,
i,
)
if np.isfinite(prevValues[seen, minRunBins_]):
_update(
newValues,
newCounts,
newSeen,
minRunBins_,
float(prevValues[seen, minRunBins_] + adjustedScore),
int(prevCounts[seen, minRunBins_] + 1),
seen,
minRunBins_,
i,
)
prevValues = newValues
prevCounts = newCounts
finalCandidates = [
(float(prevValues[1, 0]), int(prevCounts[1, 0]), 1, 0),
(
float(prevValues[1, minRunBins_]),
int(prevCounts[1, minRunBins_]),
1,
minRunBins_,
),
]
bestValue, bestCount, bestSeen, bestState = max(
finalCandidates,
key=lambda item: (item[0], -item[1]),
)
fallbackUsed = False
if not np.isfinite(bestValue):
mask = _anchorFallbackWindowMask(scores_, anchor, minRunBins_)
fallbackUsed = True
else:
mask = np.zeros(n, dtype=bool)
seen = int(bestSeen)
state = int(bestState)
for i in range(n - 1, -1, -1):
if state > 0:
mask[i] = True
prevState = int(backState[i, seen, state])
prevSeen = int(backSeen[i, seen, state])
if prevState < 0 or prevSeen < 0:
break
state = prevState
seen = prevSeen
if not bool(mask[anchor]):
fallbackMask = _anchorFallbackWindowMask(scores_, anchor, minRunBins_)
mask |= fallbackMask
fallbackUsed = True
solution = mask.astype(np.uint8)
objective = _roccoObjectiveForSolution(scores_, solution, gamma_)
selectedCount = int(np.sum(solution))
penalizedObjective = float(objective - penalty_ * float(selectedCount))
runs = _selectedRunBounds(mask)
return (
mask,
float(objective),
{
"mode": "anchored_min_run_penalty",
"penalized_objective": float(penalizedObjective),
"selected_count": int(selectedCount),
"selected_fraction": float(selectedCount / max(n, 1)),
"selection_penalty": float(penalty_),
"gamma": float(gamma_),
"min_run_bins": int(minRunBins_),
"num_runs": int(len(runs)),
"anchor_index": int(anchor),
"anchor_selected": bool(mask[anchor]),
"anchor_fallback_window": bool(fallbackUsed),
},
)
def _bestContiguousBudgetFallbackMask(
scores: np.ndarray,
targetCount: int,
selectionPenalty: float,
gamma: float,
maxRelativeRange: float | None = None,
) -> Tuple[np.ndarray, bool]:
scores_ = np.asarray(scores, dtype=np.float64)
n = int(scores_.size)
target = int(min(max(int(targetCount), 1), n))
out = np.zeros(n, dtype=bool)
if n == 0 or target <= 0:
return out, False
adjusted = scores_ - float(selectionPenalty)
cumsum = np.concatenate(([0.0], np.cumsum(adjusted, dtype=np.float64)))
bestObjective = -math.inf
bestStart = 0
for start in range(0, n - target + 1):
end = start + target - 1
windowScore = float(cumsum[end + 1] - cumsum[start])
switchCount = int(start > 0) + int(end < n - 1)
objective = windowScore - float(max(float(gamma), 0.0)) * float(switchCount)
if objective > bestObjective:
bestObjective = objective
bestStart = start
if not np.isfinite(bestObjective) or bestObjective <= 0.0:
return out, False
if maxRelativeRange is not None:
maxRelativeRange_ = float(max(float(maxRelativeRange), 0.0))
bestWindow = scores_[bestStart : bestStart + target]
windowRange = float(np.max(bestWindow) - np.min(bestWindow))
windowScale = float(max(np.max(np.abs(bestWindow)), 1.0e-6))
if windowRange > maxRelativeRange_ * windowScale:
return out, False
out[bestStart : bestStart + target] = True
return out, True
def _refineNestedROCCOSolution(
scores: npt.ArrayLike,
solution: npt.ArrayLike,
gamma: float,
selectionPenalty: float,
nestedRoccoIters: int = _NESTED_ROCCO_ITERS_DEFAULT,
nestedRoccoBudgetScale: float = _NESTED_ROCCO_BUDGET_SCALE_DEFAULT,
jaccardThreshold: float = _NESTED_ROCCO_JACCARD_DEFAULT,
intervals: npt.ArrayLike | None = None,
ends: npt.ArrayLike | None = None,
rawScores: npt.ArrayLike | None = None,
minRegionBP: int | None = None,
minRegionBins: int = _NESTED_ROCCO_MIN_CHILD_STEPS,
diagnostics: bool = False,
diagnosticLabel: str | None = None,
diagnosticDetailPath: str | Path | None = None,
) -> Tuple[np.ndarray, Dict[str, Any]]:
r"""Run anchored local ROCCO refinements inside selected first-pass regions.
For each eligible parent or child region ``R``, solve an exact local chain
problem with ``localGamma = 0.25 * gamma``, a hard minimum selected-run
length, and a mandatory anchor at the strongest local evidence bin. When
``nestedRoccoBudgetScale < 1``, translate the scale into a soft per-bin
penalty rather than a hard local quota. This keeps nested ROCCO as a
refinement step: children may shrink or split a parent, but every parent
contributes at least one anchored child.
"""
scores_ = _asFloatVector("scores", scores)
rawScores_ = scores_
if rawScores is not None:
rawScores_ = _asFloatVector("rawScores", rawScores)
if rawScores_.size != scores_.size:
raise ValueError("`rawScores` must match `scores` length")
current = np.asarray(solution, dtype=np.uint8).ravel() > 0
if current.size != scores_.size:
raise ValueError("`solution` must match `scores` length")
intervals_: np.ndarray | None = None
ends_: np.ndarray | None = None
if intervals is not None or ends is not None:
if intervals is None or ends is None:
raise ValueError("`intervals` and `ends` must be supplied together")
intervals_ = np.asarray(intervals, dtype=np.int64).ravel()
ends_ = np.asarray(ends, dtype=np.int64).ravel()
if intervals_.size != scores_.size or ends_.size != scores_.size:
raise ValueError("`intervals` and `ends` must match `scores` length")
maxIters = max(int(nestedRoccoIters), 0)
parentGamma = float(gamma)
if not np.isfinite(parentGamma) or parentGamma < 0.0:
raise ValueError("`gamma` must be finite and non-negative")
localGamma = 0.25 * parentGamma
selectionPenalty_ = float(selectionPenalty)
if not np.isfinite(selectionPenalty_):
raise ValueError("`selectionPenalty` must be finite")
budgetScale = float(nestedRoccoBudgetScale)
if not np.isfinite(budgetScale):
raise ValueError("`nestedRoccoBudgetScale` must be finite")
budgetScale = float(np.clip(budgetScale, 0.0, 1.0))
jaccardThreshold_ = float(np.clip(jaccardThreshold, 0.0, 1.0))
minRegionBins_ = max(int(minRegionBins), 1)
minRegionBP_ = None if minRegionBP is None else max(int(minRegionBP), 0)
subproblemMaxIter = int(_NESTED_ROCCO_SUBPROBLEM_MAX_ITER)
diagnosticDetailPath_ = (
None if diagnosticDetailPath is None else str(diagnosticDetailPath)
)
history: List[Dict[str, float | int | str]] = []
details: Dict[str, Any] = {
"enabled": bool(maxIters > 0),
"requested_iters": int(maxIters),
"completed_iters": 0,
"stop_reason": "disabled" if maxIters == 0 else "not_started",
"jaccard_threshold": float(jaccardThreshold_),
"parent_gamma": float(parentGamma),
"local_gamma": float(localGamma),
"selection_penalty": float(selectionPenalty_),
"budget_scale": float(budgetScale),
"score_shift": float(selectionPenalty_),
"subproblem_mode": "anchored_min_run_penalty",
"subproblem_max_iter": int(subproblemMaxIter),
"min_region_bins": int(minRegionBins_),
"min_region_bp": None if minRegionBP_ is None else int(minRegionBP_),
"min_child_bins": int(minRegionBins_),
"anchor_policy": "argmax_raw_score_leftmost",
"diagnostic_detail_path": diagnosticDetailPath_,
"initial_selected_count": int(np.sum(current)),
"final_selected_count": int(np.sum(current)),
"history": history,
}
if maxIters == 0:
return current.astype(np.uint8), details
diagnostics_ = bool(diagnostics)
label_ = "" if diagnosticLabel is None else f" {diagnosticLabel}"
diagnosticDetailHandle = None
if diagnostics_ and diagnosticDetailPath_ is not None:
diagnosticDetailPathObj = Path(diagnosticDetailPath_)
diagnosticDetailPathObj.parent.mkdir(parents=True, exist_ok=True)
diagnosticDetailHandle = diagnosticDetailPathObj.open("a", encoding="utf-8")
def _writeSubproblemDiagnostic(row: Dict[str, Any]) -> None:
if diagnosticDetailHandle is None:
return
diagnosticDetailHandle.write(json.dumps(row, sort_keys=True) + "\n")
for iterIdx in range(maxIters):
previous = current
newMask = np.zeros_like(previous, dtype=bool)
runs = _selectedRunBounds(previous)
skippedShort = 0
iterBudgetScale = budgetScale if iterIdx == 0 else 1.0
expandedShortChildRuns = 0
expandedShortChildBins = 0
emptyLocalSolutions = 0
budgetFallbackWindows = 0
budgetConstrainedRegions = 0
localPenaltyExtraTotal = 0.0
localPenaltyExtraMax = 0.0
anchorFallbackWindows = 0
parentErasureViolations = 0
anchorSurvivalViolations = 0
if diagnostics_:
logger.info(
"nested ROCCO%s iter=%d start parent_regions=%d selected=%d budget_scale=%.4g local_gamma=%.6g selection_penalty=%.6g",
label_,
int(iterIdx + 1),
int(len(runs)),
int(np.sum(previous)),
float(iterBudgetScale),
float(localGamma),
float(selectionPenalty_),
)
for regionIdx, (start, end) in enumerate(runs, start=1):
regionLengthBP = _selectedRunLengthBP(start, end, intervals_, ends_)
if (minRegionBP_ is not None and regionLengthBP < minRegionBP_) or (
minRegionBP_ is None and (end - start + 1) < minRegionBins_
):
newMask[start : end + 1] = previous[start : end + 1]
skippedShort += 1
if diagnostics_:
_writeSubproblemDiagnostic(
{
"event": "subproblem",
"status": "skipped_short",
"chromosome": diagnosticLabel,
"iter": int(iterIdx + 1),
"region": int(regionIdx),
"bins": int(end - start + 1),
"bp": int(regionLengthBP),
}
)
continue
localScores = scores_[start : end + 1]
localMinChildBins = _minimumChildBinsForRegion(
start,
end,
intervals_,
ends_,
minRegionBP_,
minRegionBins_,
)
localBudgetTarget = _nestedBudgetTargetCount(
end - start + 1,
iterBudgetScale,
localMinChildBins,
)
localRawScores = rawScores_[start : end + 1]
anchorLocal = int(np.argmax(localRawScores))
localBudgetPenalty = float("nan")
localPenaltyDetails: Dict[str, float] = {
"base_penalty": float(selectionPenalty_),
"extra_penalty": 0.0,
"positive_score_scale": 0.0,
"positive_score_spread": 0.0,
"budget_scale": float(iterBudgetScale),
}
if iterBudgetScale < 1.0:
localSelectionPenalty, localPenaltyDetails = (
_nestedSoftSelectionPenalty(
localScores,
selectionPenalty_,
iterBudgetScale,
)
)
localBudgetPenalty = float(localSelectionPenalty)
localMask, _localObjective, localDetails = (
_solveAnchoredMinRunPenalizedChainROCCO(
localScores,
gamma=localGamma,
selectionPenalty=localSelectionPenalty,
minRunBins=localMinChildBins,
anchorIndex=anchorLocal,
)
)
localMode = "anchored_min_run_soft_budget"
budgetConstrainedRegions += 1
else:
localSelectionPenalty, localPenaltyDetails = (
_nestedSoftSelectionPenalty(
localScores,
selectionPenalty_,
iterBudgetScale,
)
)
localBudgetPenalty = float(localSelectionPenalty)
localMask, _localObjective, localDetails = (
_solveAnchoredMinRunPenalizedChainROCCO(
localScores,
gamma=localGamma,
selectionPenalty=localSelectionPenalty,
minRunBins=localMinChildBins,
anchorIndex=anchorLocal,
)
)
localMode = "anchored_min_run_penalty"
localEmptySolution = False
localAnchorFallbackUsed = bool(
localDetails.get("anchor_fallback_window", False)
)
expandedRuns = 0
expandedBins = 0
penaltyExtra = float(localPenaltyDetails["extra_penalty"])
localPenaltyExtraTotal += penaltyExtra
localPenaltyExtraMax = float(max(localPenaltyExtraMax, penaltyExtra))
if not bool(np.any(localMask)):
localEmptySolution = True
emptyLocalSolutions += 1
parentErasureViolations += 1
localMask = _anchorFallbackWindowMask(
localScores,
anchorLocal,
localMinChildBins,
)
localAnchorFallbackUsed = True
if not bool(localMask[anchorLocal]):
anchorSurvivalViolations += 1
localMask = np.asarray(localMask, dtype=bool)
localMask |= _anchorFallbackWindowMask(
localScores,
anchorLocal,
localMinChildBins,
)
localAnchorFallbackUsed = True
if localAnchorFallbackUsed:
anchorFallbackWindows += 1
newMask[start : end + 1] = localMask
if diagnostics_:
if intervals_ is not None and ends_ is not None:
regionStartBP = int(intervals_[start])
regionEndBP = int(ends_[end])
else:
regionStartBP = int(start)
regionEndBP = int(end + 1)
selectedLocal = int(np.sum(localMask))
selectedNonPositive = int(np.sum(localMask & (localRawScores <= 0.0)))
_writeSubproblemDiagnostic(
{
"event": "subproblem",
"status": "solved",
"chromosome": diagnosticLabel,
"iter": int(iterIdx + 1),
"region": int(regionIdx),
"mode": str(localMode),
"bins": int(end - start + 1),
"bp": int(regionLengthBP),
"range_start": int(regionStartBP),
"range_end": int(regionEndBP),
"selected": int(selectedLocal),
"selected_possible": int(end - start + 1),
"nonpos_selected": int(selectedNonPositive),
"min_child_bins": int(localMinChildBins),
"budget_target": int(localBudgetTarget),
"anchor_local": int(anchorLocal),
"anchor_selected": bool(localMask[anchorLocal]),
"empty_solution": bool(localEmptySolution),
"anchor_fallback": bool(localAnchorFallbackUsed),
"expanded_short_runs": int(
expandedRuns if not localEmptySolution else 0
),
"objective": float(_localObjective),
"penalized": float(localDetails["penalized_objective"]),
"solver_penalty": float(localDetails["selection_penalty"]),
"budget_penalty": float(localBudgetPenalty),
"soft_penalty_extra": float(
localPenaltyDetails["extra_penalty"]
),
"score_min": float(np.min(localRawScores)),
"score_max": float(np.max(localRawScores)),
"score_mean": float(np.mean(localRawScores)),
}
)
newMask &= previous
jaccard = _maskJaccard(previous, newMask)
selectedBefore = int(np.sum(previous))
selectedAfter = int(np.sum(newMask))
objectivePrevious = _roccoObjectiveForSolution(
scores_,
previous.astype(np.uint8),
parentGamma,
)
objectiveAfter = _roccoObjectiveForSolution(
scores_,
newMask.astype(np.uint8),
parentGamma,
)
runsAfter = _selectedRunBounds(newMask)
peakCountMonotonicityViolations = int(max(len(runs) - len(runsAfter), 0))
coverageExpansionViolations = int(max(selectedAfter - selectedBefore, 0))
history.append(
{
"iter": int(iterIdx + 1),
"num_parent_peaks": int(len(runs)),
"num_parent_peaks_after": int(len(runsAfter)),
"num_input_regions": int(len(runs)),
"num_skipped_short_regions": int(skippedShort),
"num_budget_constrained_regions": int(budgetConstrainedRegions),
"num_empty_local_solutions": int(emptyLocalSolutions),
"num_budget_fallback_windows": int(budgetFallbackWindows),
"num_anchor_fallback_windows": int(anchorFallbackWindows),
"num_parent_erasure_violations": int(parentErasureViolations),
"num_anchor_survival_violations": int(anchorSurvivalViolations),
"num_peak_count_monotonicity_violations": int(
peakCountMonotonicityViolations
),
"num_coverage_expansion_violations": int(coverageExpansionViolations),
"num_short_child_runs_expanded": int(expandedShortChildRuns),
"num_short_child_bins_added": int(expandedShortChildBins),
"local_penalty_extra_mean": float(
localPenaltyExtraTotal / max(len(runs) - skippedShort, 1)
),
"local_penalty_extra_max": float(localPenaltyExtraMax),
"budget_scale": float(iterBudgetScale),
"selected_count_before": int(selectedBefore),
"selected_count_after": int(selectedAfter),
"selected_count_delta": int(selectedAfter - selectedBefore),
"objective": float(objectiveAfter),
"objective_previous": float(objectivePrevious),
"objective_delta": float(objectiveAfter - objectivePrevious),
"jaccard": float(jaccard),
}
)
if diagnostics_:
logger.info(
"nested ROCCO%s iter=%d done selected_before=%d selected_after=%d parent_regions=%d parent_regions_after=%d skipped_short=%d budget_constrained=%d empty_solutions=%d anchor_fallback=%d parent_erasure_violations=%d anchor_survival_violations=%d peak_count_violations=%d coverage_expansion_violations=%d short_child_expanded=%d local_penalty_extra_mean=%.6g objective=%.6g objective_delta=%.6g jaccard=%.6f",
label_,
int(iterIdx + 1),
int(selectedBefore),
int(selectedAfter),
int(len(runs)),
int(len(runsAfter)),
int(skippedShort),
int(budgetConstrainedRegions),
int(emptyLocalSolutions),
int(anchorFallbackWindows),
int(parentErasureViolations),
int(anchorSurvivalViolations),
int(peakCountMonotonicityViolations),
int(coverageExpansionViolations),
int(expandedShortChildRuns),
float(localPenaltyExtraTotal / max(len(runs) - skippedShort, 1)),
float(objectiveAfter),
float(objectiveAfter - objectivePrevious),
float(jaccard),
)
details["completed_iters"] = int(iterIdx + 1)
current = newMask
details["final_selected_count"] = int(selectedAfter)
if np.array_equal(newMask, previous):
details["stop_reason"] = "mask_equal"
if diagnostics_:
logger.info(
"nested ROCCO%s stop iter=%d reason=mask_equal",
label_,
int(iterIdx + 1),
)
break
if jaccard >= jaccardThreshold_:
details["stop_reason"] = "jaccard"
if diagnostics_:
logger.info(
"nested ROCCO%s stop iter=%d reason=jaccard threshold=%.6f observed=%.6f",
label_,
int(iterIdx + 1),
float(jaccardThreshold_),
float(jaccard),
)
break
else:
details["stop_reason"] = "max_iter"
if diagnostics_:
logger.info(
"nested ROCCO%s stop iter=%d reason=max_iter",
label_,
int(maxIters),
)
if diagnosticDetailHandle is not None:
diagnosticDetailHandle.close()
return current.astype(np.uint8), details
def _readAlignedConsenrichBedGraphs(
stateBedGraphFile: str,
uncertaintyBedGraphFile: str | None = None,
chromosomes: Iterable[str] | None = None,
) -> Dict[str, Dict[str, np.ndarray]]:
colsState = ["chromosome", "start", "end", "state"]
stateDF = pd.read_csv(
stateBedGraphFile,
sep="\t",
header=None,
names=colsState,
dtype={
"chromosome": str,
"start": np.int64,
"end": np.int64,
"state": np.float64,
},
)
stateDF.sort_values(
by=["chromosome", "start", "end"],
kind="mergesort",
inplace=True,
)
stateDF.reset_index(drop=True, inplace=True)
uncertaintyDF: pd.DataFrame | None = None
if uncertaintyBedGraphFile is not None:
colsUnc = ["chromosome", "start", "end", "uncertainty"]
uncertaintyDF = pd.read_csv(
uncertaintyBedGraphFile,
sep="\t",
header=None,
names=colsUnc,
dtype={
"chromosome": str,
"start": np.int64,
"end": np.int64,
"uncertainty": np.float64,
},
)
uncertaintyDF.sort_values(
by=["chromosome", "start", "end"],
kind="mergesort",
inplace=True,
)
uncertaintyDF.reset_index(drop=True, inplace=True)
if not stateDF[["chromosome", "start", "end"]].equals(
uncertaintyDF[["chromosome", "start", "end"]]
):
raise ValueError(
"`stateBedGraphFile` and `uncertaintyBedGraphFile` are not aligned."
)
allowedChroms = set(chromosomes) if chromosomes is not None else None
out: Dict[str, Dict[str, np.ndarray]] = {}
for chromosome, chromStateDF in stateDF.groupby("chromosome", sort=False):
if allowedChroms is not None and chromosome not in allowedChroms:
continue
chromUncDF = None
if uncertaintyDF is not None:
chromUncDF = uncertaintyDF[uncertaintyDF["chromosome"] == chromosome]
out[str(chromosome)] = {
"intervals": chromStateDF["start"].to_numpy(dtype=np.int64, copy=True),
"ends": chromStateDF["end"].to_numpy(dtype=np.int64, copy=True),
"state": chromStateDF["state"].to_numpy(dtype=np.float64, copy=True),
"uncertainty": (
chromUncDF["uncertainty"].to_numpy(dtype=np.float64, copy=True)
if chromUncDF is not None
else None
),
}
return out
def _selectSubpeakSummits(
segmentState: np.ndarray,
nullScale: float,
contextSpan: int,
) -> np.ndarray:
segState = np.asarray(segmentState, dtype=np.float64)
n = int(segState.size)
if n == 0:
return np.zeros(0, dtype=np.int64)
globalPeak = int(np.argmax(segState))
if n < 4:
return np.asarray([globalPeak], dtype=np.int64)
candidatePeaks, _ = signal.find_peaks(
segState,
)
if candidatePeaks.size == 0:
return np.asarray([globalPeak], dtype=np.int64)
blockMaxExcess = float(max(np.max(segState) - np.min(segState), 0.0))
nullProminence = float(max(float(nullScale), 1.0e-6))
prominenceThreshold = float(
max(nullProminence, min(0.25 * blockMaxExcess, 4.0 * nullProminence))
)
if candidatePeaks.size > 0:
prominences = signal.peak_prominences(segState, candidatePeaks)[0]
candidatePeaks = candidatePeaks[prominences >= prominenceThreshold]
if candidatePeaks.size == 0:
return np.asarray([globalPeak], dtype=np.int64)
if globalPeak not in set(candidatePeaks.tolist()):
candidatePeaks = np.sort(
np.unique(
np.concatenate(
[
np.asarray(candidatePeaks, dtype=np.int64),
np.asarray([globalPeak], dtype=np.int64),
]
)
)
)
kept = sorted(set(int(idx) for idx in candidatePeaks.tolist()))
if len(kept) <= 1:
return np.asarray([globalPeak], dtype=np.int64)
while len(kept) > 1:
removed = False
for i in range(len(kept) - 1):
left = int(kept[i])
right = int(kept[i + 1])
valley = left + int(np.argmin(segState[left : right + 1]))
leftProm = float(segState[left] - segState[valley])
rightProm = float(segState[right] - segState[valley])
if min(leftProm, rightProm) >= prominenceThreshold:
continue
if float(segState[left]) >= float(segState[right]):
drop = right
else:
drop = left
kept = [idx for idx in kept if idx != drop]
removed = True
break
if not removed:
break
if len(kept) == 0:
kept = [globalPeak]
if globalPeak not in kept:
kept.append(globalPeak)
kept = sorted(set(kept))
return np.asarray(kept, dtype=np.int64)
def _splitSelectedSegment(
segmentState: np.ndarray,
startIdx: int,
endIdx: int,
nullScale: float,
contextSpan: int,
) -> List[Dict[str, int | float | bool]]:
segState = np.asarray(segmentState, dtype=np.float64)
summits = _selectSubpeakSummits(
segState,
nullScale=float(nullScale),
contextSpan=int(contextSpan),
)
if summits.size <= 1:
summitLocal = int(summits[0]) if summits.size == 1 else int(np.argmax(segState))
return [
{
"start_idx": int(startIdx),
"end_idx": int(endIdx),
"summit_idx": int(startIdx + summitLocal),
"segment_length_bins": int(max(endIdx - startIdx + 1, 0)),
"num_subpeaks": 1,
"split_from_parent": False,
}
]
splitPoints: List[int] = []
sortedSummits = np.sort(summits.astype(np.int64, copy=False))
for left, right in zip(sortedSummits[:-1], sortedSummits[1:]):
valleyLocal = int(left + np.argmin(segState[left : right + 1]))
splitPoints.append(valleyLocal)
localRanges: List[Tuple[int, int]] = []
localStart = 0
for splitLocal in splitPoints:
splitLocal_ = int(splitLocal)
if splitLocal_ > localStart:
localRanges.append((localStart, splitLocal_ - 1))
localStart = int(splitLocal) + 1
if localStart <= int(segState.size - 1):
localRanges.append((localStart, int(segState.size - 1)))
out: List[Dict[str, int | float | bool]] = []
numSubpeaks = int(len(localRanges))
for localLeft, localRight in localRanges:
childState = segState[localLeft : localRight + 1]
summitLocal = int(localLeft + np.argmax(childState))
out.append(
{
"start_idx": int(startIdx + localLeft),
"end_idx": int(startIdx + localRight),
"summit_idx": int(startIdx + summitLocal),
"segment_length_bins": int(localRight - localLeft + 1),
"num_subpeaks": int(numSubpeaks),
"split_from_parent": True,
}
)
return out
def _trimChildSegmentAroundSummit(
childStartIdx: int,
childEndIdx: int,
summitIdx: int,
scores: np.ndarray,
trimScoreFloor: float | None,
) -> Tuple[int, int, bool]:
if trimScoreFloor is None:
return int(childStartIdx), int(childEndIdx), False
floor = float(trimScoreFloor)
if not np.isfinite(floor):
return int(childStartIdx), int(childEndIdx), False
start = int(childStartIdx)
end = int(childEndIdx)
summit = int(np.clip(int(summitIdx), start, end))
localSummit = int(summit - start)
childScores = np.asarray(scores[start : end + 1], dtype=np.float64)
if childScores.size == 0:
return start, end, False
keep = childScores > floor
if not bool(keep[localSummit]):
return start, end, False
left = localSummit
while left > 0 and bool(keep[left - 1]):
left -= 1
right = localSummit
while right + 1 < keep.size and bool(keep[right + 1]):
right += 1
trimmedStart = int(start + left)
trimmedEnd = int(start + right)
trimmed = bool(trimmedStart != start or trimmedEnd != end)
return trimmedStart, trimmedEnd, trimmed
def _solutionToChromNarrowPeakRows(
chromosome: str,
intervals: np.ndarray,
ends: np.ndarray,
state: np.ndarray,
scores: np.ndarray,
solution: np.ndarray,
prefix: str,
nullScale: float,
contextSpan: int,
uncertainty: np.ndarray | None = None,
splitSubpeaks: bool = True,
trimScoreFloor: float | None = 0.0,
dropMedianSignalBelowNegativeLocalP: bool = True,
exportFilterUncertaintyMultiplier: float = (
_EXPORT_MEDIAN_SIGNAL_LOCAL_UNCERTAINTY_MULTIPLIER
),
scoreFloor: float = 250.0,
scoreCeil: float = 1000.0,
returnExportDetails: bool = False,
) -> (
Tuple[List[List[str | int | float]], List[Dict[str, Any]]]
| Tuple[List[List[str | int | float]], List[Dict[str, Any]], Dict[str, Any]]
):
rowsRaw: List[Dict[str, float | int | str]] = []
rowsMeta: List[Dict[str, Any]] = []
state_ = np.asarray(state, dtype=np.float64)
scores_ = np.asarray(scores, dtype=np.float64)
uncertainty_: np.ndarray | None = None
if uncertainty is not None:
uncertainty_ = np.asarray(uncertainty, dtype=np.float64).ravel()
if uncertainty_.size != state_.size:
raise ValueError("`uncertainty` must match `state` length")
exportFilterUncertaintyMultiplier_ = _validateExportFilterUncertaintyMultiplier(
exportFilterUncertaintyMultiplier
)
n = int(solution.size)
exportDetails: Dict[str, Any] = {
"num_candidate_segments": 0,
"num_segments_dropped_median_signal_local_p": 0,
"median_signal_local_p_multiplier": float(exportFilterUncertaintyMultiplier_),
"median_signal_local_p_filter_active": bool(
dropMedianSignalBelowNegativeLocalP and uncertainty_ is not None
),
}
i = 0
while i < n:
if int(solution[i]) <= 0:
i += 1
continue
startIdx = i
while i + 1 < n and int(solution[i + 1]) > 0:
i += 1
endIdx = i
segState = np.asarray(state_[startIdx : endIdx + 1], dtype=np.float64)
if splitSubpeaks:
childSegments = _splitSelectedSegment(
segState,
startIdx=startIdx,
endIdx=endIdx,
nullScale=float(nullScale),
contextSpan=int(contextSpan),
)
else:
summitLocal = int(np.argmax(segState))
childSegments = [
{
"start_idx": int(startIdx),
"end_idx": int(endIdx),
"summit_idx": int(startIdx + summitLocal),
"segment_length_bins": int(max(endIdx - startIdx + 1, 0)),
"num_subpeaks": 1,
"split_from_parent": False,
}
]
for child in childSegments:
untrimmedStartIdx = int(child["start_idx"])
untrimmedEndIdx = int(child["end_idx"])
summitIdx = int(child["summit_idx"])
childStartIdx, childEndIdx, wasTrimmed = _trimChildSegmentAroundSummit(
untrimmedStartIdx,
untrimmedEndIdx,
summitIdx,
scores_,
trimScoreFloor,
)
childState = np.asarray(
state_[childStartIdx : childEndIdx + 1], dtype=np.float64
)
childScores = np.asarray(
scores_[childStartIdx : childEndIdx + 1], dtype=np.float64
)
exportDetails["num_candidate_segments"] += 1
medianState = float(np.median(childState))
localMedianP = None
medianSignalThreshold = None
if dropMedianSignalBelowNegativeLocalP and uncertainty_ is not None:
localP = np.asarray(
uncertainty_[childStartIdx : childEndIdx + 1],
dtype=np.float64,
)
localP = localP[np.isfinite(localP)]
if localP.size > 0:
localMedianP = float(np.median(localP))
medianSignalThreshold = float(
-exportFilterUncertaintyMultiplier_ * localMedianP
)
if medianState < medianSignalThreshold:
exportDetails["num_segments_dropped_median_signal_local_p"] += 1
continue
summitAbs = int(
intervals[summitIdx]
+ max(1, int((ends[summitIdx] - intervals[summitIdx]) // 2))
)
chromStart = int(intervals[childStartIdx])
chromEnd = int(ends[childEndIdx])
peakOffset = int(max(0, summitAbs - chromStart))
peakName = f"{prefix}_{chromosome}_{len(rowsRaw)+1}"
rowsRaw.append(
{
"chromosome": str(chromosome),
"start": chromStart,
"end": chromEnd,
"signal": float(np.max(childState)),
"raw_score": float(np.max(childScores)),
"peak": peakOffset,
"name": str(peakName),
}
)
rowsMeta.append(
{
"name": str(peakName),
"chromosome": str(chromosome),
"start": int(chromStart),
"end": int(chromEnd),
"summit": int(summitAbs),
"segment_length_bins": int(child["segment_length_bins"]),
"median_state": float(medianState),
"local_median_p": (
None if localMedianP is None else float(localMedianP)
),
"median_signal_threshold": (
None
if medianSignalThreshold is None
else float(medianSignalThreshold)
),
"max_state": float(np.max(childState)),
"max_score": float(np.max(childScores)),
"num_subpeaks": int(child["num_subpeaks"]),
"split_from_parent": bool(child["split_from_parent"]),
"trimmed_from_parent": bool(wasTrimmed),
"trim_score_floor": (
None if trimScoreFloor is None else float(trimScoreFloor)
),
"untrimmed_start": int(intervals[untrimmedStartIdx]),
"untrimmed_end": int(ends[untrimmedEndIdx]),
}
)
i += 1
if len(rowsRaw) == 0:
if returnExportDetails:
exportDetails["num_segments_kept"] = 0
return [], [], exportDetails
return [], []
rawScores = np.asarray([row["raw_score"] for row in rowsRaw], dtype=np.float64)
minScore = float(np.min(rawScores))
maxScore = float(np.max(rawScores))
span = max(maxScore - minScore, 1.0e-12)
outRows: List[List[str | int | float]] = []
for row in rowsRaw:
scaled = scoreFloor + (scoreCeil - scoreFloor) * (
(float(row["raw_score"]) - minScore) / span
)
outRows.append(
[
str(row["chromosome"]),
int(row["start"]),
int(row["end"]),
str(row["name"]),
int(round(scaled)),
".",
float(row["signal"]),
".",
".",
int(row["peak"]),
]
)
exportDetails["num_segments_kept"] = int(len(outRows))
if returnExportDetails:
return outRows, rowsMeta, exportDetails
return outRows, rowsMeta
[docs]
def solveRocco(
stateBedGraphFile: str,
uncertaintyBedGraphFile: str | None = None,
chromosomes: Iterable[str] | None = None,
tau0: float = 1.0,
numBootstrap: int = _ROCCO_NUM_BOOTSTRAP_DEFAULT,
thresholdZ: float = _ROCCO_THRESHOLD_Z_DEFAULT,
dependenceSpan: int | None = None,
gamma: float | None = 0.5,
selectionPenalty: float | None = None,
gammaScale: float = 0.5,
nestedRoccoIters: int = _NESTED_ROCCO_ITERS_DEFAULT,
nestedRoccoBudgetScale: float = _NESTED_ROCCO_BUDGET_SCALE_DEFAULT,
exportFilterUncertaintyMultiplier: float = (
_EXPORT_MEDIAN_SIGNAL_LOCAL_UNCERTAINTY_MULTIPLIER
),
randSeed: int = 42,
outPath: str | None = None,
metaPath: str | None = None,
verbose: bool = False,
) -> str:
r"""Run Consenrich+ROCCO peak caller directly on bedGraphs."""
exportFilterUncertaintyMultiplier_ = _validateExportFilterUncertaintyMultiplier(
exportFilterUncertaintyMultiplier
)
chromData = _readAlignedConsenrichBedGraphs(
stateBedGraphFile,
uncertaintyBedGraphFile=uncertaintyBedGraphFile,
chromosomes=chromosomes,
)
stateBase = Path(stateBedGraphFile)
if outPath is None:
outPath = str(stateBase.with_name(f"{stateBase.stem}_rocco.narrowPeak"))
if metaPath is None:
metaPath = f"{outPath}.json"
nestedRoccoSubproblemDetailsPath: str | None = None
if verbose:
nestedRoccoSubproblemDetailsPath = f"{outPath}.nested_rocco_subproblems.jsonl"
Path(nestedRoccoSubproblemDetailsPath).parent.mkdir(
parents=True,
exist_ok=True,
)
Path(nestedRoccoSubproblemDetailsPath).write_text("", encoding="utf-8")
logger.info(
"writing nested ROCCO subproblem solving details to %s",
nestedRoccoSubproblemDetailsPath,
)
allRows: List[List[str | int | float]] = []
meta: Dict[str, Any] = {
"settings": {
"state_bedgraph": str(stateBedGraphFile),
"uncertainty_bedgraph": (
None
if uncertaintyBedGraphFile is None
else str(uncertaintyBedGraphFile)
),
"tau0": float(tau0),
"budget_method": "dwb_integrated_excess_tail",
"null_calibration_method": "stationary_null_dwb",
"num_bootstrap": int(numBootstrap),
"threshold_z": float(thresholdZ),
"null_quantile": float(_ROCCO_NULL_QUANTILE),
"threshold_z_grid": [float(z) for z in _resolveThresholdZGrid(thresholdZ)],
"nested_rocco_iters": int(max(int(nestedRoccoIters), 0)),
"nested_rocco_budget_scale": float(
np.clip(float(nestedRoccoBudgetScale), 0.0, 1.0)
),
"nested_rocco_jaccard": float(_NESTED_ROCCO_JACCARD_DEFAULT),
"nested_rocco_min_parent_steps": int(_NESTED_ROCCO_MIN_PARENT_STEPS),
"nested_rocco_min_child_steps": int(_NESTED_ROCCO_MIN_CHILD_STEPS),
"nested_rocco_subproblem_policy": "anchored_exact_min_run_soft_budget_penalty",
"nested_rocco_diagnostics": bool(verbose),
"nested_rocco_subproblem_details": nestedRoccoSubproblemDetailsPath,
"export_trim": "summit_component_score_above_floor",
"export_trim_score_floor": 0.0,
"export_filter": "drop_median_signal_below_negative_local_median_p",
"export_filter_threshold": f"-{exportFilterUncertaintyMultiplier_:g} * median(local_uncertainty)",
"export_filter_uncertainty_multiplier": float(
exportFilterUncertaintyMultiplier_
),
"export_filter_uses_uncertainty_bedgraph": True,
"rand_seed": int(randSeed),
},
"pooled_null_floor": None,
"budget_shrinkage": None,
"chromosomes": {},
}
chromWork: Dict[str, Dict[str, Any]] = {}
for chromIndex, (chromosome, data) in enumerate(chromData.items()):
state = np.asarray(data["state"], dtype=np.float64)
intervals = np.asarray(data["intervals"], dtype=np.int64)
ends = np.asarray(data["ends"], dtype=np.int64)
uncertainty = (
None
if data["uncertainty"] is None
else np.asarray(data["uncertainty"], dtype=np.float64)
)
if state.size == 0:
continue
prepared = _prepareROCCOScoreAndNull(
state,
uncertainty=uncertainty,
tau0=tau0,
thresholdZ=thresholdZ,
numBootstrap=numBootstrap,
dependenceSpan=dependenceSpan,
kernel="bartlett",
randomSeed=int(randSeed) + chromIndex,
nullQuantile=_ROCCO_NULL_QUANTILE,
thresholdZGrid=_ROCCO_BUDGET_Z_GRID,
)
scoreTrack = np.asarray(prepared["score_track"], dtype=np.float64)
budgetRaw, budgetDetails = _estimateBudgetForPreparedROCCOScore(
prepared,
statistic="integrated",
numBootstrap=numBootstrap,
dependenceSpan=dependenceSpan,
thresholdZ=thresholdZ,
randomSeed=int(randSeed) + chromIndex,
tau0=tau0,
nullQuantile=_ROCCO_NULL_QUANTILE,
returnDetails=True,
)
gamma_, gammaDetails = estimateROCCOGamma(
scoreTrack,
dependenceSpan=budgetDetails["dependence_span"],
gammaSpan=budgetDetails.get(
"context_span_lower",
budgetDetails["dependence_span"],
),
gamma=gamma,
gammaScale=gammaScale,
nullCenter=float(prepared["null_center"]),
threshold=float(prepared["threshold"]),
returnDetails=True,
)
chromWork[str(chromosome)] = {
"state": state,
"uncertainty": uncertainty,
"intervals": intervals,
"ends": ends,
"score_track": scoreTrack,
"null_scale": float(prepared["null_scale"]),
"budget_raw": float(budgetRaw),
"budget_details": dict(budgetDetails),
"gamma": float(gamma_),
"gamma_details": dict(gammaDetails),
"interval_bp": int(np.median(ends - intervals)),
}
for chromosome, work in chromWork.items():
state = np.asarray(work["state"], dtype=np.float64)
intervals = np.asarray(work["intervals"], dtype=np.int64)
ends = np.asarray(work["ends"], dtype=np.int64)
scoreTrack = np.asarray(work["score_track"], dtype=np.float64)
uncertainty = (
None
if work["uncertainty"] is None
else np.asarray(work["uncertainty"], dtype=np.float64)
)
nullScale = float(work["null_scale"])
budget = float(work["budget_raw"])
budgetDetails = dict(work["budget_details"])
budgetDetails["budget_pre_shrink"] = float(work["budget_raw"])
budgetDetails["budget_post_shrink"] = float(budget)
budgetDetails["budget_shrink_delta"] = 0.0
budgetDetails["budget_shrinkage_meta"] = None
solution, objective, solveDetails = solveChromROCCO(
scoreTrack,
budget=budget,
gamma=float(work["gamma"]),
selectionPenalty=selectionPenalty,
returnDetails=True,
)
firstPassSolution = np.asarray(solution, dtype=np.uint8)
refinedSolution, nestedDetails = _refineNestedROCCOSolution(
scoreTrack,
firstPassSolution,
gamma=float(work["gamma"]),
selectionPenalty=float(solveDetails["selection_penalty"]),
nestedRoccoIters=nestedRoccoIters,
nestedRoccoBudgetScale=nestedRoccoBudgetScale,
jaccardThreshold=_NESTED_ROCCO_JACCARD_DEFAULT,
intervals=intervals,
ends=ends,
rawScores=scoreTrack,
minRegionBP=int(
_NESTED_ROCCO_MIN_PARENT_STEPS * max(int(work["interval_bp"]), 1)
),
diagnostics=bool(verbose),
diagnosticLabel=str(chromosome),
diagnosticDetailPath=nestedRoccoSubproblemDetailsPath,
)
solution = np.asarray(refinedSolution, dtype=np.uint8)
finalObjective = _roccoObjectiveForSolution(
scoreTrack,
solution,
gamma=float(work["gamma"]),
)
solveDetails = dict(solveDetails)
solveDetails["first_pass_selected_count"] = int(np.sum(firstPassSolution))
solveDetails["final_selected_count"] = int(np.sum(solution))
solveDetails["nested_rocco_iters"] = int(max(int(nestedRoccoIters), 0))
solveDetails["nested_rocco_budget_scale"] = float(
np.clip(float(nestedRoccoBudgetScale), 0.0, 1.0)
)
solveDetails["nested_rocco_stop_reason"] = str(nestedDetails["stop_reason"])
exportTrimScoreFloor = 0.0
rows, peakMeta, exportDetails = _solutionToChromNarrowPeakRows(
str(chromosome),
intervals,
ends,
state,
np.asarray(scoreTrack, dtype=np.float64),
solution,
prefix="consenrichROCCO",
nullScale=float(nullScale),
contextSpan=int(budgetDetails.get("dependence_span", 4)),
uncertainty=uncertainty,
trimScoreFloor=float(exportTrimScoreFloor),
exportFilterUncertaintyMultiplier=float(exportFilterUncertaintyMultiplier_),
returnExportDetails=True,
)
allRows.extend(rows)
meta["chromosomes"][str(chromosome)] = {
"n_loci": int(state.size),
"interval_bp": int(work["interval_bp"]),
"budget": float(budget),
"objective": float(finalObjective),
"first_pass_objective": float(objective),
"num_parent_segments": int(
np.sum(
np.diff(
np.pad(
np.asarray(firstPassSolution, dtype=np.int8),
(1, 1),
mode="constant",
)
)
== 1
)
),
"num_segments": int(len(rows)),
"num_candidate_segments": int(exportDetails["num_candidate_segments"]),
"num_segments_dropped_median_signal_local_p": int(
exportDetails["num_segments_dropped_median_signal_local_p"]
),
"budget_details": budgetDetails,
"gamma_details": dict(work["gamma_details"]),
"solve_details": solveDetails,
"nested_rocco_details": nestedDetails,
"export_trim_score_floor": float(exportTrimScoreFloor),
"export_details": exportDetails,
"peak_details": peakMeta,
}
allRows.sort(key=lambda row: (str(row[0]), int(row[1]), int(row[2])))
with open(outPath, "w", encoding="utf-8") as handle:
for row in allRows:
handle.write("\t".join(map(str, row)) + "\n")
with open(metaPath, "w", encoding="utf-8") as handle:
json.dump(meta, handle, indent=2, sort_keys=True)
return str(outPath)