Source code for anomsmith.primitives.scorers.seasonal

"""Seasonal baseline anomaly scorer.

Detects anomalies by comparing values to seasonal baselines (e.g., weekly, monthly patterns).
"""

import logging
from typing import TYPE_CHECKING, Literal, Union

import numpy as np
import pandas as pd

from anomsmith.objects.views import ScoreView
from anomsmith.primitives.base import BaseScorer

if TYPE_CHECKING:
    try:
        from timesmith.typing import PanelLike, SeriesLike
    except ImportError:
        SeriesLike = None
        PanelLike = None

logger = logging.getLogger(__name__)


[docs] class SeasonalBaselineScorer(BaseScorer): """Seasonal baseline anomaly scorer. Calculates seasonal baselines (e.g., weekly, monthly) and scores points that deviate significantly from expected seasonal patterns. Args: seasonality: Seasonality to use. Options: 'week', 'month', 'day', 'hour'. random_state: Random state for reproducibility (not used, kept for compatibility) """ def __init__( self, seasonality: Literal["week", "month", "day", "hour"] = "week", random_state: int | None = None, ) -> None: """Initialize SeasonalBaselineScorer. Args: seasonality: Seasonality to use ('week', 'month', 'day', 'hour') random_state: Random state (not used, kept for compatibility) """ self.seasonality = seasonality self.random_state = random_state self.seasonal_stats_: pd.DataFrame | None = None super().__init__(seasonality=seasonality, random_state=random_state) self._fitted = False def _get_seasonal_key(self, dates: pd.DatetimeIndex) -> pd.Series: """Extract seasonal key from dates based on seasonality type.""" seasonality_map = { "week": lambda d: d.isocalendar().week, "month": lambda d: d.month, "day": lambda d: d.dayofyear, "hour": lambda d: d.hour, } if self.seasonality not in seasonality_map: raise ValueError(f"Unknown seasonality: {self.seasonality}") return pd.Series(seasonality_map[self.seasonality](dates), index=dates)
[docs] def fit( self, y: Union[np.ndarray, pd.Series, "SeriesLike"], X: Union[np.ndarray, pd.DataFrame, "PanelLike", None] = None, ) -> "SeasonalBaselineScorer": """Fit the scorer by computing seasonal baselines. Args: y: Time series with datetime index X: Optional features (not used) Returns: Self for method chaining """ if isinstance(y, pd.Series): index = y.index values = y.values else: # For numpy arrays, create a default datetime index index = pd.date_range("2020-01-01", periods=len(y), freq="D") values = y if not isinstance(index, pd.DatetimeIndex): raise ValueError( "SeasonalBaselineScorer requires a DatetimeIndex. " f"Got {type(index)} instead." ) # Compute seasonal statistics df = pd.DataFrame({"value": values, "date": index}) df["seasonal_key"] = self._get_seasonal_key(index) seasonal_stats = ( df.groupby("seasonal_key").agg({"value": ["mean", "std"]}).reset_index() ) seasonal_stats.columns = ["seasonal_key", "mean", "std"] seasonal_stats["std"] = seasonal_stats["std"].fillna(0) seasonal_stats["std"] = seasonal_stats["std"].replace( 0, 1.0 ) # Avoid division by zero self.seasonal_stats_ = seasonal_stats self._fitted = True logger.debug( f"Fitted SeasonalBaselineScorer with {len(seasonal_stats)} seasonal periods" ) return self
[docs] def score(self, y: Union[np.ndarray, pd.Series, "SeriesLike"]) -> ScoreView: """Score anomalies using seasonal baseline. Args: y: Time series to score Returns: ScoreView with seasonal z-scores """ self._check_fitted() if isinstance(y, pd.Series): index = y.index values = y.values else: index = pd.RangeIndex(start=0, stop=len(y)) values = y if not isinstance(index, pd.DatetimeIndex): raise ValueError( "SeasonalBaselineScorer requires a DatetimeIndex. " f"Got {type(index)} instead." ) # Get seasonal keys for scoring data seasonal_keys = self._get_seasonal_key(index) # Merge with seasonal stats - vectorized pandas operation df = pd.DataFrame({"value": values, "seasonal_key": seasonal_keys}, index=index) df = df.merge(self.seasonal_stats_, on="seasonal_key", how="left") # Compute Z-scores relative to seasonal baseline - vectorized z_scores = np.abs((df["value"] - df["mean"]) / df["std"]) scores = z_scores.fillna(0).values return ScoreView(index=index, scores=scores)