Source code for anomsmith.workflows.detect

"""Public workflow functions for anomaly detection."""

import logging
from typing import TYPE_CHECKING, Any, Union

import numpy as np
import pandas as pd

from anomsmith.constants import DEFAULT_DETECTION_REPORT_TOP_ANOMALIES
from anomsmith.primitives.base import BaseDetector, BaseScorer
from anomsmith.primitives.thresholding import ThresholdRule, apply_threshold
from anomsmith.tasks.detect import run_detection, run_scoring

# SeriesLike is needed only by static type checkers: the function signatures
# in this module refer to it through the string annotation "SeriesLike", and
# TYPE_CHECKING is False at runtime, so this import is never executed then.
if TYPE_CHECKING:
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        # timesmith could not be imported: bind a placeholder so that the
        # name still exists for the checker.
        SeriesLike = None

logger = logging.getLogger(__name__)


def score_anomalies(
    y: Union[pd.Series, np.ndarray, "SeriesLike"],
    scorer: BaseScorer,
) -> pd.Series:
    """Compute an anomaly score for each observation of a time series.

    Args:
        y: Time series to score.
        scorer: BaseScorer instance that is handed to ``run_scoring``.

    Returns:
        pandas Series of anomaly scores, indexed by the index of the score
        view that ``run_scoring`` returns.
    """
    scorer_name = scorer.__class__.__name__
    logger.info(f"Scoring anomalies with {scorer_name}")

    view = run_scoring(y, scorer)
    values = view.scores
    return pd.Series(values, index=view.index)
def detect_anomalies(
    y: Union[pd.Series, np.ndarray, "SeriesLike"],
    detector: BaseDetector | BaseScorer,
    threshold_rule: ThresholdRule,
) -> pd.DataFrame:
    """Score a time series and flag its anomalous observations.

    Args:
        y: Time series to detect anomalies in.
        detector: BaseDetector or BaseScorer instance.
        threshold_rule: ThresholdRule used to derive the flags when
            ``detector`` is a BaseScorer.

    Returns:
        pandas DataFrame with a 'score' and a 'flag' column, indexed by the
        index of the score view produced by ``run_detection``.
    """
    detector_name = detector.__class__.__name__
    logger.info(
        f"Detecting anomalies with {detector_name} "
        f"and threshold {threshold_rule}"
    )

    labels, scores = run_detection(y, detector)

    # For a scorer, the labels returned by run_detection are discarded and
    # recomputed from the scores with the caller's threshold rule.
    if isinstance(detector, BaseScorer):
        labels = apply_threshold(scores, threshold_rule)

    columns = {"score": scores.scores, "flag": labels.labels}
    return pd.DataFrame(columns, index=scores.index)
def sweep_thresholds(
    y: Union[pd.Series, np.ndarray, "SeriesLike"],
    scorer: BaseScorer,
    threshold_values: list[float] | np.ndarray,
    labels: Union[pd.Series, np.ndarray, "SeriesLike", None] = None,
) -> pd.DataFrame:
    """Evaluate multiple threshold values and return metrics.

    The series is scored once; each threshold ``t`` then marks every
    observation with ``score >= t`` as a predicted anomaly.

    Args:
        y: Time series to score.
        scorer: BaseScorer instance.
        threshold_values: Threshold values to evaluate.
        labels: Optional ground truth labels. A pandas Series is aligned to
            the score index (index entries missing from ``labels`` count as
            0); any other input must have the same length as the scores.
            In both cases every non-zero label is treated as an anomaly.

    Returns:
        pandas DataFrame with columns: threshold, precision, recall, f1
        (metrics are NaN if labels not provided).

    Raises:
        ValueError: If ``labels`` is not a pandas Series and its length
            differs from the length of the score index.
    """
    logger.info(f"Sweeping {len(threshold_values)} threshold values")
    score_view = run_scoring(y, scorer)

    thresholds = np.asarray(threshold_values)
    scores = score_view.scores

    if labels is None:
        # Without ground truth there is nothing to compare against.
        precision = np.full(len(thresholds), np.nan)
        recall = np.full(len(thresholds), np.nan)
        f1 = np.full(len(thresholds), np.nan)
    else:
        if isinstance(labels, pd.Series):
            # Align on the index; positions absent from `labels` become 0.
            labels_arr = labels.reindex(score_view.index, fill_value=0).values
        else:
            labels_arr = np.asarray(labels)
            if len(labels_arr) != len(score_view.index):
                raise ValueError(
                    f"labels length ({len(labels_arr)}) must match y length ({len(score_view.index)})"
                )

        # Binarise once for BOTH input kinds. Previously only array input
        # was binarised, so boolean or non-{0, 1} Series labels reached the
        # metric functions unconverted and were scored differently from the
        # same labels supplied as an array. As for array input, NaN compares
        # unequal to 0 and therefore counts as a positive label.
        aligned_labels = (labels_arr != 0).astype(int)

        # Imported here rather than at module level, as in the original code.
        from anomsmith.workflows.eval.metrics import (
            compute_f1,
            compute_precision,
            compute_recall,
        )

        precision_list: list[float] = []
        recall_list: list[float] = []
        f1_list: list[float] = []
        for t in thresholds:
            y_pred = (scores >= t).astype(int)
            precision_list.append(float(compute_precision(aligned_labels, y_pred)))
            recall_list.append(float(compute_recall(aligned_labels, y_pred)))
            f1_list.append(float(compute_f1(aligned_labels, y_pred)))

        precision = np.asarray(precision_list)
        recall = np.asarray(recall_list)
        f1 = np.asarray(f1_list)

    return pd.DataFrame(
        {
            "threshold": thresholds,
            "precision": precision,
            "recall": recall,
            "f1": f1,
        }
    )
def report_detection(
    y: Union[pd.Series, np.ndarray, "SeriesLike"],
    detector: BaseDetector | BaseScorer,
    threshold_rule: ThresholdRule,
) -> dict[str, Any]:
    """Run detection on a series and summarise the outcome.

    Args:
        y: Time series to analyze.
        detector: BaseDetector or BaseScorer instance to run.
        threshold_rule: ThresholdRule passed on to ``detect_anomalies``.

    Returns:
        Dictionary with the keys ``n_anomalies``, ``n_total``,
        ``anomaly_rate``, ``mean_score``, ``max_score``, ``min_score`` and
        ``top_anomaly_timestamps`` (index labels of the highest-scoring
        flagged rows, rendered with ``str``).
    """
    logger.info("Generating detection report")
    detections = detect_anomalies(y, detector, threshold_rule)

    flagged_count = detections["flag"].sum()
    total_count = len(detections)
    # An empty result has no meaningful ratio; report 0.0 instead of dividing.
    if total_count > 0:
        rate = flagged_count / total_count
    else:
        rate = 0.0

    # Keep flagged rows only, then take the highest-scoring ones.
    flagged_rows = detections[detections["flag"] == 1]
    highest = flagged_rows.nlargest(DEFAULT_DETECTION_REPORT_TOP_ANOMALIES, "score")
    top_labels = highest.index.tolist()

    return {
        "n_anomalies": int(flagged_count),
        "n_total": int(total_count),
        "anomaly_rate": float(rate),
        "mean_score": float(detections["score"].mean()),
        "max_score": float(detections["score"].max()),
        "min_score": float(detections["score"].min()),
        "top_anomaly_timestamps": [str(label) for label in top_labels],
    }