Source code for anomsmith.primitives.detectors.ml

"""Machine learning based anomaly detectors.

These detectors use sklearn models and must be in Layer 2 (primitives).
sklearn is allowed in Layer 2 as it's a core ML library.
"""

import logging

import numpy as np
import pandas as pd
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler

from anomsmith.constants import (
    DEFAULT_ELLIPTIC_ENVELOPE_SUPPORT_FRACTION,
    DEFAULT_ISOLATION_FOREST_N_ESTIMATORS,
    DEFAULT_LOF_N_NEIGHBORS,
    DEFAULT_OUTLIER_CONTAMINATION,
)
from anomsmith.objects.views import LabelView, ScoreView
from anomsmith.primitives.base import BaseDetector
from anomsmith.primitives.detectors._utils import (
    extract_index_and_values,
    prepare_input_data,
)

# Module-level logger named after this module; the detectors' fit() methods
# emit a debug message through it once the underlying model has been fitted.
logger = logging.getLogger(__name__)


class IsolationForestDetector(BaseDetector):
    """Anomaly detector backed by scikit-learn's ``IsolationForest``.

    Isolation Forest is an ensemble method that isolates anomalies by
    randomly selecting features and splitting values. Inputs are standardized
    with a ``StandardScaler`` that is fitted in ``fit()`` and reused by
    ``predict()`` and ``score()``.

    Args:
        contamination: Expected proportion of outliers in the data
        n_estimators: Number of base estimators
        random_state: Random state for reproducibility
        n_jobs: Number of jobs to run in parallel
    """

    def __init__(
        self,
        contamination: float = DEFAULT_OUTLIER_CONTAMINATION,
        n_estimators: int = DEFAULT_ISOLATION_FOREST_N_ESTIMATORS,
        random_state: int | None = None,
        n_jobs: int = -1,
    ) -> None:
        # Hyperparameters are mirrored as instance attributes so fit() can
        # read them back when it builds the sklearn estimator.
        self.contamination = contamination
        self.n_estimators = n_estimators
        self.random_state = random_state
        self.n_jobs = n_jobs

        # Learned state: the estimator is only created in fit().
        self.model_: IsolationForest | None = None
        self.scaler_ = StandardScaler()

        hyperparams = {
            "contamination": contamination,
            "n_estimators": n_estimators,
            "random_state": random_state,
            "n_jobs": n_jobs,
        }
        super().__init__(**hyperparams)
        self._fitted = False

    def _scale_inputs(self, y: np.ndarray | pd.Series) -> tuple:
        """Return ``(index, scaled_features)`` for inference on ``y``.

        Shared by predict() and score(): splits ``y`` into index and values,
        converts the values with ``prepare_input_data`` and applies the
        scaler fitted in fit().
        """
        index, values = extract_index_and_values(y)
        features = prepare_input_data(values)
        return index, self.scaler_.transform(features)

    def fit(
        self,
        y: np.ndarray | pd.Series,
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "IsolationForestDetector":
        """Fit the scaler and the Isolation Forest model.

        Args:
            y: Training data (target)
            X: Optional features (if None, uses y)

        Returns:
            Self for method chaining
        """
        scaled = self.scaler_.fit_transform(prepare_input_data(y, X))

        forest = IsolationForest(
            contamination=self.contamination,
            n_estimators=self.n_estimators,
            random_state=self.random_state,
            n_jobs=self.n_jobs,
        )
        # Store the estimator before fitting it, then mark the detector as
        # fitted only once fitting has completed.
        self.model_ = forest
        forest.fit(scaled)
        self._fitted = True

        logger.debug("Fitted IsolationForestDetector")
        return self

    def predict(self, y: np.ndarray | pd.Series) -> LabelView:
        """Predict anomaly labels.

        Args:
            y: Time series to detect anomalies in

        Returns:
            LabelView with binary labels (1 = anomaly, 0 = normal)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        # The model returns -1 (anomaly) or 1 (normal); map that onto the
        # anomsmith 0/1 convention.
        raw_predictions = self.model_.predict(scaled)
        flagged = raw_predictions == -1
        return LabelView(index=index, labels=flagged.astype(int))

    def score(self, y: np.ndarray | pd.Series) -> ScoreView:
        """Score anomalies.

        Args:
            y: Time series to score

        Returns:
            ScoreView with anomaly scores (higher = more anomalous)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        decision = self.model_.decision_function(scaled)
        # Negate so that higher scores mean more anomalous.
        return ScoreView(index=index, scores=-decision)
class LOFDetector(BaseDetector):
    """Anomaly detector backed by scikit-learn's ``LocalOutlierFactor``.

    LOF measures the local deviation of density of a given sample with
    respect to its neighbors. The model is created with ``novelty=True`` so
    that it can label and score data other than the training set. Inputs are
    standardized with a ``StandardScaler`` fitted in ``fit()``.

    Args:
        contamination: Expected proportion of outliers in the data
        n_neighbors: Number of neighbors to use
        random_state: Random state for reproducibility (stored on the
            instance and passed to the base class; fit() does not forward it
            to ``LocalOutlierFactor``)
        n_jobs: Number of jobs to run in parallel
    """

    def __init__(
        self,
        contamination: float = DEFAULT_OUTLIER_CONTAMINATION,
        n_neighbors: int = DEFAULT_LOF_N_NEIGHBORS,
        random_state: int | None = None,
        n_jobs: int = -1,
    ) -> None:
        # Hyperparameters are mirrored as instance attributes so fit() can
        # read them back when it builds the sklearn estimator.
        self.contamination = contamination
        self.n_neighbors = n_neighbors
        self.random_state = random_state
        self.n_jobs = n_jobs

        # Learned state: the estimator is only created in fit().
        self.model_: LocalOutlierFactor | None = None
        self.scaler_ = StandardScaler()

        hyperparams = {
            "contamination": contamination,
            "n_neighbors": n_neighbors,
            "random_state": random_state,
            "n_jobs": n_jobs,
        }
        super().__init__(**hyperparams)
        self._fitted = False

    def _scale_inputs(self, y: np.ndarray | pd.Series) -> tuple:
        """Return ``(index, scaled_features)`` for inference on ``y``.

        Shared by predict() and score(): splits ``y`` into index and values,
        converts the values with ``prepare_input_data`` and applies the
        scaler fitted in fit().
        """
        index, values = extract_index_and_values(y)
        features = prepare_input_data(values)
        return index, self.scaler_.transform(features)

    def fit(
        self,
        y: np.ndarray | pd.Series,
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "LOFDetector":
        """Fit the scaler and the LOF model.

        Args:
            y: Training data (target)
            X: Optional features (if None, uses y)

        Returns:
            Self for method chaining
        """
        scaled = self.scaler_.fit_transform(prepare_input_data(y, X))

        # novelty=True enables predict() / score_samples() on new data.
        lof = LocalOutlierFactor(
            contamination=self.contamination,
            n_neighbors=self.n_neighbors,
            n_jobs=self.n_jobs,
            novelty=True,
        )
        # Store the estimator before fitting it, then mark the detector as
        # fitted only once fitting has completed.
        self.model_ = lof
        lof.fit(scaled)
        self._fitted = True

        logger.debug("Fitted LOFDetector")
        return self

    def predict(self, y: np.ndarray | pd.Series) -> LabelView:
        """Predict anomaly labels.

        Args:
            y: Time series to detect anomalies in

        Returns:
            LabelView with binary labels (1 = anomaly, 0 = normal)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        # The model returns -1 (anomaly) or 1 (normal); map that onto the
        # anomsmith 0/1 convention.
        raw_predictions = self.model_.predict(scaled)
        flagged = raw_predictions == -1
        return LabelView(index=index, labels=flagged.astype(int))

    def score(self, y: np.ndarray | pd.Series) -> ScoreView:
        """Score anomalies.

        Relies on ``score_samples()`` of the already-fitted ``novelty=True``
        model, so new data is scored without refitting (no data leakage).

        Args:
            y: Time series to score

        Returns:
            ScoreView with anomaly scores (higher = more anomalous)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        # score_samples() yields the negative outlier factor, where lower
        # means more anomalous.
        negative_factor = self.model_.score_samples(scaled)  # type: ignore
        # Negate so higher = more anomalous (consistent with other detectors).
        return ScoreView(index=index, scores=-negative_factor)
class RobustCovarianceDetector(BaseDetector):
    """Anomaly detector backed by scikit-learn's ``EllipticEnvelope``.

    Assumes that the data is Gaussian distributed and fits an elliptic
    envelope to it. Inputs are standardized with a ``StandardScaler`` that is
    fitted in ``fit()`` and reused by ``predict()`` and ``score()``.

    Args:
        contamination: Expected proportion of outliers in the data
        support_fraction: Proportion of points to be used as support
        random_state: Random state for reproducibility
    """

    def __init__(
        self,
        contamination: float = DEFAULT_OUTLIER_CONTAMINATION,
        support_fraction: float = DEFAULT_ELLIPTIC_ENVELOPE_SUPPORT_FRACTION,
        random_state: int | None = None,
    ) -> None:
        # Hyperparameters are mirrored as instance attributes so fit() can
        # read them back when it builds the sklearn estimator.
        self.contamination = contamination
        self.support_fraction = support_fraction
        self.random_state = random_state

        # Learned state: the estimator is only created in fit().
        self.model_: EllipticEnvelope | None = None
        self.scaler_ = StandardScaler()

        hyperparams = {
            "contamination": contamination,
            "support_fraction": support_fraction,
            "random_state": random_state,
        }
        super().__init__(**hyperparams)
        self._fitted = False

    def _scale_inputs(self, y: np.ndarray | pd.Series) -> tuple:
        """Return ``(index, scaled_features)`` for inference on ``y``.

        Shared by predict() and score(): splits ``y`` into index and values,
        converts the values with ``prepare_input_data`` and applies the
        scaler fitted in fit().
        """
        index, values = extract_index_and_values(y)
        features = prepare_input_data(values)
        return index, self.scaler_.transform(features)

    def fit(
        self,
        y: np.ndarray | pd.Series,
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "RobustCovarianceDetector":
        """Fit the scaler and the Robust Covariance model.

        Args:
            y: Training data (target)
            X: Optional features (if None, uses y)

        Returns:
            Self for method chaining
        """
        scaled = self.scaler_.fit_transform(prepare_input_data(y, X))

        envelope = EllipticEnvelope(
            contamination=self.contamination,
            support_fraction=self.support_fraction,
            random_state=self.random_state,
        )
        # Store the estimator before fitting it, then mark the detector as
        # fitted only once fitting has completed.
        self.model_ = envelope
        envelope.fit(scaled)
        self._fitted = True

        logger.debug("Fitted RobustCovarianceDetector")
        return self

    def predict(self, y: np.ndarray | pd.Series) -> LabelView:
        """Predict anomaly labels.

        Args:
            y: Time series to detect anomalies in

        Returns:
            LabelView with binary labels (1 = anomaly, 0 = normal)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        # The model returns -1 (anomaly) or 1 (normal); map that onto the
        # anomsmith 0/1 convention.
        raw_predictions = self.model_.predict(scaled)
        flagged = raw_predictions == -1
        return LabelView(index=index, labels=flagged.astype(int))

    def score(self, y: np.ndarray | pd.Series) -> ScoreView:
        """Score anomalies.

        Args:
            y: Time series to score

        Returns:
            ScoreView with anomaly scores (higher = more anomalous)
        """
        self._check_fitted()
        index, scaled = self._scale_inputs(y)

        decision = self.model_.decision_function(scaled)
        # Negate so that higher scores mean more anomalous.
        return ScoreView(index=index, scores=-decision)