"""Statistical anomaly detection scorers."""
import logging
from typing import TYPE_CHECKING, Union
import numpy as np
import pandas as pd
from anomsmith.constants import (
DEFAULT_UNIT_SCALE_FOR_ZERO_SPREAD,
IQR_BINARY_INLIER_SCORE,
IQR_BINARY_OUTLIER_SCORE,
NUMERICAL_EPSILON,
TUKEY_FENCE_IQR_MULTIPLIER,
)
from anomsmith.objects.views import ScoreView
from anomsmith.primitives.base import BaseScorer
if TYPE_CHECKING:
    # typing.TYPE_CHECKING is False at runtime, so this branch is only seen by
    # static type checkers. In the code visible in this module, SeriesLike is
    # referenced solely as the string annotation "SeriesLike", so the name does
    # not need to exist at runtime.
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        # Placeholder binding used when timesmith cannot be imported.
        SeriesLike = None

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ZScoreScorer(BaseScorer):
    """Z-score based anomaly scorer.

    Computes absolute Z-scores relative to the per-feature mean and standard
    deviation learned in :meth:`fit`. For multi-feature input the score of a
    sample is its largest absolute Z-score across features. Higher scores
    indicate more anomalous points.

    Args:
        n_std: Number of standard deviations (used for thresholding, not scoring)
        random_state: Random state for reproducibility (not used, kept for
            compatibility)
    """

    def __init__(self, n_std: float = 3.0, random_state: int | None = None) -> None:
        self.n_std = n_std
        self.random_state = random_state
        # Per-feature statistics; populated by fit().
        self.mean_: np.ndarray | None = None
        self.std_: np.ndarray | None = None
        super().__init__(n_std=n_std, random_state=random_state)
        # Reset after the base initialiser has run; fit() flips this to True.
        self._fitted = False

    @staticmethod
    def _as_2d(y: Union[np.ndarray, pd.Series, "SeriesLike"]) -> np.ndarray:
        """Return ``y`` as a 2D ``(n_samples, n_features)`` NumPy array.

        Lists and other array-likes are coerced with ``np.asarray`` so that
        callers are not required to pass an ``ndarray``; 1D input becomes a
        single-feature column.
        """
        values = y.to_numpy() if isinstance(y, pd.Series) else np.asarray(y)
        if values.ndim == 1:
            values = values.reshape(-1, 1)
        return values

    def fit(
        self,
        y: np.ndarray | pd.Series,
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "ZScoreScorer":
        """Fit the scorer by computing mean and standard deviation.

        Args:
            y: Training data, 1D (single feature) or 2D (samples x features)
            X: Optional features (not used)

        Returns:
            Self for method chaining
        """
        values = self._as_2d(y)

        self.mean_ = np.mean(values, axis=0)
        std = np.std(values, axis=0)
        # A constant feature has zero spread; fall back to a unit scale so
        # score() never divides by zero.
        self.std_ = np.where(std == 0, 1.0, std)

        self._fitted = True
        logger.debug("Fitted ZScoreScorer: mean=%s, std=%s", self.mean_, self.std_)
        return self

    def score(self, y: Union[np.ndarray, pd.Series, "SeriesLike"]) -> ScoreView:
        """Score anomalies using Z-scores.

        Args:
            y: Time series to score. A ``pd.Series`` keeps its own index; any
                other input is indexed ``0..len(y)-1``.

        Returns:
            ScoreView with absolute Z-scores (maximum across features)
        """
        self._check_fitted()

        if isinstance(y, pd.Series):
            index = y.index
        else:
            index = pd.RangeIndex(start=0, stop=len(y))
        values = self._as_2d(y)

        z_scores = np.abs((values - self.mean_) / self.std_)
        # values is always 2D here, so reduce across the feature axis.
        scores = np.max(z_scores, axis=1)
        return ScoreView(index=index, scores=scores)
class IQRScorer(BaseScorer):
    """Interquartile Range (IQR) based outlier scorer.

    Builds per-feature fences ``[q1 - factor * iqr, q3 + factor * iqr]`` from
    the training data. A sample's score is the largest distance by which any
    of its features falls outside those fences, divided by the largest
    per-feature IQR. Samples inside every fence score 0; higher scores
    indicate more anomalous points.

    Args:
        factor: IQR multiplier for outlier bounds
            (default: ``TUKEY_FENCE_IQR_MULTIPLIER``)
        random_state: Random state for reproducibility (not used, kept for
            compatibility)
    """

    def __init__(
        self,
        factor: float = TUKEY_FENCE_IQR_MULTIPLIER,
        random_state: int | None = None,
    ) -> None:
        self.factor = factor
        self.random_state = random_state
        # Per-feature quartiles and spread; populated by fit().
        self.q1_: np.ndarray | None = None
        self.q3_: np.ndarray | None = None
        self.iqr_: np.ndarray | None = None
        super().__init__(factor=factor, random_state=random_state)
        # Reset after the base initialiser has run; fit() flips this to True.
        self._fitted = False

    @staticmethod
    def _as_2d(y: np.ndarray | pd.Series) -> np.ndarray:
        """Return ``y`` as a 2D ``(n_samples, n_features)`` NumPy array.

        Lists and other array-likes are coerced with ``np.asarray`` so that
        callers are not required to pass an ``ndarray``; 1D input becomes a
        single-feature column.
        """
        values = y.to_numpy() if isinstance(y, pd.Series) else np.asarray(y)
        if values.ndim == 1:
            values = values.reshape(-1, 1)
        return values

    def fit(
        self,
        y: np.ndarray | pd.Series,
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "IQRScorer":
        """Fit the scorer by computing quartiles.

        Args:
            y: Training data, 1D (single feature) or 2D (samples x features)
            X: Optional features (not used)

        Returns:
            Self for method chaining
        """
        values = self._as_2d(y)

        self.q1_ = np.percentile(values, 25, axis=0)
        self.q3_ = np.percentile(values, 75, axis=0)
        spread = self.q3_ - self.q1_
        # A feature whose quartiles coincide has zero spread; substitute the
        # configured fallback scale so the fences and the normalisation in
        # score() stay well defined.
        self.iqr_ = np.where(spread == 0, DEFAULT_UNIT_SCALE_FOR_ZERO_SPREAD, spread)

        self._fitted = True
        logger.debug("Fitted IQRScorer: q1=%s, q3=%s", self.q1_, self.q3_)
        return self

    def score(self, y: np.ndarray | pd.Series) -> ScoreView:
        """Score anomalies using IQR bounds.

        Args:
            y: Time series to score. A ``pd.Series`` keeps its own index; any
                other input is indexed ``0..len(y)-1``.

        Returns:
            ScoreView with IQR-based scores: 0 for samples inside every fence,
            otherwise the largest fence overshoot divided by the largest IQR
        """
        self._check_fitted()

        if isinstance(y, pd.Series):
            index = y.index
        else:
            index = pd.RangeIndex(start=0, stop=len(y))
        values = self._as_2d(y)  # (n_samples, n_features)

        lower_bound = self.q1_ - self.factor * self.iqr_
        upper_bound = self.q3_ + self.factor * self.iqr_

        # Per-feature overshoot beyond each fence, clamped to 0 inside it.
        dist_lower = np.maximum(0, lower_bound - values)
        dist_upper = np.maximum(0, values - upper_bound)

        # Worst overshoot across features. Inside the fences both distances
        # are 0, so inliers score exactly 0 rather than a negative margin.
        overshoot = np.maximum(dist_lower, dist_upper).max(axis=1)  # (n_samples,)

        # Normalise by the largest IQR; the epsilon guards against a zero scale.
        scale = self.iqr_.max() + NUMERICAL_EPSILON
        scores = overshoot / scale
        return ScoreView(index=index, scores=scores)