# Source code for anomsmith.primitives.detectors.pca

"""PCA-based anomaly detection detector.

Uses Principal Component Analysis to model healthy operation boundaries.
Anomalies are detected using either:
- Mahalanobis distance in the principal component space
- Reconstruction error (difference between original and reconstructed data)
"""

import logging
from typing import TYPE_CHECKING, Literal, Optional, Union

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from anomsmith.objects.views import LabelView, ScoreView
from anomsmith.primitives.base import BaseDetector

# SeriesLike is only needed by static type checkers, so the import is skipped
# at runtime; annotations in this module refer to it by its quoted name.
if TYPE_CHECKING:
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        # timesmith could not be imported: bind a placeholder so the name exists.
        SeriesLike = None

logger = logging.getLogger(__name__)


class PCADetector(BaseDetector):
    """PCA-based anomaly detector.

    Uses Principal Component Analysis to model healthy operation boundaries.
    Anomalies are detected using either:
    - Mahalanobis distance in the principal component space
    - Reconstruction error (difference between original and reconstructed data)

    Args:
        n_components: Number of components to keep. If 0 < n_components < 1,
            select the number of components such that the amount of variance
            that needs to be explained is greater than the percentage specified.
        score_method: Method for computing anomaly scores:
            - 'reconstruction': Use reconstruction error
            - 'mahalanobis': Use Mahalanobis distance in PC space
            - 'both': Min-max normalize both scores and return their average
        contamination: Expected proportion of outliers in the data
            (used for threshold)
        random_state: Random state for reproducibility
    """

    def __init__(
        self,
        n_components: Union[float, int] = 0.95,
        score_method: Literal["reconstruction", "mahalanobis", "both"] = "reconstruction",
        contamination: float = 0.05,
        random_state: Optional[int] = None,
    ) -> None:
        self.n_components = n_components
        self.score_method = score_method
        self.contamination = contamination
        self.random_state = random_state
        # State learned by fit(); None until the detector has been fitted.
        self.pca_: PCA | None = None
        self.scaler_ = StandardScaler()
        self.threshold_: float | None = None
        self.mean_: np.ndarray | None = None
        self.cov_: np.ndarray | None = None
        # Training-time (min, max) of each raw score, used by the 'both' method
        # so that scores computed later share the scale of the threshold.
        self._norm_bounds_: dict[str, tuple[float, float]] | None = None
        super().__init__(
            n_components=n_components,
            score_method=score_method,
            contamination=contamination,
            random_state=random_state,
        )
        self._fitted = False

    @staticmethod
    def _as_2d(data) -> np.ndarray:
        """Return ``data`` as an array with one row per sample.

        pandas objects are unwrapped to their underlying values and
        one-dimensional input becomes a single-column matrix.
        """
        if isinstance(data, (pd.Series, pd.DataFrame)):
            arr = data.values
        else:
            arr = np.asarray(data)
        return arr.reshape(-1, 1) if arr.ndim == 1 else arr

    def fit(
        self,
        y: Union[np.ndarray, pd.Series, "SeriesLike"],
        X: np.ndarray | pd.DataFrame | None = None,
    ) -> "PCADetector":
        """Fit the PCA detector on healthy operation data.

        Args:
            y: Training data (target)
            X: Optional features (if None, uses y)

        Returns:
            Self for method chaining
        """
        # Use X if provided, otherwise use y
        X_data = self._as_2d(y if X is None else X)
        X_scaled = self.scaler_.fit_transform(X_data)

        # Fit PCA
        self.pca_ = PCA(n_components=self.n_components, random_state=self.random_state)
        X_pca = self.pca_.fit_transform(X_scaled)

        # Compute statistics in PC space for Mahalanobis distance.
        # np.cov returns a 0-d array when only one component is kept, which
        # cannot be inverted; keep the covariance as a (k, k) matrix.
        self.mean_ = np.mean(X_pca, axis=0)
        self.cov_ = np.atleast_2d(np.cov(X_pca, rowvar=False))

        # Freeze the normalization ranges used by the 'both' method on the
        # training data; reset first so that a refit never reuses stale ranges.
        self._norm_bounds_ = None
        if self.score_method == "both":
            bounds: dict[str, tuple[float, float]] = {}
            for method in ("reconstruction", "mahalanobis"):
                raw = self._compute_scores_with_method(X_scaled, X_pca, method)
                bounds[method] = (float(raw.min()), float(raw.max()))
            self._norm_bounds_ = bounds

        # Compute threshold based on training data
        scores = self._compute_scores(X_scaled, X_pca)
        self.threshold_ = float(np.percentile(scores, 100 * (1 - self.contamination)))
        self._fitted = True
        logger.debug("Fitted PCADetector: threshold=%s", self.threshold_)
        return self

    def predict(self, y: np.ndarray | pd.Series) -> LabelView:
        """Predict anomaly labels.

        Args:
            y: Time series to detect anomalies in

        Returns:
            LabelView with binary labels (1 = anomaly, 0 = normal)

        Raises:
            ValueError: If no threshold has been learned yet.
        """
        score_view = self.score(y)
        if self.threshold_ is None:
            raise ValueError("Detector must be fitted before prediction.")
        labels = (score_view.scores > self.threshold_).astype(int)
        return LabelView(index=score_view.index, labels=labels)

    def score(self, y: np.ndarray | pd.Series) -> ScoreView:
        """Score anomalies.

        Args:
            y: Time series to score. A pandas Series or DataFrame keeps its
                own index; any other input is indexed 0..len(y)-1.

        Returns:
            ScoreView with anomaly scores
        """
        self._check_fitted()
        if isinstance(y, (pd.Series, pd.DataFrame)):
            index = y.index
        else:
            index = pd.RangeIndex(start=0, stop=len(y))
        values = self._as_2d(y)
        X_scaled = self.scaler_.transform(values)
        X_pca = self.pca_.transform(X_scaled)  # type: ignore
        scores = self._compute_scores(X_scaled, X_pca)
        return ScoreView(index=index, scores=scores)

    def _compute_scores(self, X_scaled: np.ndarray, X_pca: np.ndarray) -> np.ndarray:
        """Compute anomaly scores using the configured ``score_method``.

        Args:
            X_scaled: Scaled input data
            X_pca: Data transformed to principal component space

        Returns:
            Anomaly scores

        Raises:
            ValueError: If ``score_method`` is not a supported method.
        """
        if self.score_method in ("reconstruction", "mahalanobis"):
            return self._compute_scores_with_method(X_scaled, X_pca, self.score_method)
        if self.score_method == "both":
            # Average of both methods, each min-max normalized
            recon_scores = self._compute_scores_with_method(X_scaled, X_pca, "reconstruction")
            maha_scores = self._compute_scores_with_method(X_scaled, X_pca, "mahalanobis")
            recon_norm = self._normalize(recon_scores, "reconstruction")
            maha_norm = self._normalize(maha_scores, "mahalanobis")
            return (recon_norm + maha_norm) / 2
        raise ValueError(f"Unknown score_method: {self.score_method}")

    def _normalize(self, raw: np.ndarray, method: str) -> np.ndarray:
        """Min-max normalize raw scores of ``method``.

        Uses the range recorded by fit() when one is available so that scores
        stay comparable with the fitted threshold; otherwise falls back to the
        range of ``raw`` itself.
        """
        bounds = getattr(self, "_norm_bounds_", None)
        if bounds is not None and method in bounds:
            low, high = bounds[method]
        else:
            low, high = raw.min(), raw.max()
        # The small epsilon avoids division by zero when all scores are equal.
        return (raw - low) / (high - low + 1e-10)

    def _compute_scores_with_method(
        self, X_scaled: np.ndarray, X_pca: np.ndarray, method: str
    ) -> np.ndarray:
        """Compute scores with a specific method.

        Args:
            X_scaled: Scaled input data
            X_pca: Data transformed to principal component space
            method: Either 'reconstruction' or 'mahalanobis'

        Returns:
            One raw (un-normalized) score per row.

        Raises:
            ValueError: If ``method`` is unknown, or if 'mahalanobis' is
                requested before the PC-space statistics have been computed.
        """
        if method == "reconstruction":
            # Squared error between the scaled data and its PCA reconstruction
            X_reconstructed = self.pca_.inverse_transform(X_pca)  # type: ignore
            return np.sum((X_scaled - X_reconstructed) ** 2, axis=1)
        if method == "mahalanobis":
            if self.mean_ is None or self.cov_ is None:
                raise ValueError("PCA must be fitted before computing Mahalanobis distance.")
            diff = X_pca - self.mean_
            # Guard against a 0-d covariance (single component).
            cov = np.atleast_2d(self.cov_)
            try:
                inv_cov = np.linalg.inv(cov)
            except np.linalg.LinAlgError:
                # If covariance is singular, use pseudo-inverse
                inv_cov = np.linalg.pinv(cov)
            squared = np.sum(diff @ inv_cov * diff, axis=1)
            # Round-off can push the quadratic form slightly below zero;
            # clamp so the square root does not produce NaN.
            return np.sqrt(np.maximum(squared, 0.0))
        raise ValueError(f"Unknown method: {method}")