Source code for anomsmith.workflows.model_monitoring
"""Model monitoring and performance tracking utilities.
Designed for integration with monitoring systems like AWS CloudWatch,
Azure Monitor, or GCP Cloud Monitoring.
"""
import logging
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
import numpy as np
import pandas as pd
from anomsmith.constants import (
DEFAULT_DRIFT_DETECTION_STDDEV_THRESHOLD,
DEFAULT_MODEL_METRIC_DEGRADATION_THRESHOLD,
)
from anomsmith.workflows.eval.metrics import (
average_run_length,
compute_f1,
compute_precision,
compute_recall,
)
if TYPE_CHECKING:
try:
from timesmith.typing import SeriesLike
except ImportError:
SeriesLike = None
logger = logging.getLogger(__name__)
[docs]
def compute_performance_metrics(
true_labels: np.ndarray | pd.Series,
predicted_labels: np.ndarray | pd.Series,
scores: np.ndarray | pd.Series | None = None,
) -> dict[str, float]:
"""Compute comprehensive performance metrics for model monitoring.
Returns metrics suitable for CloudWatch, Prometheus, or similar
monitoring systems.
Args:
true_labels: Ground truth binary labels (0 = normal, 1 = anomaly)
predicted_labels: Predicted binary labels
scores: Optional anomaly scores (for threshold-independent metrics)
Returns:
Dictionary with metrics:
- precision: Precision score
- recall: Recall score
- f1: F1 score
- true_positives: Number of true positives
- false_positives: Number of false positives
- false_negatives: Number of false negatives
- true_negatives: Number of true negatives
- anomaly_rate: Proportion of predicted anomalies
- avg_run_length: Average length of anomaly runs (if scores provided)
Examples:
>>> metrics = compute_performance_metrics(true_labels, pred_labels, scores)
>>> # Send to CloudWatch
>>> cloudwatch.put_metric_data(
... Namespace="AnomalyDetection",
... MetricData=[{"MetricName": "F1", "Value": metrics["f1"]}]
... )
"""
# Convert to numpy arrays
if isinstance(true_labels, pd.Series):
y_true = true_labels.values
else:
y_true = np.asarray(true_labels)
if isinstance(predicted_labels, pd.Series):
y_pred = predicted_labels.values
else:
y_pred = np.asarray(predicted_labels)
# Compute confusion matrix components
tp = np.sum((y_true == 1) & (y_pred == 1))
fp = np.sum((y_true == 0) & (y_pred == 1))
fn = np.sum((y_true == 1) & (y_pred == 0))
tn = np.sum((y_true == 0) & (y_pred == 0))
# Compute metrics
precision = compute_precision(y_true, y_pred)
recall = compute_recall(y_true, y_pred)
f1 = compute_f1(y_true, y_pred)
anomaly_rate = y_pred.mean()
metrics = {
"precision": float(precision),
"recall": float(recall),
"f1": float(f1),
"true_positives": int(tp),
"false_positives": int(fp),
"false_negatives": int(fn),
"true_negatives": int(tn),
"anomaly_rate": float(anomaly_rate),
}
# Add average run length if scores provided
if scores is not None:
if isinstance(scores, pd.Series):
scores_array = scores.values
else:
scores_array = np.asarray(scores)
# Use predictions to compute run length
avg_run_len = average_run_length(y_pred)
metrics["avg_run_length"] = float(avg_run_len)
# Add score statistics
metrics["mean_score"] = float(np.mean(scores_array))
metrics["max_score"] = float(np.max(scores_array))
metrics["min_score"] = float(np.min(scores_array))
metrics["score_std"] = float(np.std(scores_array))
return metrics
[docs]
def detect_concept_drift(
recent_scores: np.ndarray | pd.Series,
historical_scores: np.ndarray | pd.Series,
threshold: float = DEFAULT_DRIFT_DETECTION_STDDEV_THRESHOLD,
) -> dict[str, Any]:
"""Detect concept drift in model scores.
Compares recent score distribution to historical distribution
using statistical tests. Useful for triggering model retraining.
Args:
recent_scores: Recent anomaly scores (last N samples)
historical_scores: Historical anomaly scores (training/baseline period)
threshold: Threshold for drift detection (default 2.0 std devs)
Returns:
Dictionary with drift detection results:
- drift_detected: Boolean indicating if drift detected
- recent_mean: Mean of recent scores
- historical_mean: Mean of historical scores
- drift_magnitude: Difference in means normalized by historical std
- ks_statistic: Kolmogorov-Smirnov test statistic (if scipy available)
- p_value: P-value from KS test (if scipy available)
Examples:
>>> drift_info = detect_concept_drift(
... recent_scores=model_scores[-1000:],
... historical_scores=baseline_scores
... )
>>> if drift_info["drift_detected"]:
... trigger_model_retraining()
"""
# Convert to numpy arrays
if isinstance(recent_scores, pd.Series):
recent = recent_scores.values
else:
recent = np.asarray(recent_scores)
if isinstance(historical_scores, pd.Series):
historical = historical_scores.values
else:
historical = np.asarray(historical_scores)
# Compute statistics
recent_mean = np.mean(recent)
historical_mean = np.mean(historical)
historical_std = np.std(historical)
# Normalized drift magnitude
if historical_std > 0:
drift_magnitude = abs(recent_mean - historical_mean) / historical_std
else:
drift_magnitude = 0.0
drift_detected = drift_magnitude > threshold
result = {
"drift_detected": bool(drift_detected),
"recent_mean": float(recent_mean),
"historical_mean": float(historical_mean),
"drift_magnitude": float(drift_magnitude),
"threshold": float(threshold),
}
# Try to compute KS test if scipy available
try:
from scipy import stats
ks_statistic, p_value = stats.ks_2samp(historical, recent)
result["ks_statistic"] = float(ks_statistic)
result["p_value"] = float(p_value)
except ImportError:
logger.debug("scipy not available, skipping KS test")
return result
[docs]
def aggregate_metrics_for_cloudwatch(
metrics_list: list[dict[str, float]],
namespace: str = "AnomalyDetection",
model_name: str | None = None,
timestamp: datetime | None = None,
) -> list[dict[str, Any]]:
"""Format metrics for AWS CloudWatch PutMetricData API.
Aggregates multiple metric dictionaries into CloudWatch format.
Args:
metrics_list: List of metric dictionaries from compute_performance_metrics
namespace: CloudWatch namespace (default "AnomalyDetection")
model_name: Optional model name for dimension
timestamp: Optional timestamp (default: now)
Returns:
List of CloudWatch metric data dictionaries
Examples:
>>> metrics = [compute_performance_metrics(y1, pred1), ...]
>>> cw_metrics = aggregate_metrics_for_cloudwatch(metrics, model_name="IsolationForest")
>>> cloudwatch.put_metric_data(
... Namespace="AnomalyDetection",
... MetricData=cw_metrics
... )
"""
if timestamp is None:
timestamp = datetime.now(UTC)
cloudwatch_metrics = []
# Aggregate across all metrics
aggregated = {}
for metrics in metrics_list:
for key, value in metrics.items():
if key not in aggregated:
aggregated[key] = []
aggregated[key].append(value)
# Create CloudWatch metric for each aggregated metric
for metric_name, values in aggregated.items():
metric_data = {
"MetricName": metric_name,
"Timestamp": timestamp,
"Value": float(np.mean(values)),
"Unit": "None",
}
if model_name:
metric_data["Dimensions"] = [{"Name": "ModelName", "Value": model_name}]
cloudwatch_metrics.append(metric_data)
# Add statistics
if len(values) > 1:
metric_data_min = {
"MetricName": f"{metric_name}_Min",
"Timestamp": timestamp,
"Value": float(np.min(values)),
"Unit": "None",
}
metric_data_max = {
"MetricName": f"{metric_name}_Max",
"Timestamp": timestamp,
"Value": float(np.max(values)),
"Unit": "None",
}
if model_name:
metric_data_min["Dimensions"] = [
{"Name": "ModelName", "Value": model_name}
]
metric_data_max["Dimensions"] = [
{"Name": "ModelName", "Value": model_name}
]
cloudwatch_metrics.extend([metric_data_min, metric_data_max])
return cloudwatch_metrics
[docs]
class ModelPerformanceTracker:
"""Track model performance over time for monitoring and alerting.
Maintains a rolling window of performance metrics and can detect
degradation or drift.
Attributes:
window_size: Number of recent predictions to keep in window
metrics_history: DataFrame with historical metrics
"""
def __init__(self, window_size: int = 1000, model_name: str | None = None) -> None:
"""Initialize performance tracker.
Args:
window_size: Number of recent samples to track
model_name: Optional model name for identification
"""
self.window_size = window_size
self.model_name = model_name
self._metrics_rows: list[dict[str, Any]] = []
self.metrics_history: pd.DataFrame = pd.DataFrame()
self.score_history: list[float] = []
self.label_history: list[int] = []
self.true_label_history: list[int] = []
[docs]
def update(
self,
scores: np.ndarray | pd.Series,
predicted_labels: np.ndarray | pd.Series,
true_labels: np.ndarray | pd.Series | None = None,
timestamp: datetime | None = None,
) -> dict[str, float]:
"""Update tracker with new predictions.
Args:
scores: Anomaly scores
predicted_labels: Predicted binary labels
true_labels: Optional ground truth labels
timestamp: Optional timestamp for this update
Returns:
Current performance metrics
"""
if timestamp is None:
timestamp = datetime.now(UTC)
# Convert to arrays
if isinstance(scores, pd.Series):
scores_array = scores.values
else:
scores_array = np.asarray(scores)
if isinstance(predicted_labels, pd.Series):
labels_array = predicted_labels.values
else:
labels_array = np.asarray(predicted_labels)
# Update history (keep only recent window)
self.score_history.extend(scores_array.tolist())
self.label_history.extend(labels_array.tolist())
if len(self.score_history) > self.window_size:
self.score_history = self.score_history[-self.window_size :]
self.label_history = self.label_history[-self.window_size :]
# Compute metrics if true labels provided
if true_labels is not None:
if isinstance(true_labels, pd.Series):
true_array = true_labels.values
else:
true_array = np.asarray(true_labels)
self.true_label_history.extend(true_array.tolist())
if len(self.true_label_history) > self.window_size:
self.true_label_history = self.true_label_history[-self.window_size :]
metrics = compute_performance_metrics(
np.array(self.true_label_history),
np.array(self.label_history),
np.array(self.score_history),
)
else:
# Just track score statistics
metrics = {
"mean_score": float(np.mean(self.score_history)),
"max_score": float(np.max(self.score_history)),
"min_score": float(np.min(self.score_history)),
"score_std": float(np.std(self.score_history)),
"anomaly_rate": float(np.mean(self.label_history)),
}
# Add timestamp
metrics["timestamp"] = timestamp
# Update history (avoid pd.concat in loop; use list + single DataFrame build)
self._metrics_rows.append(metrics)
if len(self._metrics_rows) > self.window_size:
self._metrics_rows = self._metrics_rows[-self.window_size :]
self.metrics_history = pd.DataFrame(self._metrics_rows)
return metrics
[docs]
def get_current_metrics(self) -> dict[str, float]:
"""Get current performance metrics.
Returns:
Dictionary with latest metrics
"""
if self.metrics_history.empty:
return {}
return self.metrics_history.iloc[-1].to_dict()
[docs]
def detect_degradation(
self,
baseline_metrics: dict[str, float],
threshold: float = DEFAULT_MODEL_METRIC_DEGRADATION_THRESHOLD,
) -> bool:
"""Detect if performance has degraded compared to baseline.
Args:
baseline_metrics: Baseline metrics (e.g., from training)
threshold: Relative degradation threshold (default 0.1 = 10%)
Returns:
True if degradation detected
"""
current = self.get_current_metrics()
if "f1" in baseline_metrics and "f1" in current:
baseline_f1 = baseline_metrics["f1"]
current_f1 = current["f1"]
degradation = (
(baseline_f1 - current_f1) / baseline_f1 if baseline_f1 > 0 else 0.0
)
return degradation > threshold
return False