Source code for anomsmith.primitives.detectors.autoencoders

"""Autoencoder-based anomaly detectors (optional TensorFlow / PyTorch).

Migrated from the former *Anomaly Detection Toolkit*. Install ``anomsmith[deep]`` for
Keras (LSTM) and/or PyTorch backends.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

from anomsmith.objects.views import LabelView, ScoreView
from anomsmith.primitives.base import BaseDetector
from anomsmith.primitives.detectors._utils import extract_index_and_values

if TYPE_CHECKING:
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        SeriesLike = None

logger = logging.getLogger(__name__)

try:
    import torch
    import torch.nn as nn

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    torch = None  # type: ignore[assignment,misc]
    nn = None  # type: ignore[assignment,misc]

try:
    from keras.callbacks import EarlyStopping
    from keras.layers import LSTM, Dense, RepeatVector, TimeDistributed
    from keras.models import Sequential

    KERAS_AVAILABLE = True
except ImportError:
    KERAS_AVAILABLE = False
    EarlyStopping = None  # type: ignore[assignment,misc]
    LSTM = None  # type: ignore[assignment,misc]
    Dense = None  # type: ignore[assignment,misc]
    RepeatVector = None  # type: ignore[assignment,misc]
    TimeDistributed = None  # type: ignore[assignment,misc]
    Sequential = None  # type: ignore[assignment,misc]


def _univariate_series(
    y: np.ndarray | pd.Series | SeriesLike,
) -> tuple[pd.Index, np.ndarray]:
    """Return index and 1D float values for univariate input."""
    if isinstance(y, pd.Series):
        index, values = extract_index_and_values(y)
        flat = np.asarray(values, dtype=float).ravel()
        return index, flat
    arr = np.asarray(y, dtype=float)
    if arr.ndim > 1 and arr.shape[1] > 1:
        raise ValueError("Autoencoder detectors only support univariate series.")
    flat = arr.ravel()
    index = pd.RangeIndex(start=0, stop=flat.shape[0])
    return index, flat


[docs] class LSTMAutoencoderDetector(BaseDetector): """LSTM autoencoder: high reconstruction error ⇒ anomaly (univariate only).""" def __init__( self, window_size: int = 20, lstm_units: list[int] | None = None, contamination: float = 0.05, threshold_std: float = 3.0, epochs: int = 50, batch_size: int = 32, random_state: int | None = None, ) -> None: units = lstm_units or [32, 16] self.window_size = window_size self.lstm_units = units self.contamination = contamination self.threshold_std = threshold_std self.epochs = epochs self.batch_size = batch_size self.model_: Sequential | None = None # type: ignore[valid-type] self.scaler_ = MinMaxScaler() self.reconstruction_errors_: np.ndarray | None = None super().__init__( window_size=window_size, lstm_units=units, contamination=contamination, threshold_std=threshold_std, epochs=epochs, batch_size=batch_size, random_state=random_state, ) def _create_windows(self, data: np.ndarray) -> np.ndarray: if len(data) < self.window_size: return np.empty((0, self.window_size, 1)) if data.ndim > 1: data = data.flatten() n_windows = len(data) - self.window_size + 1 indices = np.arange(self.window_size) + np.arange(n_windows)[:, np.newaxis] windows = data[indices] return windows[:, :, np.newaxis] def _create_model(self, input_shape: tuple[int, int]) -> Sequential: # type: ignore[valid-type] if not KERAS_AVAILABLE or Sequential is None or LSTM is None: raise ImportError("Keras/TensorFlow is required for LSTMAutoencoderDetector.") if Dense is None or RepeatVector is None or TimeDistributed is None: raise ImportError("Keras/TensorFlow is required for LSTMAutoencoderDetector.") model = Sequential( [ LSTM( self.lstm_units[0], activation="relu", input_shape=input_shape, return_sequences=True, ), LSTM(self.lstm_units[1], activation="relu", return_sequences=False), RepeatVector(self.window_size), LSTM(self.lstm_units[1], activation="relu", return_sequences=True), LSTM(self.lstm_units[0], activation="relu", return_sequences=True), TimeDistributed(Dense(1)), ] ) model.compile(optimizer="adam", loss="mse") return model def _raw_window_scores(self, flat: np.ndarray) -> np.ndarray: if self.model_ is None: raise ValueError("LSTMAutoencoderDetector must be fitted before scoring.") X_scaled = self.scaler_.transform(flat.reshape(-1, 1)).flatten() X_windows = self._create_windows(X_scaled) if len(X_windows) == 0: return np.array([]) X_pred = self.model_.predict(X_windows, verbose=0) return np.mean(np.abs(X_windows - X_pred), axis=(1, 2))
[docs] def fit( self, y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None, ) -> LSTMAutoencoderDetector: if not KERAS_AVAILABLE or Sequential is None or EarlyStopping is None: raise ImportError( "Keras/TensorFlow is required for LSTMAutoencoderDetector. " "Install with: pip install 'anomsmith[deep]'" ) _, flat = _univariate_series(y) X_scaled = self.scaler_.fit_transform(flat.reshape(-1, 1)).flatten() X_windows = self._create_windows(X_scaled) if len(X_windows) == 0: raise ValueError(f"Series too short for window_size={self.window_size}") self.model_ = self._create_model((self.window_size, 1)) early_stopping = EarlyStopping( monitor="val_loss", patience=5, restore_best_weights=True, verbose=0 ) self.model_.fit( X_windows, X_windows, epochs=self.epochs, batch_size=self.batch_size, validation_split=0.2, callbacks=[early_stopping], verbose=0, ) self._fitted = True logger.debug("Fitted LSTMAutoencoderDetector") return self
[docs] def score(self, y: np.ndarray | pd.Series | SeriesLike) -> ScoreView: self._check_fitted() index, flat = _univariate_series(y) re = self._raw_window_scores(flat) self.reconstruction_errors_ = re full: np.ndarray = np.zeros(len(flat), dtype=float) if re.size > 0: full[self.window_size - 1 :] = re return ScoreView(index=index, scores=full)
[docs] def predict(self, y: np.ndarray | pd.Series | SeriesLike) -> LabelView: sv = self.score(y) s = np.asarray(sv.scores, dtype=float) active = s[self.window_size - 1 :] if active.size == 0: return LabelView(index=sv.index, labels=np.zeros(len(s), dtype=int)) mean = float(np.mean(active)) std = float(np.std(active)) std = std if std > 0 else 1.0 thr = mean + self.threshold_std * std labels = (s > thr).astype(int) return LabelView(index=sv.index, labels=labels)
[docs] class PyTorchAutoencoderDetector(BaseDetector): """Feedforward autoencoder on sliding windows (PyTorch, univariate only).""" def __init__( self, window_size: int = 24, hidden_dims: list[int] | None = None, learning_rate: float = 1e-3, epochs: int = 200, batch_size: int = 32, threshold_std: float = 3.0, random_state: int | None = None, ) -> None: dims = hidden_dims or [64, 16, 4] self.window_size = window_size self.hidden_dims = dims self.learning_rate = learning_rate self.epochs = epochs self.batch_size = batch_size self.threshold_std = threshold_std if TORCH_AVAILABLE and torch is not None: self.device_ = torch.device("cuda" if torch.cuda.is_available() else "cpu") else: self.device_ = None self.model_: nn.Module | None = None self.reconstruction_errors_: np.ndarray | None = None self.X_mean_: float | None = None self.X_std_: float | None = None if TORCH_AVAILABLE and torch is not None and random_state is not None: torch.manual_seed(random_state) super().__init__( window_size=window_size, hidden_dims=dims, learning_rate=learning_rate, epochs=epochs, batch_size=batch_size, threshold_std=threshold_std, random_state=random_state, ) def _create_windows(self, data: np.ndarray) -> np.ndarray: if len(data) < self.window_size: return np.empty((0, self.window_size)) n_windows = len(data) - self.window_size + 1 shape = (n_windows, self.window_size) strides = (data.strides[0], data.strides[0]) return np.lib.stride_tricks.as_strided( data, shape=shape, strides=strides, writeable=False ).copy() def _create_model(self, input_dim: int) -> nn.Module: if not TORCH_AVAILABLE or torch is None or nn is None: raise ImportError("PyTorch is required.") class Autoencoder(nn.Module): def __init__(self, in_dim: int, hidden_dims: list[int]) -> None: super().__init__() encoder_layers: list[nn.Module] = [] prev_dim = in_dim for hidden_dim in hidden_dims: encoder_layers.extend([nn.Linear(prev_dim, hidden_dim), nn.ReLU()]) prev_dim = hidden_dim decoder_layers: list[nn.Module] = [] for i in range(len(hidden_dims) - 2, -1, -1): decoder_layers.extend( [nn.Linear(hidden_dims[i + 1], hidden_dims[i]), nn.ReLU()] ) decoder_layers.append(nn.Linear(hidden_dims[0], in_dim)) self.encoder = nn.Sequential(*encoder_layers) self.decoder = nn.Sequential(*decoder_layers) def forward(self, x: torch.Tensor) -> torch.Tensor: z = self.encoder(x) return self.decoder(z) return Autoencoder(input_dim, self.hidden_dims).to(self.device_)
[docs] def fit( self, y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None, ) -> PyTorchAutoencoderDetector: if not TORCH_AVAILABLE or torch is None or nn is None: raise ImportError( "PyTorch is required for PyTorchAutoencoderDetector. " "Install with: pip install 'anomsmith[deep]'" ) if self.device_ is None: self.device_ = torch.device("cuda" if torch.cuda.is_available() else "cpu") _, flat = _univariate_series(y) x_mean = float(np.mean(flat)) x_std = float(np.std(flat)) x_std = x_std if x_std > 0 else 1.0 X_normalized = (flat - x_mean) / x_std X_windows = self._create_windows(X_normalized) if len(X_windows) == 0: raise ValueError(f"Series too short for window_size={self.window_size}") n = len(X_windows) lo, hi = int(0.1 * n), int(0.9 * n) X_train = X_windows[lo:hi] self.model_ = self._create_model(self.window_size) optimizer = torch.optim.Adam(self.model_.parameters(), lr=self.learning_rate) loss_fn = nn.MSELoss() from torch.utils.data import DataLoader, TensorDataset dataset = TensorDataset(torch.from_numpy(X_train).float()) dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True) self.model_.train() for _ in range(self.epochs): for (batch,) in dataloader: batch = batch.to(self.device_) optimizer.zero_grad() recon = self.model_(batch) loss = loss_fn(recon, batch) loss.backward() optimizer.step() self.X_mean_ = x_mean self.X_std_ = x_std self._fitted = True logger.debug("Fitted PyTorchAutoencoderDetector") return self
[docs] def score(self, y: np.ndarray | pd.Series | SeriesLike) -> ScoreView: self._check_fitted() if ( self.model_ is None or self.X_mean_ is None or self.X_std_ is None or torch is None ): raise ValueError("PyTorchAutoencoderDetector is not fitted.") index, flat = _univariate_series(y) x_std = self.X_std_ if self.X_std_ > 0 else 1.0 X_normalized = (flat - self.X_mean_) / x_std X_windows = self._create_windows(X_normalized) full: np.ndarray = np.zeros(len(flat), dtype=float) if len(X_windows) == 0: return ScoreView(index=index, scores=full) self.model_.eval() with torch.no_grad(): X_tensor = torch.from_numpy(X_windows).float().to(self.device_) X_recon = self.model_(X_tensor).cpu().numpy() reconstruction_errors = np.mean((X_windows - X_recon) ** 2, axis=1) self.reconstruction_errors_ = reconstruction_errors full[self.window_size - 1 :] = reconstruction_errors return ScoreView(index=index, scores=full)
[docs] def predict(self, y: np.ndarray | pd.Series | SeriesLike) -> LabelView: sv = self.score(y) s = np.asarray(sv.scores, dtype=float) active = s[self.window_size - 1 :] if active.size == 0: return LabelView(index=sv.index, labels=np.zeros(len(s), dtype=int)) mean = float(np.mean(active)) std = float(np.std(active)) std = std if std > 0 else 1.0 thr = mean + self.threshold_std * std labels = (s > thr).astype(int) return LabelView(index=sv.index, labels=labels)