API Reference

Anomsmith: Anomaly detection workflows that turn time series signals into actionable decisions.

class anomsmith.ARIMADriftDetector(order: tuple[int, int, int] = (1, 1, 1), threshold_std: float = 2.0, random_state: int | None = None)[source]

Bases: BaseDetector

ARIMA-based drift detector for time series.

Uses ARIMA forecasting to detect drift. If actual values diverge significantly from forecasts, the series is flagged as drifting.

Parameters:
  • order – ARIMA order (p, d, q). Default (1, 1, 1)

  • threshold_std – Number of standard deviations for drift threshold (default 2.0)

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) ARIMADriftDetector[source]

Fit the ARIMA model on training data.

Parameters:
  • y – Training time series (1D)

  • X – Optional features (not used for ARIMA)

Returns:

Self for method chaining

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict drift labels.

Parameters:

y – Time series to detect drift in

Returns:

LabelView with binary labels (1 = drift, 0 = normal)

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Score drift using ARIMA residuals.

Parameters:

y – Time series to score

Returns:

ScoreView with drift scores (residual magnitudes)
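The residual-threshold idea can be sketched without a full ARIMA fit. The example below is an illustration only, not anomsmith's implementation: it swaps in a random-walk forecast (the ARIMA(0, 1, 0) special case), and the helper name `drift_labels` is hypothetical.

```python
import numpy as np


def drift_labels(train, test, threshold_std=2.0):
    """Flag points whose one-step forecast error exceeds a residual threshold.

    Hypothetical helper: uses a random-walk forecast (ARIMA(0, 1, 0)),
    so each forecast is simply the previous observation.
    """
    train = np.asarray(train, dtype=float)
    test = np.asarray(test, dtype=float)

    # Training residuals of the random-walk forecast are the first differences.
    train_resid = np.diff(train)
    limit = threshold_std * train_resid.std()

    # Forecast each test point from the observation just before it.
    previous = np.concatenate(([train[-1]], test[:-1]))
    scores = np.abs(test - previous)
    return scores, (scores > limit).astype(int)


train = np.array([10.0, 10.5, 10.0, 10.5, 10.0, 10.5])
test = np.array([10.0, 10.5, 15.0, 15.5])
scores, labels = drift_labels(train, test)
print(labels)  # only the jump from 10.5 to 15.0 is flagged
```

A real ARIMA model with a non-trivial order would replace the `previous` forecast; the thresholding step would stay the same.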

class anomsmith.Alert(timestamp: datetime, level: AlertLevel, message: str, feature: str, value: float, threshold: float, asset_id: str | None = None, metadata: dict[str, Any] = <factory>)[source]

Bases: object

Represents a predictive maintenance alert.

asset_id: str | None = None
feature: str
level: AlertLevel
message: str
metadata: dict[str, Any]
threshold: float
timestamp: datetime
value: float

class anomsmith.AlertLevel(*values)[source]

Bases: Enum

Alert severity levels.

CRITICAL = 'critical'
FAILURE = 'failure'
INFO = 'info'
WARNING = 'warning'

class anomsmith.AlertSystem(thresholds: dict[str, dict[str, float]] | None = None, escalation_rules: dict[str, dict[str, Any]] | None = None)[source]

Bases: object

Alert system for predictive maintenance with escalation rules.

alert_counts: dict[str, int]
alert_history: deque

check_thresholds(features: ndarray | DataFrame | Series, feature_names: list[str] | None = None, timestamp: datetime | None = None, asset_id: str | None = None) list[Alert][source]

Check features against thresholds and generate alerts.

Parameters

features : array-like

Feature values to check. Can be single value, array, or DataFrame.

feature_names : list of str, optional

Names of features. Required if features is array.

timestamp : datetime, optional

Timestamp for alerts. Defaults to current time.

asset_id : str, optional

Asset identifier.

Returns

alerts : list of Alert

List of generated alerts.

get_recent_alerts(n: int = 10, level: AlertLevel | None = None, asset_id: str | None = None) list[Alert][source]

Get recent alerts.

Parameters

n : int, default=10

Number of recent alerts to return.

level : AlertLevel, optional

Filter by alert level.

asset_id : str, optional

Filter by asset ID.

Returns

alerts : list of Alert

Recent alerts matching criteria.
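A threshold check of this kind can be sketched in plain Python. Everything below is an assumption made for illustration: the layout of the thresholds dictionary (feature name, then level name, then limit), the severity order of the levels, and the `SimpleAlert` stand-in are not taken from anomsmith.

```python
from dataclasses import dataclass
from datetime import datetime

# Assumed severity order, lowest to highest (names taken from AlertLevel).
LEVELS = ["info", "warning", "critical", "failure"]


@dataclass
class SimpleAlert:
    """Hypothetical stand-in for anomsmith.Alert with a subset of its fields."""
    timestamp: datetime
    level: str
    feature: str
    value: float
    threshold: float


def check_thresholds(features, thresholds, timestamp=None):
    """Return one alert per feature, at the highest level whose limit is met."""
    timestamp = timestamp or datetime.now()
    alerts = []
    for name, value in features.items():
        limits = thresholds.get(name, {})
        # Walk from most to least severe and stop at the first level reached.
        for level in reversed(LEVELS):
            if level in limits and value >= limits[level]:
                alerts.append(SimpleAlert(timestamp, level, name, value, limits[level]))
                break
    return alerts


thresholds = {"temperature": {"warning": 70.0, "critical": 85.0}}
alerts = check_thresholds({"temperature": 90.0, "vibration": 0.2}, thresholds)
print([(a.feature, a.level) for a in alerts])
```

Features without a configured threshold (here `vibration`) produce no alert in this sketch.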

class anomsmith.BaseDetector(**params: Any)[source]

Bases: BaseEstimator

Base class for anomaly detectors.

Detectors produce both scores and binary labels.

abstractmethod predict(y: ndarray | Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary anomaly labels

abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.BaseScorer(**params: Any)[source]

Bases: BaseEstimator

Base class for anomaly scorers.

Scorers assign anomaly scores to time series points. Higher scores indicate more anomalous points.

abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.DashboardVisualizer(figsize: tuple[int, int] = (15, 10))[source]

Bases: object

Dashboard visualization utilities for predictive maintenance monitoring.

create_dashboard(results_history: dict[str, list[dict[str, Any]]], sensor_data: dict[str, DataFrame] | None = None, save_path: str | None = None)[source]

Create comprehensive dashboard visualization.

Parameters

results_history : dict

Dictionary mapping asset_id to list of processing results.

sensor_data : dict, optional

Dictionary mapping asset_id to DataFrame with sensor readings.

save_path : str, optional

Path to save the dashboard figure.

Returns

fig : matplotlib.figure.Figure

Dashboard figure.

create_summary_dashboard(results_history: dict[str, list[dict[str, Any]]], save_path: str | None = None)[source]

Create summary dashboard with key metrics.

Parameters

results_history : dict

Dictionary mapping asset_id to list of processing results.

save_path : str, optional

Path to save the dashboard figure.

Returns

fig : matplotlib.figure.Figure

Summary dashboard figure.

anomsmith.EnsembleDetector

alias of ScoreCombiningEnsembleDetector

class anomsmith.FailureClassifier(n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]

Bases: object

Classify normal vs. failure states.

fit(X: ndarray | DataFrame, y: ndarray | Series)[source]

Fit the failure classifier.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

y : array-like of shape (n_samples,)

Binary labels: 0 for normal, 1 for failure.

model_: RandomForestClassifier | None

predict(X: ndarray | DataFrame) ndarray[source]

Predict failure states.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

predictions : ndarray of shape (n_samples,)

Binary predictions: 0 for normal, 1 for failure.

predict_proba(X: ndarray | DataFrame) ndarray[source]

Predict failure probabilities.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

probabilities : ndarray of shape (n_samples, 2)

Probability of [normal, failure] for each sample.

scaler_: StandardScaler | None

class anomsmith.FeatureExtractor(rolling_windows: list[int] | None = None, frequency_features: bool = True, change_detection: bool = True)[source]

Bases: object

Extract predictive maintenance features from time series data.

extract(data: ndarray | Series | DataFrame, columns: list[str] | None = None) DataFrame[source]

Extract features from time series data.

Parameters

data : array-like

Time series data. Can be 1D array, Series, or DataFrame.

columns : list of str, optional

Column names if data is a DataFrame. If None, uses 'value' for 1D data.

Returns

features : DataFrame

Extracted features with named columns.

feature_names_: list[str]

class anomsmith.IQRScorer(factor: float = 1.5, random_state: int | None = None)[source]

Bases: BaseScorer

Interquartile Range (IQR) based outlier scorer.

Computes outlier scores based on IQR bounds. Higher scores indicate more anomalous points.

Parameters:
  • factor – IQR multiplier for outlier bounds (default: 1.5)

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) IQRScorer[source]

Fit the scorer by computing quartiles.

Parameters:
  • y – Training data

  • X – Optional features (not used)

Returns:

Self for method chaining

score(y: ndarray | Series) ScoreView[source]

Score anomalies using IQR bounds.

Parameters:

y – Time series to score

Returns:

ScoreView with IQR-based scores
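One plausible way to turn IQR bounds into a score is sketched below. The scoring formula (distance outside the fence, in units of IQR) and the helper name `iqr_scores` are assumptions; the documentation does not state how anomsmith scales its IQR scores.

```python
import numpy as np


def iqr_scores(train, y, factor=1.5):
    """Distance outside the IQR fence, in units of IQR (0 inside the fence)."""
    q1, q3 = np.percentile(train, [25, 75])
    iqr = q3 - q1
    lower, upper = q1 - factor * iqr, q3 + factor * iqr
    y = np.asarray(y, dtype=float)
    # Only one of the two terms can be positive for a given point.
    excess = np.maximum(lower - y, 0.0) + np.maximum(y - upper, 0.0)
    return excess / iqr if iqr > 0 else excess


train = np.arange(1.0, 10.0)  # 1..9, so Q1 = 3, Q3 = 7, IQR = 4
scores = iqr_scores(train, [5.0, 13.0, 17.0, -7.0])
print(scores)  # fence is [-3, 13]; 17 and -7 each lie one IQR outside it
```

Quartiles come from the training data only, which mirrors the fit/score split described above.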

class anomsmith.IsolationForestDetector(contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None, n_jobs: int = -1)[source]

Bases: BaseDetector

Isolation Forest anomaly detector.

Isolation Forest is an ensemble method that isolates anomalies by randomly selecting features and splitting values.

Parameters:
  • contamination – Expected proportion of outliers in the data

  • n_estimators – Number of base estimators

  • random_state – Random state for reproducibility

  • n_jobs – Number of jobs to run in parallel

fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) IsolationForestDetector[source]

Fit the Isolation Forest detector.

Parameters:
  • y – Training data (target)

  • X – Optional features (if None, uses y)

Returns:

Self for method chaining

predict(y: ndarray | Series) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels (1 = anomaly, 0 = normal)

score(y: ndarray | Series) ScoreView[source]

Score anomalies.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.LOFDetector(contamination: float = 0.05, n_neighbors: int = 20, random_state: int | None = None, n_jobs: int = -1)[source]

Bases: BaseDetector

Local Outlier Factor (LOF) anomaly detector.

LOF measures the local deviation of density of a given sample with respect to its neighbors.

Parameters:
  • contamination – Expected proportion of outliers in the data

  • n_neighbors – Number of neighbors to use

  • random_state – Random state for reproducibility

  • n_jobs – Number of jobs to run in parallel

fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) LOFDetector[source]

Fit the LOF detector.

Parameters:
  • y – Training data (target)

  • X – Optional features (if None, uses y)

Returns:

Self for method chaining

predict(y: ndarray | Series) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels (1 = anomaly, 0 = normal)

score(y: ndarray | Series) ScoreView[source]

Score anomalies.

Uses the score_samples() method of a LOF model fitted with novelty=True, so new data can be scored without data leakage.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.LSTMAutoencoderDetector(window_size: int = 20, lstm_units: list[int] | None = None, contamination: float = 0.05, threshold_std: float = 3.0, epochs: int = 50, batch_size: int = 32, random_state: int | None = None)[source]

Bases: BaseDetector

LSTM autoencoder: high reconstruction error ⇒ anomaly (univariate only).

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None) LSTMAutoencoderDetector[source]

Fit the estimator.

Parameters:
  • y – Target values

  • X – Optional features

Returns:

Self for method chaining

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary anomaly labels

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.ModelPerformanceTracker(window_size: int = 1000, model_name: str | None = None)[source]

Bases: object

Track model performance over time for monitoring and alerting.

Maintains a rolling window of performance metrics and can detect degradation or drift.

window_size

Number of recent predictions to keep in window

metrics_history

DataFrame with historical metrics

detect_degradation(baseline_metrics: dict[str, float], threshold: float = 0.1) bool[source]

Detect if performance has degraded compared to baseline.

Parameters:
  • baseline_metrics – Baseline metrics (e.g., from training)

  • threshold – Relative degradation threshold (default 0.1 = 10%)

Returns:

True if degradation detected

get_current_metrics() dict[str, float][source]

Get current performance metrics.

Returns:

Dictionary with latest metrics

update(scores: ndarray | Series, predicted_labels: ndarray | Series, true_labels: ndarray | Series | None = None, timestamp: datetime | None = None) dict[str, float][source]

Update tracker with new predictions.

Parameters:
  • scores – Anomaly scores

  • predicted_labels – Predicted binary labels

  • true_labels – Optional ground truth labels

  • timestamp – Optional timestamp for this update

Returns:

Current performance metrics
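The relative-degradation check can be illustrated with a few lines of plain Python. This is a sketch under stated assumptions, not the tracker's code: it treats every metric as higher-is-better and flags degradation as soon as any shared metric drops by more than the threshold. The function name is reused here only for readability.

```python
def detect_degradation(current, baseline, threshold=0.1):
    """Return True if any shared metric fell by more than `threshold` (relative)."""
    for name, base in baseline.items():
        if name not in current or base == 0:
            continue  # nothing to compare against
        relative_drop = (base - current[name]) / abs(base)
        if relative_drop > threshold:
            return True
    return False


baseline = {"precision": 0.90, "recall": 0.80}
stable = detect_degradation({"precision": 0.88, "recall": 0.79}, baseline)
degraded = detect_degradation({"precision": 0.70, "recall": 0.80}, baseline)
print(stable, degraded)
```

With the default threshold of 0.1, a precision drop from 0.90 to 0.70 (about 22%) counts as degradation, while a drop to 0.88 does not.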

class anomsmith.PCADetector(n_components: float | int = 0.95, score_method: Literal['reconstruction', 'mahalanobis', 'both'] = 'reconstruction', contamination: float = 0.05, random_state: int | None = None)[source]

Bases: BaseDetector

PCA-based anomaly detector.

Uses Principal Component Analysis to model healthy operation boundaries. Anomalies are detected using one or both of:

  • Mahalanobis distance in the principal component space

  • Reconstruction error (difference between original and reconstructed data)

Parameters:
  • n_components – Number of components to keep. If 0 < n_components < 1, select the number of components such that the amount of variance that needs to be explained is greater than the percentage specified.

  • score_method – Method for computing anomaly scores:

      - 'reconstruction': Use reconstruction error

      - 'mahalanobis': Use Mahalanobis distance in PC space

      - 'both': Use both and return average

  • contamination – Expected proportion of outliers in the data (used for threshold)

  • random_state – Random state for reproducibility

fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) PCADetector[source]

Fit the PCA detector on healthy operation data.

Parameters:
  • y – Training data (target)

  • X – Optional features (if None, uses y)

Returns:

Self for method chaining

predict(y: ndarray | Series) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels (1 = anomaly, 0 = normal)

score(y: ndarray | Series) ScoreView[source]

Score anomalies.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores
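The reconstruction-error scoring path can be sketched with NumPy alone. This is an illustrative stand-in for PCADetector, not its implementation: the helper names are hypothetical, a fixed component count is used instead of a variance fraction, and the label threshold is assumed to be the (1 - contamination) quantile of the training errors.

```python
import numpy as np


def fit_pca(X, n_components):
    """Return the training mean and the top principal axes (as rows)."""
    mean = X.mean(axis=0)
    # Rows of vt are principal axes, ordered by explained variance.
    _, _, vt = np.linalg.svd(X - mean, full_matrices=False)
    return mean, vt[:n_components]


def reconstruction_error(X, mean, components):
    """Squared distance between each row and its projection onto the PCA subspace."""
    centered = X - mean
    reconstructed = centered @ components.T @ components
    return np.sum((centered - reconstructed) ** 2, axis=1)


rng = np.random.default_rng(0)
t = rng.normal(size=200)
# Healthy data lies close to the line x2 = 2 * x1.
healthy = np.column_stack([t, 2 * t + 0.01 * rng.normal(size=200)])
mean, components = fit_pca(healthy, n_components=1)

new_points = np.array([[1.0, 2.0], [1.0, -2.0]])
errors = reconstruction_error(new_points, mean, components)

# Assumed thresholding: quantile of the training errors set by contamination.
contamination = 0.05
threshold = np.quantile(reconstruction_error(healthy, mean, components), 1 - contamination)
labels = (errors > threshold).astype(int)
```

The second point breaks the learned relationship between the two features, so its reconstruction error is far larger than that of the first point.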

class anomsmith.PanelLike(*args, **kwargs)[source]

Bases: Protocol

Protocol for panel-like data: DataFrame with entity key plus time index.

Can be a DataFrame with MultiIndex (entity, time) or a regular DataFrame with an entity column and time index.

columns: Index
index: DatetimeIndex | MultiIndex | Index

class anomsmith.PredictiveMaintenanceSystem(feature_extractor: FeatureExtractor | None = None, rul_estimator: RULEstimator | None = None, failure_classifier: FailureClassifier | None = None, alert_system: AlertSystem | None = None, anomaly_detector: BaseDetector | None = None)[source]

Bases: object

Complete predictive maintenance system integrating all components.

process(data: ndarray | Series | DataFrame, timestamp: datetime | None = None, asset_id: str | None = None, return_features: bool = False) dict[str, Any][source]

Process new data and generate predictions/alerts.

Parameters

data : array-like

Time series data to process.

timestamp : datetime, optional

Timestamp for the data.

asset_id : str, optional

Asset identifier.

return_features : bool, default=False

Whether to return extracted features.

Returns

results : dict

Dictionary containing:

  • 'features': extracted features (if return_features=True)

  • 'rul': predicted RUL

  • 'failure_probability': probability of failure

  • 'failure_prediction': binary failure prediction

  • 'anomaly_score': anomaly score from anomsmith.primitives.base.BaseDetector.score()

  • 'anomaly_prediction': 0 (normal) or 1 (anomaly) from LabelView labels

  • 'alerts': list of alerts

class anomsmith.PyTorchAutoencoderDetector(window_size: int = 24, hidden_dims: list[int] | None = None, learning_rate: float = 0.001, epochs: int = 200, batch_size: int = 32, threshold_std: float = 3.0, random_state: int | None = None)[source]

Bases: BaseDetector

Feedforward autoencoder on sliding windows (PyTorch, univariate only).

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None) PyTorchAutoencoderDetector[source]

Fit the estimator.

Parameters:
  • y – Target values

  • X – Optional features

Returns:

Self for method chaining

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary anomaly labels

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.RULEstimator(method: str = 'regression', n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]

Bases: object

Estimate Remaining Useful Life (RUL) for assets.

fit(X: ndarray | DataFrame, y: ndarray | Series, degradation_threshold: float | None = None)[source]

Fit the RUL estimator.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix (e.g., from FeatureExtractor).

y : array-like of shape (n_samples,)

RUL values (time until failure) or degradation values.

degradation_threshold : float, optional

Threshold for degradation-based method. If provided, converts degradation values to RUL.

model_: RandomForestRegressor | None

predict(X: ndarray | DataFrame) ndarray[source]

Predict RUL for new data.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

rul : ndarray of shape (n_samples,)

Predicted RUL values.

scaler_: StandardScaler | None

class anomsmith.RealTimeIngestion(pm_system: PredictiveMaintenanceSystem, window_size: int = 100, update_frequency: int | None = None)[source]

Bases: object

Real-time data ingestion system for predictive maintenance.

data_buffers: dict[str, deque]

get_all_assets() list[str][source]

Get list of all asset IDs being monitored.

get_latest_results(asset_id: str, n: int = 1) list[dict[str, Any]][source]

Get latest processing results for an asset.

Parameters

asset_id : str

Asset identifier.

n : int, default=1

Number of latest results to return.

Returns

results : list of dict

Latest results.

ingest(data: float | ndarray | Series, asset_id: str, timestamp: datetime | None = None, sensor_name: str | None = None) dict[str, Any][source]

Ingest new data point and process if window is full.

Parameters

data : float, array-like, or Series

New sensor reading(s).

asset_id : str

Asset identifier.

timestamp : datetime, optional

Timestamp for the data. Defaults to current time.

sensor_name : str, optional

Name of sensor/feature. Required if data is scalar.

Returns

results : dict or None

Processing results if the window was processed, otherwise None.

process_window(asset_id: str) dict[str, Any][source]

Process current window for an asset.

Parameters

asset_id : str

Asset identifier.

Returns

results : dict

Processing results.

results_history: dict[str, list[dict[str, Any]]]
timestamp_buffers: dict[str, deque]
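The per-asset buffering described for RealTimeIngestion can be pictured with bounded deques. The class below is a hypothetical simplification: it returns the raw window instead of running a PredictiveMaintenanceSystem, and it ignores update_frequency and timestamps.

```python
from collections import defaultdict, deque


class WindowBuffer:
    """Hypothetical per-asset rolling buffer that fires once a window is full."""

    def __init__(self, window_size=100):
        self.window_size = window_size
        # One bounded deque per asset; old readings drop off automatically.
        self.buffers = defaultdict(lambda: deque(maxlen=window_size))

    def ingest(self, value, asset_id):
        """Store a reading; return the current window once it is full, else None."""
        buffer = self.buffers[asset_id]
        buffer.append(value)
        if len(buffer) < self.window_size:
            return None
        return list(buffer)


buf = WindowBuffer(window_size=3)
results = [buf.ingest(v, "pump-1") for v in [1.0, 2.0, 3.0, 4.0]]
print(results)
```

The first two calls return None because the window is not yet full; after that, each new reading yields the most recent three values.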
class anomsmith.RobustCovarianceDetector(contamination: float = 0.05, support_fraction: float = 0.8, random_state: int | None = None)[source]

Bases: BaseDetector

Robust Covariance (Elliptic Envelope) anomaly detector.

Assumes that the data is Gaussian distributed and fits an elliptic envelope to the data.

Parameters:
  • contamination – Expected proportion of outliers in the data

  • support_fraction – Proportion of points to be used as support

  • random_state – Random state for reproducibility

fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) RobustCovarianceDetector[source]

Fit the Robust Covariance detector.

Parameters:
  • y – Training data (target)

  • X – Optional features (if None, uses y)

Returns:

Self for method chaining

predict(y: ndarray | Series) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels (1 = anomaly, 0 = normal)

score(y: ndarray | Series) ScoreView[source]

Score anomalies.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.RobustZScoreScorer(epsilon: float = 1e-08)[source]

Bases: BaseScorer

Robust Z-Score anomaly scorer.

Uses median and MAD for robust scaling, then computes absolute z-scores. Higher scores indicate more anomalous points.

fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) RobustZScoreScorer[source]

Fit the scorer (no-op for this scorer).

Parameters:
  • y – Target values (not used, kept for interface compatibility)

  • X – Optional features (not used)

Returns:

Self for method chaining

score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies using robust z-scores.

Parameters:

y – Time series to score

Returns:

ScoreView with absolute robust z-scores
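A minimal NumPy sketch of median/MAD scoring follows. It assumes epsilon is simply added to the MAD to avoid division by zero and that no consistency constant (such as 1.4826) is applied; the documentation does not confirm either detail, and the helper name is hypothetical.

```python
import numpy as np


def robust_z_scores(y, epsilon=1e-8):
    """Absolute deviation from the median, scaled by the MAD."""
    y = np.asarray(y, dtype=float)
    median = np.median(y)
    mad = np.median(np.abs(y - median))
    # epsilon keeps the division finite when more than half the values are equal.
    return np.abs(y - median) / (mad + epsilon)


scores = robust_z_scores([1.0, 2.0, 3.0, 4.0, 100.0])
print(scores.round(2))
```

Because the median and MAD are barely affected by the single extreme value, that value receives a very large score while the others stay small.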

class anomsmith.ScoreCombiningEnsembleDetector(detectors: list[BaseDetector | BaseScorer], combination_method: str = 'mean', score_percentile: float = 95.0, random_state: int | None = None)[source]

Bases: BaseDetector

Combine scores from multiple detectors/scorers (mean, max, min, or median).

Replaces the former toolkit EnsembleDetector score-combination path: labels are produced by thresholding the combined score at a fixed percentile. For hard voting over member predictions, use VotingEnsembleDetector instead.

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) ScoreCombiningEnsembleDetector[source]

Fit the estimator.

Parameters:
  • y – Target values

  • X – Optional features

Returns:

Self for method chaining

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary anomaly labels

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores
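Score combination followed by a percentile cut can be sketched as below. Two assumptions are made for illustration: member scores are already on comparable scales, and the percentile is taken over the scores being labelled (the library may instead use training scores). The helper name `combine_scores` is hypothetical.

```python
import numpy as np

COMBINERS = {"mean": np.mean, "max": np.max, "min": np.min, "median": np.median}


def combine_scores(score_matrix, combination_method="mean", score_percentile=95.0):
    """Combine per-detector scores (one row each) and threshold at a percentile."""
    combined = COMBINERS[combination_method](score_matrix, axis=0)
    threshold = np.percentile(combined, score_percentile)
    return combined, (combined > threshold).astype(int)


score_matrix = np.array([
    [0.1, 0.2, 0.1, 0.9],  # scores from a first detector
    [0.3, 0.2, 0.1, 0.7],  # scores from a second detector
])
combined, labels = combine_scores(score_matrix)
print(combined, labels)
```

Only the last point exceeds the 95th percentile of the combined scores, so it is the only one labelled anomalous.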

class anomsmith.SeasonalBaselineScorer(seasonality: Literal['week', 'month', 'day', 'hour'] = 'week', random_state: int | None = None)[source]

Bases: BaseScorer

Seasonal baseline anomaly scorer.

Calculates seasonal baselines (e.g., weekly, monthly) and scores points that deviate significantly from expected seasonal patterns.

Parameters:
  • seasonality – Seasonality to use. Options: ‘week’, ‘month’, ‘day’, ‘hour’.

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | PanelLike | None = None) SeasonalBaselineScorer[source]

Fit the scorer by computing seasonal baselines.

Parameters:
  • y – Time series with datetime index

  • X – Optional features (not used)

Returns:

Self for method chaining

score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies using seasonal baseline.

Parameters:

y – Time series to score

Returns:

ScoreView with seasonal z-scores

class anomsmith.SeriesLike(*args, **kwargs)[source]

Bases: Protocol

Protocol for series-like data: pandas Series or single-column DataFrame.

Must have a datetime or integer index.

index: DatetimeIndex | Index
values: object

class anomsmith.ThresholdRule(method: Literal['absolute', 'quantile'], value: float, quantile: float | None = None)[source]

Bases: object

Rule for thresholding anomaly scores.

method

‘absolute’ (use value directly) or ‘quantile’ (use quantile)

Type:

Literal[‘absolute’, ‘quantile’]

value

Threshold value (absolute) or quantile (0-1)

Type:

float

quantile

If method is ‘quantile’, this is the quantile to use

Type:

float | None

method: Literal['absolute', 'quantile']
quantile: float | None = None
value: float
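How such a rule might be applied to a score array is sketched below. The `Rule` dataclass only mirrors the documented fields, and `apply_rule` is a hypothetical helper. Preferring `quantile` over `value` for the 'quantile' method is an assumption, since the field descriptions overlap.

```python
from dataclasses import dataclass
from typing import Literal, Optional

import numpy as np


@dataclass(frozen=True)
class Rule:
    """Hypothetical mirror of the ThresholdRule fields documented above."""
    method: Literal["absolute", "quantile"]
    value: float
    quantile: Optional[float] = None


def apply_rule(scores, rule):
    """Turn scores into binary labels according to the rule."""
    scores = np.asarray(scores, dtype=float)
    if rule.method == "absolute":
        cutoff = rule.value
    else:
        # Assumption: prefer the explicit quantile field, else fall back to value.
        q = rule.quantile if rule.quantile is not None else rule.value
        cutoff = np.quantile(scores, q)
    return (scores > cutoff).astype(int)


scores = [0.1, 0.2, 0.3, 0.4, 5.0]
absolute_labels = apply_rule(scores, Rule(method="absolute", value=1.0))
quantile_labels = apply_rule(scores, Rule(method="quantile", value=0.5))
```

An absolute cutoff of 1.0 flags only the last score, while the 0.5 quantile (the median, 0.3) flags the last two.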
anomsmith.VotingEnsemble

alias of VotingEnsembleDetector

class anomsmith.VotingEnsembleDetector(detectors: list[BaseDetector | BaseScorer], voting_threshold: int = 2, random_state: int | None = None)[source]

Bases: BaseDetector

Voting ensemble that combines predictions from multiple detectors.

An anomaly is flagged if at least voting_threshold detectors agree.

Parameters:
  • detectors – List of anomaly detectors or scorers to ensemble

  • voting_threshold – Minimum number of detectors that must flag a sample as anomalous

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) VotingEnsembleDetector[source]

Fit all detectors in the ensemble.

Parameters:
  • y – Training time series

  • X – Optional features (not used)

Returns:

Self for method chaining

get_vote_counts(y: np.ndarray | pd.Series | SeriesLike) np.ndarray[source]

Get vote counts for each sample.

Parameters:

y – Time series to analyze

Returns:

Array of vote counts (number of detectors that flagged each sample as anomalous)

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict anomalies using voting.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Compute ensemble scores as mean of individual detector scores.

Parameters:

y – Time series to score

Returns:

ScoreView with average anomaly scores
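The voting rule itself is small enough to sketch directly. The example assumes each member has already produced binary labels; the helper name `vote` is hypothetical and the member fitting step is omitted.

```python
import numpy as np


def vote(label_matrix, voting_threshold=2):
    """Count anomaly votes per sample and flag samples with enough votes."""
    label_matrix = np.asarray(label_matrix)
    counts = label_matrix.sum(axis=0)
    return counts, (counts >= voting_threshold).astype(int)


member_labels = [
    [0, 1, 1, 0],  # labels from a first detector
    [0, 1, 0, 0],  # labels from a second detector
    [0, 1, 1, 1],  # labels from a third detector
]
counts, labels = vote(member_labels)
print(counts, labels)
```

The `counts` array corresponds to what get_vote_counts is documented to return, and `labels` applies the voting_threshold to it.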

class anomsmith.WaveletDenoiser(wavelet: str = 'db4', threshold_mode: str = 'soft', level: int = 5)[source]

Bases: object

Wavelet soft/hard thresholding denoising (requires PyWavelets).

Useful as a preprocessing step before scoring or for visualization. This is not a BaseDetector; it only returns a denoised array.

denoise(data: ndarray) ndarray[source]

Denoise a 1D signal in an in-place-safe manner.

class anomsmith.WaveletDetector(wavelet: str = 'db4', threshold_factor: float = 3.0, level: int = 5, random_state: int | None = None)[source]

Bases: BaseDetector

Wavelet-based anomaly detector for time series.

Detects anomalies by identifying large coefficients in wavelet detail levels, which indicate sudden changes or anomalies.

Parameters:
  • wavelet – Wavelet type (e.g., ‘db4’, ‘haar’, ‘bior2.2’). Default ‘db4’.

  • threshold_factor – Threshold factor for anomaly detection (in terms of MAD). Default 3.0.

  • level – Decomposition level. Default 5.

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) WaveletDetector[source]

Fit the wavelet detector.

Parameters:
  • y – Time series data (1D)

  • X – Optional features (not used)

Returns:

Self for method chaining

predict(y: np.ndarray | pd.Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary labels

score(y: np.ndarray | pd.Series | SeriesLike) ScoreView[source]

Score anomalies using wavelet decomposition.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.ZScoreScorer(n_std: float = 3.0, random_state: int | None = None)[source]

Bases: BaseScorer

Z-score based anomaly scorer.

Computes absolute Z-scores relative to mean and standard deviation. Higher scores indicate more anomalous points.

Parameters:
  • n_std – Number of standard deviations (used for thresholding, not scoring)

  • random_state – Random state for reproducibility (not used, kept for compatibility)

fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) ZScoreScorer[source]

Fit the scorer by computing mean and standard deviation.

Parameters:
  • y – Training data

  • X – Optional features (not used)

Returns:

Self for method chaining

score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies using Z-scores.

Parameters:

y – Time series to score

Returns:

ScoreView with absolute Z-scores

anomsmith.add_degradation_rates(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', periods: list[int] | None = None) DataFrame[source]

Add degradation rate features (rate of change) for feature columns.

Parameters

df : DataFrame

DataFrame with asset and feature columns.

feature_cols : list of str

Feature column names to compute degradation rates for.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

periods : list of int, optional

Periods for rate of change calculation. Default: [1, 3, 5].

Returns

df : DataFrame

DataFrame with added degradation rate columns.

anomsmith.add_rolling_statistics(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', window: int = 5, stats: list[str] | None = None) DataFrame[source]

Add rolling window statistics for feature columns, grouped by asset.

Parameters

df : DataFrame

DataFrame with asset and feature columns.

feature_cols : list of str

Feature column names to compute rolling statistics for.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step (used for sorting).

window : int, default=5

Rolling window size.

stats : list of str, optional

Statistics to compute. Default: ['mean', 'std', 'min', 'max'].

Returns

df : DataFrame

DataFrame with added rolling statistic columns.
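Grouped rolling statistics of this kind can be sketched with pandas. The helper below is hypothetical and computes the mean only; the output column name (`<feature>_rolling_mean`) and the use of `min_periods=1` are assumptions, not anomsmith's documented behavior.

```python
import pandas as pd


def rolling_mean_by_asset(df, feature_cols, asset_id_col="asset_id",
                          cycle_col="cycle", window=5):
    """Add `<feature>_rolling_mean` columns computed separately for each asset."""
    out = df.sort_values([asset_id_col, cycle_col]).copy()
    for col in feature_cols:
        # transform keeps the result aligned with the original rows of each group.
        out[f"{col}_rolling_mean"] = out.groupby(asset_id_col)[col].transform(
            lambda s: s.rolling(window, min_periods=1).mean()
        )
    return out


df = pd.DataFrame({
    "asset_id": ["a", "a", "a", "b", "b"],
    "cycle": [1, 2, 3, 1, 2],
    "temp": [10.0, 20.0, 30.0, 100.0, 200.0],
})
result = rolling_mean_by_asset(df, ["temp"], window=2)
print(result)
```

Grouping by asset before rolling keeps one asset's readings from leaking into another asset's window.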

anomsmith.aggregate_metrics_for_cloudwatch(metrics_list: list[dict[str, float]], namespace: str = 'AnomalyDetection', model_name: str | None = None, timestamp: datetime | None = None) list[dict[str, Any]][source]

Format metrics for AWS CloudWatch PutMetricData API.

Aggregates multiple metric dictionaries into CloudWatch format.

Parameters:
  • metrics_list – List of metric dictionaries from compute_performance_metrics

  • namespace – CloudWatch namespace (default “AnomalyDetection”)

  • model_name – Optional model name for dimension

  • timestamp – Optional timestamp (default: now)

Returns:

List of CloudWatch metric data dictionaries

Examples

>>> metrics = [compute_performance_metrics(y1, pred1), ...]
>>> cw_metrics = aggregate_metrics_for_cloudwatch(metrics, model_name="IsolationForest")
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=cw_metrics
... )
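The shape of the returned list can be pictured with the sketch below. The keys `MetricName`, `Dimensions`, `Timestamp`, `Value`, and `Unit` follow the CloudWatch PutMetricData request format. The function name, the `ModelName` dimension name, and the choice to emit one entry per metric per dictionary are assumptions, since the aggregation performed by anomsmith is not documented here.

```python
from datetime import datetime, timezone


def to_cloudwatch(metrics_list, model_name=None, timestamp=None):
    """Flatten metric dicts into PutMetricData-style entries."""
    timestamp = timestamp or datetime.now(timezone.utc)
    # Hypothetical dimension name; attached only when a model name is given.
    dimensions = [{"Name": "ModelName", "Value": model_name}] if model_name else []
    entries = []
    for metrics in metrics_list:
        for name, value in metrics.items():
            entries.append({
                "MetricName": name,
                "Dimensions": dimensions,
                "Timestamp": timestamp,
                "Value": float(value),
                "Unit": "None",
            })
    return entries


entries = to_cloudwatch(
    [{"precision": 0.9, "recall": 0.8}], model_name="IsolationForest"
)
print(entries[0]["MetricName"], entries[0]["Value"])
```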
anomsmith.apply_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) DataFrame[source]

Apply decision policy to health states.

Parameters:
  • health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)

  • previous_states – Previous health states for transition detection (optional)

  • intervene_cost – Cost of intervention action (default 100)

  • review_cost – Cost of review action (default 30)

  • wait_cost – Cost of wait action (default 0)

  • base_risks – Base failure risks by state (healthy, warning, distress) (default (0.01, 0.1, 0.3))

  • intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)

  • review_risk_reduction – Risk reduction factor for review (default 0.75)

Returns:

pandas DataFrame with health_states, actions, costs, and risks

Examples

>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> result = apply_policy(states)
>>> result['action'].values
array([0, 0, 1, 2, 2])
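A simplified reading of this policy is sketched below. It is not the library's logic: it assumes the action equals the health state (0 = wait, 1 = review, 2 = intervene, which matches the example output above), treats each risk reduction factor as a multiplier on the base risk, and ignores previous_states. The helper name `simple_policy` is hypothetical.

```python
import numpy as np

ACTION_COSTS = np.array([0.0, 30.0, 100.0])  # wait, review, intervene
BASE_RISKS = np.array([0.01, 0.1, 0.3])      # healthy, warning, distress
RISK_FACTORS = np.array([1.0, 0.75, 0.5])    # assumed multiplier for each action


def simple_policy(health_states):
    """Map health states to actions, action costs, and residual risks."""
    states = np.asarray(health_states, dtype=int)
    actions = states  # assumption: 0 -> wait, 1 -> review, 2 -> intervene
    costs = ACTION_COSTS[actions]
    risks = BASE_RISKS[states] * RISK_FACTORS[actions]
    return actions, costs, risks


actions, costs, risks = simple_policy([0, 0, 1, 2, 2])
print(actions, costs, risks)
```

Under these assumptions a distressed asset costs 100 to intervene on and its failure risk falls from 0.3 to 0.15.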
anomsmith.assess_asset_health(sensor_data: DataFrame, asset_ids: Series | None = None, feature_cols: list[str] | None = None, failure_labels: Series | ndarray | None = None, use_classification: bool = True, use_anomaly_detection: bool = True, contamination: float = 0.05, n_estimators: int = 100, isolation_n_estimators: int = 200, random_state: int | None = None, *, risk_proba_warning_threshold: float = 0.5, risk_proba_distress_threshold: float = 0.8, classification_weight: float = 0.6, anomaly_weight: float = 0.4) DataFrame[source]

Assess asset health using classification and anomaly detection.

Combines failure risk classification with anomaly detection to provide comprehensive asset health assessment. Results can be used to prioritize maintenance actions.

Parameters:
  • sensor_data – DataFrame with sensor readings (columns are features, rows are assets)

  • asset_ids – Optional Series of asset IDs (defaults to sensor_data index)

  • feature_cols – Optional list of feature column names (defaults to all numeric columns)

  • failure_labels – Optional binary labels for training classifier (1 = failure, 0 = healthy)

  • use_classification – Whether to use failure risk classification (default True)

  • use_anomaly_detection – Whether to use anomaly detection (default True)

  • contamination – Expected proportion of anomalies (see DEFAULT_OUTLIER_CONTAMINATION)

  • n_estimators – Number of trees for Random Forest (see DEFAULT_RANDOM_FOREST_N_ESTIMATORS)

  • isolation_n_estimators – Number of trees for Isolation Forest when anomaly detection is on (see DEFAULT_ISOLATION_FOREST_N_ESTIMATORS).

  • random_state – Random state for reproducibility

  • risk_proba_warning_threshold – Min failure probability for warning health state when using classification (default from anomsmith.constants).

  • risk_proba_distress_threshold – Min failure probability for distress state (must exceed warning threshold; enforced by FailureRiskClassifier).

  • classification_weight – Weight on normalized classification risk in combined_risk when both classification and anomaly detection run (must sum to 1 with anomaly_weight).

  • anomaly_weight – Weight on normalized anomaly score in combined_risk.

Returns:

DataFrame with columns:

  • asset_id: Asset identifier

  • failure_risk: Probability of failure (if classification used)

  • health_state: Predicted health state (0=Healthy, 1=Warning, 2=Distress)

  • is_anomaly: Binary anomaly flag (if anomaly detection used)

  • anomaly_score: Anomaly score (if anomaly detection used)

  • combined_risk: Combined risk score (higher = more urgent)

Examples

>>> import pandas as pd
>>> import numpy as np
>>> sensor_data = pd.DataFrame({
...     'temperature': [60, 65, 70, 80],
...     'vibration': [0.2, 0.25, 0.3, 0.4],
...     'pressure': [25, 24, 23, 20]
... })
>>> result = assess_asset_health(sensor_data)
>>> result.head()
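How `classification_weight` and `anomaly_weight` could blend the two signals into `combined_risk` is sketched below. The min-max normalization and the helper names `min_max` and `combine_risk` are assumptions made for illustration; the reference only states that both inputs are normalized and that the weights must sum to 1.

```python
def min_max(values):
    """Scale values to [0, 1]; a constant input maps to all zeros."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def combine_risk(failure_risk, anomaly_score,
                 classification_weight=0.6, anomaly_weight=0.4):
    """Weighted blend of normalized classification risk and anomaly score."""
    if abs(classification_weight + anomaly_weight - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    risk_n = min_max(failure_risk)     # assumed normalization
    score_n = min_max(anomaly_score)   # assumed normalization
    return [classification_weight * r + anomaly_weight * s
            for r, s in zip(risk_n, score_n)]


# Four assets: the last one has both the highest failure risk and anomaly score.
combined = combine_risk([0.1, 0.2, 0.5, 0.9], [0.0, 1.0, 2.0, 4.0])
```

With these inputs the last asset receives the highest combined score, so it would be ranked first by `rank_assets_by_risk`.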
anomsmith.assess_health_with_pca(X: ndarray | DataFrame, detector: PCADetector, healthy_threshold: float, warning_threshold: float, index: Index | None = None) DataFrame[source]

Assess equipment health using PCA and Mahalanobis distance.

Complete workflow for PCA-based predictive maintenance:

  1. Compute Mahalanobis distance from healthy center

  2. Classify health states based on distance thresholds

  3. Return results as a DataFrame for easy tracking

Parameters:
  • X – Feature matrix (n_samples, n_features) with sensor readings

  • detector – Fitted PCADetector (must use score_method='mahalanobis')

  • healthy_threshold – Distance threshold for Healthy state

  • warning_threshold – Distance threshold for Warning state

  • index – Optional index for the results

Returns:

DataFrame with columns 'mahalanobis_distance' and 'health_state'

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> # Set thresholds based on training data
>>> healthy_threshold = np.percentile(detector.score(X_train).scores, 75)
>>> warning_threshold = np.percentile(detector.score(X_train).scores, 95)
>>> health_df = assess_health_with_pca(
...     X_monitor, detector, healthy_threshold, warning_threshold
... )
>>> # Track health over time
>>> critical_units = health_df[health_df['health_state'] == 2]
anomsmith.backtest_detector(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule, labels: Series | ndarray | SeriesLike | None = None, n_splits: int = 5, min_train_size: int = 10) DataFrame[source]

Run backtest of detector across expanding windows.

Parameters:
  • y – Time series to backtest on

  • detector – BaseDetector or BaseScorer instance

  • threshold_rule – ThresholdRule to apply

  • labels – Optional ground truth labels

  • n_splits – Number of splits

  • min_train_size – Minimum training set size

Returns:

pandas DataFrame with columns: fold, precision, recall, f1, avg_run_length
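One way expanding windows can be derived from `n_splits` and `min_train_size` is sketched below. The equal-sized test blocks and the helper name `expanding_splits` are assumptions; the reference does not describe how anomsmith places its fold boundaries.

```python
def expanding_splits(n_samples, n_splits=5, min_train_size=10):
    """Yield (train_end, test_end) pairs; training data is [0, train_end)."""
    if n_samples <= min_train_size:
        raise ValueError("series is shorter than min_train_size")
    test_size = (n_samples - min_train_size) // n_splits
    if test_size == 0:
        raise ValueError("not enough samples for the requested n_splits")
    for fold in range(n_splits):
        # The training window grows by one test block per fold.
        train_end = min_train_size + fold * test_size
        # The last fold absorbs any remainder so every sample is used.
        test_end = n_samples if fold == n_splits - 1 else train_end + test_size
        yield train_end, test_end


splits = list(expanding_splits(60, n_splits=5, min_train_size=10))
for train_end, test_end in splits:
    print(f"train [0, {train_end}) -> test [{train_end}, {test_end})")
```

Each fold trains only on data that precedes its test block, which avoids leaking future observations into the fit.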

anomsmith.batch_predict(data_iterator: Iterator[ndarray | Series | DataFrame], detector: BaseDetector) Iterator[tuple[LabelView, ScoreView]][source]

Predict anomalies in batches for efficient processing.

Parameters:
  • data_iterator – Iterator yielding batches of time series data

  • detector – Fitted BaseDetector instance

Yields:

Tuple of (LabelView, ScoreView) for each batch

Examples

>>> detector = IsolationForestDetector(contamination=0.05)
>>> detector.fit(X_train)
>>> for labels, scores in batch_predict(data_stream(), detector):
...     process_predictions(labels, scores)
anomsmith.batch_score(data_iterator: Iterator[ndarray | Series | DataFrame], scorer: BaseScorer) Iterator[ScoreView][source]

Score anomalies in batches for efficient processing of large datasets.

Designed for stream processing (e.g., AWS Kinesis, S3 batch jobs) where data arrives in chunks.

Parameters:
  • data_iterator – Iterator yielding batches of time series data

  • scorer – Fitted BaseScorer instance

Yields:

ScoreView for each batch

Examples

>>> import numpy as np
>>> import pandas as pd
>>> def data_stream():
...     for i in range(0, 10000, 1000):
...         yield pd.Series(np.random.randn(1000), index=pd.date_range(start="2024-01-01", periods=1000, freq="H") + pd.Timedelta(hours=i))
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> for batch_scores in batch_score(data_stream(), scorer):
...     process_scores(batch_scores)
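The fit-once, score-lazily pattern behind batch scoring can be sketched with a generator. `MedianMadScorer` and `score_in_batches` are hypothetical stand-ins written for this illustration; the median/MAD formula is an assumption about what a robust z-score looks like, not a description of RobustZScoreScorer.

```python
from statistics import median


class MedianMadScorer:
    """Hypothetical stand-in scorer: |x - median| / (1.4826 * MAD)."""

    def fit(self, y):
        self.center_ = median(y)
        mad = median(abs(v - self.center_) for v in y)
        # 1.4826 makes the MAD comparable to a standard deviation for
        # normally distributed data; fall back to 1.0 if the MAD is zero.
        self.scale_ = 1.4826 * mad or 1.0
        return self

    def score(self, y):
        return [abs(v - self.center_) / self.scale_ for v in y]


def score_in_batches(batches, scorer):
    """Lazily yield one list of scores per incoming batch."""
    for batch in batches:
        yield scorer.score(batch)


scorer = MedianMadScorer().fit([1.0, 2.0, 3.0, 4.0, 5.0])
stream = iter([[3.0, 3.5], [10.0]])
results = list(score_in_batches(stream, scorer))
```

Because the generator pulls one batch at a time, only the current batch has to be held in memory.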
anomsmith.calculate_confusion_matrix_metrics(predictions: ndarray, y_true: ndarray) dict[str, int][source]

Confusion matrix counts with 1 = predicted / true anomaly.

anomsmith.calculate_lead_time(predictions: ndarray, true_labels: ndarray, timestamps: ndarray | None = None) dict[str, float | int][source]

Lead time between anomaly detections and failure events.

Parameters:
  • predictions – Detector labels (1 = anomaly, 0 = normal).

  • true_labels – Ground truth (1 = anomaly, 0 = normal).

  • timestamps – Optional timestamps aligned to predictions.

Returns:

Dictionary with mean/median/min/max lead time and early/late detection counts.

anomsmith.calculate_rul(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', failure_cycle_col: str | None = None) Series[source]

Calculate Remaining Useful Life (RUL) for each record.

RUL is calculated as: max_cycle - current_cycle for each asset.

Parameters

df : DataFrame

DataFrame with asset_id and cycle columns.

asset_id_col : str, default='asset_id'

Column name for asset/equipment identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

failure_cycle_col : str, optional

Column name for failure cycle. If provided, uses this instead of max cycle.

Returns

rul : Series

Remaining Useful Life for each record.
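The `max_cycle - current_cycle` rule can be illustrated without pandas. The sketch below uses a list of dictionaries in place of a DataFrame; `rul_per_record` is a hypothetical helper and does not cover the `failure_cycle_col` option.

```python
def rul_per_record(records, asset_key="asset_id", cycle_key="cycle"):
    """Return max_cycle - cycle for each record, grouped by asset."""
    max_cycle = {}
    for rec in records:
        asset, cycle = rec[asset_key], rec[cycle_key]
        max_cycle[asset] = max(max_cycle.get(asset, cycle), cycle)
    return [max_cycle[rec[asset_key]] - rec[cycle_key] for rec in records]


records = [
    {"asset_id": "A", "cycle": 1},
    {"asset_id": "A", "cycle": 2},
    {"asset_id": "A", "cycle": 3},
    {"asset_id": "B", "cycle": 1},
    {"asset_id": "B", "cycle": 2},
]
print(rul_per_record(records))  # [2, 1, 0, 1, 0]
```

The last observed cycle of each asset gets an RUL of 0, which is why run-to-failure data is the natural input for this calculation.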

anomsmith.classify_health_from_distance(distances: Series | ndarray | SeriesLike, healthy_threshold: float, warning_threshold: float, index: Index | None = None) HealthStateView[source]

Classify health states from Mahalanobis distance thresholds.

Maps Mahalanobis distance values to health states:

  • distance <= healthy_threshold: Healthy (0)

  • healthy_threshold < distance <= warning_threshold: Warning (1)

  • distance > warning_threshold: Critical/Distress (2)

This creates probabilistic zones of “normality” based on distance from the healthy center, minimizing false positives by having a wide decision space for normal operation.

Parameters:
  • distances – Mahalanobis distance values (n_samples,)

  • healthy_threshold – Distance threshold for Healthy state

  • warning_threshold – Distance threshold for Warning state (must be > healthy_threshold)

  • index – Optional index for the health states

Returns:

HealthStateView with classified health states

Examples

>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Set thresholds from distances on healthy training data (e.g., percentiles)
>>> train_distances = track_mahalanobis_distance(X_train, detector)
>>> healthy_threshold = np.percentile(train_distances, 75)
>>> warning_threshold = np.percentile(train_distances, 95)
>>> health_states = classify_health_from_distance(
...     distances, healthy_threshold, warning_threshold
... )
anomsmith.compare_detectors(detectors: dict[str, BaseDetector], X: ndarray | DataFrame, y_true: ndarray, timestamps: ndarray | None = None) DataFrame[source]

Compare multiple fitted detectors side-by-side.

anomsmith.compare_survival_models(models: dict[str, CoxSurvivalModel], X_test: ndarray | DataFrame, durations_test: ndarray | Series, events_test: ndarray | Series | None = None) DataFrame[source]

Compare multiple survival models.

Evaluates multiple survival models and returns comparison metrics.

Parameters:
  • models – Dictionary mapping model names to fitted CoxSurvivalModel instances

  • X_test – Test feature matrix

  • durations_test – Test time-to-event values

  • events_test – Test event indicators, optional

Returns:

DataFrame with comparison metrics (C-index, MAE, etc.) for each model

Examples

>>> models = {
...     "CoxPH": cox_model,
...     "LogisticHazard": lhaz_model,
...     "DeepSurv": deepsurv_model
... }
>>> comparison = compare_survival_models(models, X_test, durations_test, events_test)
>>> print(comparison)
anomsmith.compute_concordance_index(durations: ndarray | Series, risk_scores: ndarray | Series, events: ndarray | Series | None = None) float[source]

Compute concordance index (C-index) for survival model evaluation.

C-index measures how well a model ranks survival times. A score of 0.5 implies random ordering; 1.0 implies perfect ranking.

Uses lifelines if available, otherwise computes manually.

Parameters:
  • durations – Actual time-to-event values (n_samples,)

  • risk_scores – Predicted risk scores (n_samples,) - higher = higher risk

  • events – Event indicators (1 = event occurred, 0 = censored), optional

Returns:

C-index between 0.0 and 1.0

Examples

>>> c_index = compute_concordance_index(true_durations, risk_scores, events)
>>> print(f"C-index: {c_index:.3f}")
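A manual C-index computation can be sketched as a pairwise comparison. This is a simplified illustration, not the library's fallback code: it skips pairs with tied durations, counts tied risk scores as half-concordant, and treats all samples as observed events when `events` is omitted. The function name `concordance_index` is local to this sketch.

```python
from itertools import combinations


def concordance_index(durations, risk_scores, events=None):
    """Fraction of comparable pairs where the shorter-lived sample has higher risk."""
    if events is None:
        events = [1] * len(durations)  # assume every sample is an observed event
    concordant = 0.0
    comparable = 0
    for i, j in combinations(range(len(durations)), 2):
        if durations[i] == durations[j]:
            continue  # tied durations are skipped in this simplified sketch
        # Order the pair so that `first` has the shorter duration.
        first, second = (i, j) if durations[i] < durations[j] else (j, i)
        if not events[first]:
            continue  # a censored shorter duration makes the pair incomparable
        comparable += 1
        if risk_scores[first] > risk_scores[second]:
            concordant += 1.0
        elif risk_scores[first] == risk_scores[second]:
            concordant += 0.5
    if comparable == 0:
        raise ValueError("no comparable pairs")
    return concordant / comparable


durations = [5, 10, 15, 20]
perfect = concordance_index(durations, [0.9, 0.7, 0.4, 0.1])
one_swap = concordance_index(durations, [0.9, 0.4, 0.7, 0.1])
```

Here `perfect` ranks every pair correctly, while `one_swap` misorders one of the six comparable pairs.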
anomsmith.compute_pca_health_thresholds(X_train: ndarray | DataFrame, detector: PCADetector, healthy_percentile: float = 75.0, warning_percentile: float = 95.0) tuple[float, float][source]

Compute health state thresholds from training data.

Determines distance thresholds for health state classification based on percentiles of Mahalanobis distances in the training (healthy) data.

Parameters:
  • X_train – Training data (should be healthy operation data)

  • detector – Fitted PCADetector (must use score_method='mahalanobis')

  • healthy_percentile – Percentile for healthy threshold (default 75.0)

  • warning_percentile – Percentile for warning threshold (default 95.0)

Returns:

Tuple of (healthy_threshold, warning_threshold)

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
...     X_train, detector, healthy_percentile=75, warning_percentile=95
... )
anomsmith.compute_performance_metrics(true_labels: ndarray | Series, predicted_labels: ndarray | Series, scores: ndarray | Series | None = None) dict[str, float][source]

Compute comprehensive performance metrics for model monitoring.

Returns metrics suitable for CloudWatch, Prometheus, or similar monitoring systems.

Parameters:
  • true_labels – Ground truth binary labels (0 = normal, 1 = anomaly)

  • predicted_labels – Predicted binary labels

  • scores – Optional anomaly scores (for threshold-independent metrics)

Returns:

Dictionary with metrics:

  • precision: Precision score

  • recall: Recall score

  • f1: F1 score

  • true_positives: Number of true positives

  • false_positives: Number of false positives

  • false_negatives: Number of false negatives

  • true_negatives: Number of true negatives

  • anomaly_rate: Proportion of predicted anomalies

  • avg_run_length: Average length of anomaly runs (if scores provided)

Examples

>>> metrics = compute_performance_metrics(true_labels, pred_labels, scores)
>>> # Send to CloudWatch
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=[{"MetricName": "F1", "Value": metrics["f1"]}]
... )
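The listed metrics follow from the four confusion counts plus a scan for consecutive predicted anomalies. The sketch below is a plain-Python illustration; `sketch_metrics` is a hypothetical helper, and returning 0.0 for undefined ratios is an assumption rather than documented behavior.

```python
def sketch_metrics(true_labels, predicted_labels):
    """Compute confusion counts, precision/recall/F1, anomaly rate, and run length."""
    pairs = list(zip(true_labels, predicted_labels))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)

    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)

    # Collect lengths of consecutive runs of predicted anomalies.
    runs, current = [], 0
    for p in predicted_labels:
        if p == 1:
            current += 1
        elif current:
            runs.append(current)
            current = 0
    if current:
        runs.append(current)

    return {
        "precision": precision, "recall": recall, "f1": f1,
        "true_positives": tp, "false_positives": fp,
        "false_negatives": fn, "true_negatives": tn,
        "anomaly_rate": (tp + fp) / len(pairs),
        "avg_run_length": sum(runs) / len(runs) if runs else 0.0,
    }


metrics = sketch_metrics([0, 1, 1, 0, 0, 1, 0, 0], [0, 1, 1, 1, 0, 0, 0, 1])
```

All values are plain numbers, so the dictionary can be forwarded to a monitoring backend one key at a time.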
anomsmith.create_rul_labels(df: DataFrame, rul_col: str = 'RUL', warning_threshold: int = 30, critical_threshold: int = 15) DataFrame[source]

Create health status labels based on RUL values.

Parameters

df : DataFrame

DataFrame with RUL column.

rul_col : str, default='RUL'

Column name for RUL values.

warning_threshold : int, default=30

RUL threshold for warning state.

critical_threshold : int, default=15

RUL threshold for critical state.

Returns

df : DataFrame

DataFrame with added columns:

  • health_status: 'healthy', 'warning', 'critical', 'failed'

  • binary_label: 0 (healthy) or 1 (failure/warning/critical)

  • multi_class_label: 0 (healthy), 1 (warning), 2 (critical), 3 (failed)

anomsmith.detect_anomalies(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) DataFrame[source]

Detect anomalies in a time series.

Parameters:
  • y – Time series to detect anomalies in

  • detector – BaseDetector or BaseScorer instance

  • threshold_rule – ThresholdRule to apply

Returns:

pandas DataFrame with ‘score’ and ‘flag’ columns, indexed by y’s index

anomsmith.detect_concept_drift(recent_scores: ndarray | Series, historical_scores: ndarray | Series, threshold: float = 2.0) dict[str, Any][source]

Detect concept drift in model scores.

Compares recent score distribution to historical distribution using statistical tests. Useful for triggering model retraining.

Parameters:
  • recent_scores – Recent anomaly scores (last N samples)

  • historical_scores – Historical anomaly scores (training/baseline period)

  • threshold – Threshold for drift detection (default 2.0 std devs)

Returns:

Dictionary with drift detection results:

  • drift_detected: Boolean indicating if drift detected

  • recent_mean: Mean of recent scores

  • historical_mean: Mean of historical scores

  • drift_magnitude: Difference in means normalized by historical std

  • ks_statistic: Kolmogorov-Smirnov test statistic (if scipy available)

  • p_value: P-value from KS test (if scipy available)

Examples

>>> drift_info = detect_concept_drift(
...     recent_scores=model_scores[-1000:],
...     historical_scores=baseline_scores
... )
>>> if drift_info["drift_detected"]:
...     trigger_model_retraining()
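The mean-shift part of the check can be sketched with the standard library alone. The sketch assumes the magnitude is the absolute difference in means divided by the sample standard deviation of the historical scores, and that drift is flagged when it exceeds `threshold`; the Kolmogorov-Smirnov fields are omitted. `sketch_drift` is a hypothetical helper.

```python
from statistics import mean, stdev


def sketch_drift(recent_scores, historical_scores, threshold=2.0):
    """Flag drift when the recent mean is far from the historical mean."""
    hist_mean = mean(historical_scores)
    hist_std = stdev(historical_scores)  # sample std; an assumption
    recent_mean = mean(recent_scores)
    if hist_std == 0:
        # A constant baseline: any shift at all is treated as unbounded drift.
        magnitude = 0.0 if recent_mean == hist_mean else float("inf")
    else:
        magnitude = abs(recent_mean - hist_mean) / hist_std
    return {
        "drift_detected": magnitude > threshold,
        "recent_mean": recent_mean,
        "historical_mean": hist_mean,
        "drift_magnitude": magnitude,
    }


baseline = [1.0, 2.0, 3.0, 4.0, 5.0]
shifted = sketch_drift([7.0, 8.0, 9.0], baseline)
stable = sketch_drift([3.0, 3.5], baseline)
```

In this example `shifted` sits about 3.2 historical standard deviations from the baseline mean and is flagged, while `stable` is not.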
anomsmith.discretize_rul(rul: Series | ndarray | SeriesLike, healthy_threshold: float = 30.0, warning_threshold: float = 10.0) Series[source]

Discretize RUL values into health states.

Maps RUL values to health states:

  • RUL > healthy_threshold: Healthy (0)

  • warning_threshold < RUL <= healthy_threshold: Warning (1)

  • RUL <= warning_threshold: Distress (2)

Parameters:
  • rul – Remaining Useful Life values

  • healthy_threshold – RUL threshold for Healthy state (default 30)

  • warning_threshold – RUL threshold for Warning state (default 10)

Returns:

pandas Series with health states aligned to input index

Examples

>>> import pandas as pd
>>> import numpy as np
>>> rul = pd.Series([50, 25, 5, 0])
>>> states = discretize_rul(rul, healthy_threshold=30, warning_threshold=10)
>>> states.values
array([0, 1, 2, 2])
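The mapping rules stated above translate directly into a chain of comparisons. The sketch below is a plain-Python illustration on a list; `sketch_discretize` is a hypothetical helper, not the library function.

```python
def sketch_discretize(rul, healthy_threshold=30.0, warning_threshold=10.0):
    """Map each RUL value to 0 (Healthy), 1 (Warning), or 2 (Distress)."""
    states = []
    for value in rul:
        if value > healthy_threshold:
            states.append(0)
        elif value > warning_threshold:
            states.append(1)
        else:
            states.append(2)
    return states


print(sketch_discretize([50, 25, 5, 0]))  # [0, 1, 2, 2]
```

Values exactly equal to a threshold fall into the more severe state, which follows from the `<=` comparisons in the rules.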
anomsmith.evaluate_detector(detector: BaseDetector, X: ndarray | DataFrame, y_true: ndarray, scores: ndarray | None = None, timestamps: ndarray | None = None) dict[str, float | int][source]

Evaluate a fitted anomsmith detector on tabular test data.

anomsmith.evaluate_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) dict[str, float][source]

Evaluate policy performance metrics.

Parameters:
  • health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)

  • previous_states – Previous health states for transition detection (optional)

  • intervene_cost – Cost of intervention action (default 100)

  • review_cost – Cost of review action (default 30)

  • wait_cost – Cost of wait action (default 0)

  • base_risks – Base failure risks by state, as a tuple (healthy, warning, distress) (default (0.01, 0.1, 0.3))

  • intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)

  • review_risk_reduction – Risk reduction factor for review (default 0.75)

Returns:

Dictionary with total_cost, total_risk, interventions, reviews, waits

Examples

>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> metrics = evaluate_policy(states)
>>> metrics['total_cost']
230.0
anomsmith.evaluate_survival_model(surv_df: DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, risk_scores: ndarray | Series | None = None) dict[str, float][source]

Evaluate survival model performance.

Computes comprehensive metrics for survival model evaluation.

Parameters:
  • surv_df – Survival function DataFrame (rows = time points, cols = samples)

  • durations – Actual time-to-event values (n_samples,)

  • events – Event indicators (1 = event occurred, 0 = censored), optional

  • risk_scores – Optional risk scores for C-index (if None, computed from survival)

Returns:

Dictionary with evaluation metrics:

  • c_index: Concordance index

  • mean_absolute_error: Mean absolute error in predicted vs actual durations

  • median_survival_error: Error in median survival predictions

Examples

>>> surv_df = model.predict_survival_function(X_test)
>>> metrics = evaluate_survival_model(surv_df, durations_test, events_test)
>>> print(f"C-index: {metrics['c_index']:.3f}")
anomsmith.fit_survival_model_for_maintenance(X: ndarray | DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, model_type: str = 'logistic_hazard', **model_kwargs) CoxSurvivalModel[source]

Fit a survival model for predictive maintenance.

Convenience function that fits a survival model with sensible defaults for predictive maintenance use cases.

Parameters:
  • X – Feature matrix (n_samples, n_features) - sensor readings

  • durations – Time-to-failure values (n_samples,)

  • events – Event indicators (1 = failure, 0 = censored), optional

  • model_type – Model type - ‘cox’ (lifelines), ‘logistic_hazard’, or ‘deepsurv’

  • **model_kwargs – Additional model parameters

Returns:

Fitted survival model

Examples

>>> model = fit_survival_model_for_maintenance(
...     X_train, durations_train, events_train,
...     model_type="logistic_hazard", n_bins=50
... )
anomsmith.plot_comparison_metrics(comparison_df: DataFrame, metrics: list[str] | None = None, save_path: str | None = None)[source]

Create comparison chart for multiple detectors.

Parameters

comparison_df : DataFrame

DataFrame from compare_detectors().

metrics : list of str, optional

Metrics to plot. Default: ['precision', 'recall', 'f1'].

save_path : str, optional

Path to save the figure.

anomsmith.plot_pca_boundary(detector: PCADetector, X: ndarray | DataFrame, y_true: ndarray | None = None, n_components_plot: int = 2, save_path: str | None = None)[source]

Visualize PCA boundary in 2D projection (anomsmith PCADetector).

anomsmith.plot_reconstruction_error(detector, X: ndarray | DataFrame, y_true: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]

Plot reconstruction error over time for LSTM or PCA detector.

Parameters

detector : BaseDetector

Fitted detector (PCA or LSTM).

X : array-like

Data to plot.

y_true : ndarray, optional

True labels for marking actual anomalies.

timestamps : ndarray, optional

Timestamps for x-axis.

save_path : str, optional

Path to save the figure.

anomsmith.plot_sensor_drift(sensor_data: ndarray | Series, predictions: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]

Visualize sensor drift with anomaly flags.

Parameters

sensor_data : array-like

Sensor readings over time.

predictions : ndarray, optional

Anomaly predictions (1 for anomaly, 0 for normal).

timestamps : ndarray, optional

Timestamps for x-axis.

save_path : str, optional

Path to save the figure.

anomsmith.predict_health_states_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, healthy_threshold: float = 30.0, warning_threshold: float = 10.0, threshold: float = 0.5) HealthStateView[source]

Predict health states from survival model.

Converts survival model predictions to health states by:

  1. Predicting RUL from survival model

  2. Discretizing RUL into health states

Parameters:
  • model – Fitted survival model

  • X – Feature matrix (n_samples, n_features)

  • healthy_threshold – RUL threshold for Healthy state (default 30)

  • warning_threshold – RUL threshold for Warning state (default 10)

  • threshold – Survival probability threshold for median RUL (default 0.5)

Returns:

HealthStateView with predicted health states

Examples

>>> health_states = predict_health_states_from_survival(
...     model, X_test, healthy_threshold=30, warning_threshold=10
... )
anomsmith.predict_rul_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, threshold: float = 0.5, index: Index | None = None) Series[source]

Predict Remaining Useful Life (RUL) from survival model.

Uses the time at which the predicted survival probability falls to threshold as the predicted RUL; with the default threshold of 0.5 this is the median survival time.

Parameters:
  • model – Fitted survival model

  • X – Feature matrix (n_samples, n_features)

  • threshold – Survival probability threshold for median (default 0.5)

  • index – Optional row index for the returned Series (defaults to X.index for DataFrame inputs, else a pandas.RangeIndex)

Returns:

Series of predicted RUL values

Examples

>>> rul_predictions = predict_rul_from_survival(survival_model, X_test)
>>> health_states = predict_health_states_from_survival(
...     survival_model, X_test, healthy_threshold=30, warning_threshold=10
... )
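Reading an RUL estimate off a single survival curve can be sketched as a threshold crossing. The sketch assumes the curve is evaluated on an increasing time grid and falls back to the last grid point when the curve never reaches the threshold; that fallback and the helper name `rul_from_survival` are assumptions, not documented behavior.

```python
def rul_from_survival(times, survival, threshold=0.5):
    """Return the first time at which survival drops to or below threshold."""
    for t, s in zip(times, survival):
        if s <= threshold:
            return t
    # The curve never crosses the threshold within the evaluated horizon.
    return times[-1]


times = [0, 10, 20, 30, 40]
survival = [1.0, 0.9, 0.6, 0.45, 0.2]
median_rul = rul_from_survival(times, survival)            # crosses 0.5 at t=30
conservative_rul = rul_from_survival(times, survival, 0.8)  # crosses 0.8 at t=20
```

Raising the threshold produces earlier, more conservative RUL estimates, which in turn move assets into Warning or Distress sooner.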
anomsmith.prepare_pm_features(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', feature_cols: list[str] | None = None, calculate_rul_flag: bool = True, add_labels: bool = True, add_rolling_stats: bool = True, include_degradation_rates: bool = False, rolling_window: int = 5, warning_threshold: int = 30, critical_threshold: int = 15, failure_cycle_col: str | None = None) DataFrame[source]

Prepare predictive maintenance features from raw sensor data.

This is a convenience function that combines:

  • RUL calculation

  • Health status labeling

  • Rolling statistics

  • Degradation rates

Parameters

df : DataFrame

Input DataFrame with asset_id, cycle, and sensor/feature columns.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

feature_cols : list of str, optional

Feature column names. If None, auto-detects (excludes asset_id, cycle, RUL, etc.).

calculate_rul_flag : bool, default=True

Whether to calculate RUL.

add_labels : bool, default=True

Whether to add health status labels.

add_rolling_stats : bool, default=True

Whether to add rolling statistics.

include_degradation_rates : bool, default=False

Whether to add degradation rate features.

rolling_window : int, default=5

Window size for rolling statistics.

warning_threshold : int, default=30

RUL threshold for warning state.

critical_threshold : int, default=15

RUL threshold for critical state.

failure_cycle_col : str, optional

Column name for failure cycle (if available).

Returns

df : DataFrame

DataFrame with all engineered features.

anomsmith.rank_assets_by_risk(asset_health: DataFrame, top_n: int | None = None) DataFrame[source]

Rank assets by combined risk score.

Parameters:
  • asset_health – DataFrame from assess_asset_health()

  • top_n – Optional number of top assets to return (default None = all)

Returns:

DataFrame ranked by combined_risk (highest first)

anomsmith.score_anomalies(y: Series | ndarray | SeriesLike, scorer: BaseScorer) Series[source]

Score anomalies in a time series.

Parameters:
  • y – Time series to score

  • scorer – BaseScorer instance

Returns:

pandas Series of anomaly scores with same index as y

anomsmith.sweep_thresholds(y: Series | ndarray | SeriesLike, scorer: BaseScorer, threshold_values: list[float] | ndarray, labels: Series | ndarray | SeriesLike | None = None) DataFrame[source]

Evaluate multiple threshold values and return metrics.

Parameters:
  • y – Time series to score

  • scorer – BaseScorer instance

  • threshold_values – List of threshold values to evaluate

  • labels – Optional ground truth labels

Returns:

pandas DataFrame with columns: threshold, precision, recall, f1 (metrics are NaN if labels not provided)

anomsmith.track_mahalanobis_distance(X: ndarray | DataFrame, detector: PCADetector, index: Index | None = None) Series[source]

Track Mahalanobis distance over time as a single metric.

Computes Mahalanobis distance from the “normal” center in PCA space for each time point. This provides a single metric that can be tracked as a time series to monitor equipment health drift.

Delegates scoring to PCADetector.score() so Mahalanobis math stays in the primitive layer (single implementation).

Parameters:
  • X – Feature matrix (n_samples, n_features) with sensor readings

  • detector – Fitted PCADetector (PCA components and mean/covariance already computed)

  • index – Optional index for the resulting Series

Returns:

pandas Series with Mahalanobis distance values, indexed by time

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Track distance over time to detect drift

Objects

Layer 1: Data and representations.

This layer uses timesmith’s SeriesLike and PanelLike types for time series data. ScoreView and LabelView are kept for anomaly-specific outputs. No domain libraries (sklearn, matplotlib, etc.) are imported here. Only numpy and pandas are allowed.

class anomsmith.objects.Action(*values)[source]

Bases: IntEnum

Action categories for decision policies.

INTERVENE = 2
REVIEW = 1
WAIT = 0
class anomsmith.objects.ActionView(index: Index, actions: ndarray)[source]

Bases: object

Action labels aligned to time series index.

index

Time series index

Type:

pandas.core.indexes.base.Index

actions

Action values (0=wait, 1=review, 2=intervene)

Type:

numpy.ndarray

actions: ndarray
index: Index
to_series() Series[source]

Convert to pandas Series.

class anomsmith.objects.HealthState(*values)[source]

Bases: IntEnum

Health state categories for predictive maintenance.

States are ordered from healthy (0) to distressed (highest value).

DISTRESS = 2
HEALTHY = 0
WARNING = 1
class anomsmith.objects.HealthStateView(index: Index, states: ndarray)[source]

Bases: object

Health state labels aligned to time series index.

index

Time series index

Type:

pandas.core.indexes.base.Index

states

Health state values (0=Healthy, 1=Warning, 2=Distress)

Type:

numpy.ndarray

index: Index
states: ndarray
to_series() Series[source]

Convert to pandas Series.

class anomsmith.objects.LabelView(index: Index, labels: ndarray)[source]

Bases: object

Immutable view of binary anomaly labels aligned to an index.

index

Time index (must match input series index)

Type:

pandas.core.indexes.base.Index

labels

Binary flags as 1D array (1 = anomaly, 0 = normal)

Type:

numpy.ndarray

index: Index
labels: ndarray
class anomsmith.objects.PanelLike(*args, **kwargs)[source]

Bases: Protocol

Protocol for panel-like data: DataFrame with entity key plus time index.

Can be a DataFrame with MultiIndex (entity, time) or a regular DataFrame with an entity column and time index.

columns: Index
index: DatetimeIndex | MultiIndex | Index
anomsmith.objects.PanelView

alias of PanelLike

class anomsmith.objects.PolicyResult(health_states: HealthStateView, actions: ActionView, costs: ndarray, risks: ndarray)[source]

Bases: object

Result of applying a decision policy.

health_states

Predicted health states

Type:

anomsmith.objects.health_state.HealthStateView

actions

Recommended actions

Type:

anomsmith.objects.health_state.ActionView

costs

Action costs

Type:

numpy.ndarray

risks

Failure risks after actions

Type:

numpy.ndarray

actions: ActionView
costs: ndarray
health_states: HealthStateView
risks: ndarray
to_dataframe() DataFrame[source]

Convert to pandas DataFrame.

class anomsmith.objects.ScoreView(index: Index, scores: ndarray)[source]

Bases: object

Immutable view of anomaly scores aligned to an index.

index

Time index (must match input series index)

Type:

pandas.core.indexes.base.Index

scores

Anomaly scores as 1D array (higher = more anomalous)

Type:

numpy.ndarray

index: Index
scores: ndarray
class anomsmith.objects.SeriesLike(*args, **kwargs)[source]

Bases: Protocol

Protocol for series-like data: pandas Series or single-column DataFrame.

Must have a datetime or integer index.

index: DatetimeIndex | Index
values: object
anomsmith.objects.SeriesView

alias of SeriesLike

class anomsmith.objects.WindowSpec(length: int, step: int = 1, alignment: Literal['left', 'right', 'center'] = 'right')[source]

Bases: object

Specification for sliding or expanding windows.

length

Window length in time steps

Type:

int

step

Step size between windows (default: 1)

Type:

int

alignment

‘left’ (start at beginning), ‘right’ (end at current), or ‘center’ (centered on current point)

Type:

Literal[‘left’, ‘right’, ‘center’]

alignment: Literal['left', 'right', 'center'] = 'right'
length: int
step: int = 1

Primitives

Layer 2: Primitives.

This layer defines algorithm interfaces and thin utilities. It must not know about tasks or evaluation. Only numpy and pandas are allowed (no sklearn, matplotlib, etc.).

class anomsmith.primitives.BaseDetector(**params: Any)[source]

Bases: BaseEstimator

Base class for anomaly detectors.

Detectors produce both scores and binary labels.

abstractmethod predict(y: ndarray | Series | SeriesLike) LabelView[source]

Predict anomaly labels.

Parameters:

y – Time series to detect anomalies in

Returns:

LabelView with binary anomaly labels

abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.primitives.BaseEstimator(**params: Any)[source]

Bases: BaseObject

Base class for estimators with fit and fitted state.

_fitted

Whether the estimator has been fitted

abstractmethod fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) BaseEstimator[source]

Fit the estimator.

Parameters:
  • y – Target values

  • X – Optional features

Returns:

Self for method chaining

property is_fitted: bool

Check if estimator is fitted.

Returns:

True if fitted, False otherwise

class anomsmith.primitives.BaseObject(**params: Any)[source]

Bases: ABC

Base class for all primitives with parameter management.

Provides get_params, set_params, clone, and repr methods.

clone() BaseObject[source]

Create a deep copy of this object.

Returns:

Deep copy of this object

get_params(deep: bool = True) dict[str, Any][source]

Get parameters for this object.

Parameters:

deep – If True, return deep copy of parameters

Returns:

Dictionary of parameter names to values

set_params(**params: Any) BaseObject[source]

Set parameters for this object.

Parameters:

**params – Parameters to set

Returns:

Self for method chaining

property tags: dict[str, Any]

Get tags for this object.

Returns:

Dictionary of tag names to values

class anomsmith.primitives.BaseScorer(**params: Any)[source]

Bases: BaseEstimator

Base class for anomaly scorers.

Scorers assign anomaly scores to time series points. Higher scores indicate more anomalous points.

abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]

Score anomalies in a time series.

Parameters:

y – Time series to score

Returns:

ScoreView with anomaly scores

class anomsmith.primitives.ThresholdRule(method: Literal['absolute', 'quantile'], value: float, quantile: float | None = None)[source]

Bases: object

Rule for thresholding anomaly scores.

method

‘absolute’ (use value directly) or ‘quantile’ (use quantile)

Type:

Literal['absolute', 'quantile']

value

Threshold value (absolute) or quantile (0-1)

Type:

float

quantile

If method is ‘quantile’, this is the quantile to use

Type:

float | None

method: Literal['absolute', 'quantile']
quantile: float | None = None
value: float
anomsmith.primitives.apply_threshold(score_view: ScoreView, rule: ThresholdRule) LabelView[source]

Apply threshold rule to scores to produce binary labels.

Parameters:
  • score_view – ScoreView with anomaly scores

  • rule – ThresholdRule to apply

Returns:

LabelView with binary labels (1 = anomaly, 0 = normal)
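The two thresholding methods can be sketched on plain lists. This is an illustration under stated assumptions, not the library's implementation: `ThresholdRuleSketch`, `quantile_linear`, and `apply_threshold_sketch` are hypothetical names, the quantile uses linear interpolation, the `quantile` field is preferred over `value` when both are set, and a score must be strictly greater than the cutoff to be flagged.

```python
from dataclasses import dataclass
from typing import Literal, Optional, Sequence


@dataclass
class ThresholdRuleSketch:
    """Hypothetical mirror of the documented fields."""
    method: Literal["absolute", "quantile"]
    value: float
    quantile: Optional[float] = None


def quantile_linear(values: Sequence[float], q: float) -> float:
    """Quantile with linear interpolation between order statistics."""
    ordered = sorted(values)
    pos = q * (len(ordered) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    frac = pos - lo
    return ordered[lo] * (1.0 - frac) + ordered[hi] * frac


def apply_threshold_sketch(scores: Sequence[float], rule: ThresholdRuleSketch) -> list[int]:
    if rule.method == "absolute":
        cutoff = rule.value
    else:
        # Assumption: use the explicit quantile field, else fall back to value.
        q = rule.quantile if rule.quantile is not None else rule.value
        cutoff = quantile_linear(scores, q)
    # Assumption: strictly greater than the cutoff counts as an anomaly.
    return [1 if s > cutoff else 0 for s in scores]


scores = [0.1, 0.2, 0.3, 0.4, 5.0]
absolute_labels = apply_threshold_sketch(scores, ThresholdRuleSketch("absolute", 1.0))
quantile_labels = apply_threshold_sketch(scores, ThresholdRuleSketch("quantile", 0.75))
```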

anomsmith.primitives.export_model_for_sagemaker(model: BaseEstimator, s3_path: str, metadata: dict[str, Any] | None = None, local_path: str | Path | None = None) dict[str, Any][source]

Export model in format ready for AWS SageMaker deployment.

Creates a model package that can be uploaded to S3 and deployed as a SageMaker endpoint. The model is saved locally first, then S3 upload instructions are returned.

Parameters:
  • model – An anomsmith estimator to export

  • s3_path – S3 path where model will be uploaded (e.g., “s3://bucket/models/v1/”)

  • metadata – Optional metadata for deployment

  • local_path – Local path to save model (default: temp directory)

Returns:

Dictionary with export information, including:

  • local_path: Local path where model was saved

  • s3_path: S3 path for upload

  • upload_command: AWS CLI command to upload

  • inference_code_template: Template for SageMaker inference script

Examples

>>> export_info = export_model_for_sagemaker(
...     model, "s3://my-bucket/models/anomaly-detector/v1.0"
... )
>>> print(export_info["upload_command"])
anomsmith.primitives.load_model(path: str | Path) BaseEstimator[source]

Load an anomsmith model from disk.

Warning

Models are loaded using pickle. Only load from trusted sources. Unpickling data from untrusted origins can execute arbitrary code.

Parameters:

path – Directory path where model was saved

Returns:

Loaded model instance

Examples

>>> model = load_model("models/robust_zscore_v1")
>>> scores = model.score(y_test)
anomsmith.primitives.robust_zscore(values: ndarray, epsilon: float = 1e-08) ndarray[source]

Compute robust z-scores using median and MAD.

Uses median as center and Median Absolute Deviation (MAD) as scale. Includes epsilon guard to prevent division by zero.

Parameters:
  • values – Input values to scale

  • epsilon – Small value to prevent division by zero

Returns:

Robust z-scores (same shape as input)
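A dependency-free sketch of median/MAD scaling may help make the epsilon guard concrete. It assumes epsilon is added to the MAD and that no consistency constant (such as 1.4826) is applied; the source does not state either detail, and `robust_zscore_sketch` is a hypothetical name.

```python
from statistics import median
from typing import Sequence


def robust_zscore_sketch(values: Sequence[float], epsilon: float = 1e-8) -> list[float]:
    center = median(values)
    # MAD: median of absolute deviations from the median.
    mad = median([abs(v - center) for v in values])
    # Assumption: epsilon is added to the scale; no consistency factor.
    return [(v - center) / (mad + epsilon) for v in values]


z = robust_zscore_sketch([1.0, 2.0, 3.0, 4.0, 100.0])  # the outlier dominates
flat = robust_zscore_sketch([5.0, 5.0, 5.0])           # MAD is 0; epsilon avoids a crash
```

A constant series has a MAD of zero, which is the case the guard exists for.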

anomsmith.primitives.save_model(model: BaseEstimator, path: str | Path, metadata: dict[str, Any] | None = None) None[source]

Save an anomsmith model to disk for deployment.

Saves the model’s state, parameters, and metadata in a format suitable for cloud deployment (e.g., AWS SageMaker, containerized endpoints).

Parameters:
  • model – An anomsmith estimator (BaseScorer, BaseDetector, etc.)

  • path – Directory path where model will be saved

  • metadata – Optional metadata dict (model version, training date, etc.)

Examples

>>> from anomsmith.primitives.scorers.robust_zscore import RobustZScoreScorer
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> save_model(scorer, "models/robust_zscore_v1", metadata={"version": "1.0"})

Workflows

Layer 4: Workflows.

Workflows provide the public entry points users call. Workflows can import matplotlib only if plots are added (not in first pass).

class anomsmith.workflows.ModelPerformanceTracker(window_size: int = 1000, model_name: str | None = None)[source]

Bases: object

Track model performance over time for monitoring and alerting.

Maintains a rolling window of performance metrics and can detect degradation or drift.

window_size

Number of recent predictions to keep in window

metrics_history

DataFrame with historical metrics

detect_degradation(baseline_metrics: dict[str, float], threshold: float = 0.1) bool[source]

Detect if performance has degraded compared to baseline.

Parameters:
  • baseline_metrics – Baseline metrics (e.g., from training)

  • threshold – Relative degradation threshold (default 0.1 = 10%)

Returns:

True if degradation detected

get_current_metrics() dict[str, float][source]

Get current performance metrics.

Returns:

Dictionary with latest metrics

update(scores: ndarray | Series, predicted_labels: ndarray | Series, true_labels: ndarray | Series | None = None, timestamp: datetime | None = None) dict[str, float][source]

Update tracker with new predictions.

Parameters:
  • scores – Anomaly scores

  • predicted_labels – Predicted binary labels

  • true_labels – Optional ground truth labels

  • timestamp – Optional timestamp for this update

Returns:

Current performance metrics
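The rolling-window and relative-degradation ideas can be sketched without the library. Everything here is an assumption for illustration: `RollingTrackerSketch` and `degraded` are hypothetical names, only the anomaly rate is tracked, and degradation means a higher-is-better metric dropped by more than `threshold` relative to its baseline.

```python
from collections import deque


class RollingTrackerSketch:
    """Hypothetical rolling window over predicted labels."""

    def __init__(self, window_size: int = 1000) -> None:
        # deque(maxlen=...) drops the oldest entry once the window is full.
        self.window: deque = deque(maxlen=window_size)

    def update(self, predicted_labels) -> dict:
        self.window.extend(int(label) for label in predicted_labels)
        return self.get_current_metrics()

    def get_current_metrics(self) -> dict:
        n = len(self.window)
        return {"anomaly_rate": sum(self.window) / n if n else 0.0}


def degraded(current: dict, baseline: dict, threshold: float = 0.1) -> bool:
    # Assumption: higher is better; a relative drop above `threshold`
    # in any shared metric counts as degradation.
    for name, base in baseline.items():
        if name in current and base > 0:
            if (base - current[name]) / base > threshold:
                return True
    return False


tracker = RollingTrackerSketch(window_size=4)
tracker.update([0, 0, 1, 0])
latest = tracker.update([1, 1])  # window now holds the last four labels
```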

anomsmith.workflows.aggregate_metrics_for_cloudwatch(metrics_list: list[dict[str, float]], namespace: str = 'AnomalyDetection', model_name: str | None = None, timestamp: datetime | None = None) list[dict[str, Any]][source]

Format metrics for AWS CloudWatch PutMetricData API.

Aggregates multiple metric dictionaries into CloudWatch format.

Parameters:
  • metrics_list – List of metric dictionaries from compute_performance_metrics

  • namespace – CloudWatch namespace (default “AnomalyDetection”)

  • model_name – Optional model name for dimension

  • timestamp – Optional timestamp (default: now)

Returns:

List of CloudWatch metric data dictionaries

Examples

>>> metrics = [compute_performance_metrics(y1, pred1), ...]
>>> cw_metrics = aggregate_metrics_for_cloudwatch(metrics, model_name="IsolationForest")
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=cw_metrics
... )
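For orientation, a `MetricData` entry for PutMetricData carries `MetricName`, `Value`, and optionally `Timestamp` and `Dimensions`; the namespace is passed to the call itself. The sketch below builds such entries from metric dictionaries. It assumes aggregation by mean and a dimension named `ModelName`; both are illustrative choices, and `to_cloudwatch_sketch` is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def to_cloudwatch_sketch(
    metrics_list: list[dict[str, float]],
    model_name: Optional[str] = None,
    timestamp: Optional[datetime] = None,
) -> list[dict[str, Any]]:
    timestamp = timestamp or datetime.now(timezone.utc)
    # Assumption: the model name is exposed as a "ModelName" dimension.
    dimensions = [{"Name": "ModelName", "Value": model_name}] if model_name else []
    names = sorted({name for metrics in metrics_list for name in metrics})
    metric_data = []
    for name in names:
        values = [m[name] for m in metrics_list if name in m]
        metric_data.append({
            "MetricName": name,
            "Value": sum(values) / len(values),  # assumption: aggregate by mean
            "Timestamp": timestamp,
            "Dimensions": dimensions,
        })
    return metric_data


cw_metrics = to_cloudwatch_sketch(
    [{"f1": 0.8, "precision": 0.5}, {"f1": 0.6}],
    model_name="IsolationForest",
    timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```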
anomsmith.workflows.aggregate_undirected_edges(communications: DataFrame, *, sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', drop_self_loops: bool = True) DataFrame[source]

Aggregate communication rows into undirected weighted edges.

Mirrors the aggregation in org_network_analysis business logic: each unique unordered pair (min(a,b), max(a,b)) gets weight equal to the number of rows (communication events) between those endpoints.

Parameters:
  • communications – One row per event; must include sender/receiver columns.

  • sender_col – Column name for the sender endpoint (default sender_id).

  • receiver_col – Column name for the receiver endpoint (default receiver_id).

  • drop_self_loops – If True, rows where sender equals receiver are skipped.

Returns:

DataFrame with columns u, v, weight (integer counts), sorted by u, v.
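The unordered-pair aggregation can be sketched with a `Counter`. This version works on plain (sender, receiver) tuples rather than a DataFrame, and `aggregate_undirected_sketch` is a hypothetical name used only for illustration.

```python
from collections import Counter


def aggregate_undirected_sketch(pairs, drop_self_loops=True):
    """pairs: iterable of (sender, receiver); returns sorted (u, v, weight) rows."""
    counts = Counter()
    for sender, receiver in pairs:
        if drop_self_loops and sender == receiver:
            continue
        # Order the endpoints so (a, b) and (b, a) share one key.
        u, v = (sender, receiver) if sender <= receiver else (receiver, sender)
        counts[(u, v)] += 1
    return [(u, v, w) for (u, v), w in sorted(counts.items())]


events = [("a", "b"), ("b", "a"), ("a", "a"), ("c", "a")]
edges = aggregate_undirected_sketch(events)
```

Here the two a/b events collapse into one edge of weight 2, and the self-loop is skipped.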

anomsmith.workflows.apply_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) DataFrame[source]

Apply decision policy to health states.

Parameters:
  • health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)

  • previous_states – Previous health states for transition detection (optional)

  • intervene_cost – Cost of intervention action (default 100)

  • review_cost – Cost of review action (default 30)

  • wait_cost – Cost of wait action (default 0)

  • base_risks – Base failure risks by state (healthy, warning, distress) (default (0.01, 0.1, 0.3))

  • intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)

  • review_risk_reduction – Risk reduction factor for review (default 0.75)

Returns:

pandas DataFrame with health_states, actions, costs, and risks

Examples

>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> result = apply_policy(states)
>>> result['action'].values
array([0, 0, 1, 2, 2])
anomsmith.workflows.assess_asset_health(sensor_data: DataFrame, asset_ids: Series | None = None, feature_cols: list[str] | None = None, failure_labels: Series | ndarray | None = None, use_classification: bool = True, use_anomaly_detection: bool = True, contamination: float = 0.05, n_estimators: int = 100, isolation_n_estimators: int = 200, random_state: int | None = None, *, risk_proba_warning_threshold: float = 0.5, risk_proba_distress_threshold: float = 0.8, classification_weight: float = 0.6, anomaly_weight: float = 0.4) DataFrame[source]

Assess asset health using classification and anomaly detection.

Combines failure risk classification with anomaly detection to provide comprehensive asset health assessment. Results can be used to prioritize maintenance actions.

Parameters:
  • sensor_data – DataFrame with sensor readings (columns are features, rows are assets)

  • asset_ids – Optional Series of asset IDs (defaults to sensor_data index)

  • feature_cols – Optional list of feature column names (defaults to all numeric columns)

  • failure_labels – Optional binary labels for training classifier (1 = failure, 0 = healthy)

  • use_classification – Whether to use failure risk classification (default True)

  • use_anomaly_detection – Whether to use anomaly detection (default True)

  • contamination – Expected proportion of anomalies (see DEFAULT_OUTLIER_CONTAMINATION)

  • n_estimators – Number of trees for Random Forest (see DEFAULT_RANDOM_FOREST_N_ESTIMATORS)

  • isolation_n_estimators – Number of trees for Isolation Forest when anomaly detection is on (see DEFAULT_ISOLATION_FOREST_N_ESTIMATORS).

  • random_state – Random state for reproducibility

  • risk_proba_warning_threshold – Min failure probability for warning health state when using classification (default from anomsmith.constants).

  • risk_proba_distress_threshold – Min failure probability for distress state (must exceed warning threshold; enforced by FailureRiskClassifier).

  • classification_weight – Weight on normalized classification risk in combined_risk when both classification and anomaly detection run (must sum to 1 with anomaly_weight).

  • anomaly_weight – Weight on normalized anomaly score in combined_risk.

Returns:

DataFrame with columns:

  • asset_id: Asset identifier

  • failure_risk: Probability of failure (if classification used)

  • health_state: Predicted health state (0=Healthy, 1=Warning, 2=Distress)

  • is_anomaly: Binary anomaly flag (if anomaly detection used)

  • anomaly_score: Anomaly score (if anomaly detection used)

  • combined_risk: Combined risk score (higher = more urgent)

Examples

>>> import pandas as pd
>>> import numpy as np
>>> sensor_data = pd.DataFrame({
...     'temperature': [60, 65, 70, 80],
...     'vibration': [0.2, 0.25, 0.3, 0.4],
...     'pressure': [25, 24, 23, 20]
... })
>>> result = assess_asset_health(sensor_data)
>>> result.head()
anomsmith.workflows.assess_health_with_pca(X: ndarray | DataFrame, detector: PCADetector, healthy_threshold: float, warning_threshold: float, index: Index | None = None) DataFrame[source]

Assess equipment health using PCA and Mahalanobis distance.

Complete workflow for PCA-based predictive maintenance:

  1. Compute Mahalanobis distance from healthy center

  2. Classify health states based on distance thresholds

  3. Return results as a DataFrame for easy tracking

Parameters:
  • X – Feature matrix (n_samples, n_features) with sensor readings

  • detector – Fitted PCADetector (must use score_method='mahalanobis')

  • healthy_threshold – Distance threshold for Healthy state

  • warning_threshold – Distance threshold for Warning state

  • index – Optional index for the results

Returns:

DataFrame with columns 'mahalanobis_distance' and 'health_state'

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> # Set thresholds based on training data
>>> healthy_threshold = np.percentile(detector.score(X_train).scores, 75)
>>> warning_threshold = np.percentile(detector.score(X_train).scores, 95)
>>> health_df = assess_health_with_pca(
...     X_monitor, detector, healthy_threshold, warning_threshold
... )
>>> # Track health over time
>>> critical_units = health_df[health_df['health_state'] == 2]
anomsmith.workflows.backtest_detector(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule, labels: Series | ndarray | SeriesLike | None = None, n_splits: int = 5, min_train_size: int = 10) DataFrame[source]

Run backtest of detector across expanding windows.

Parameters:
  • y – Time series to backtest on

  • detector – BaseDetector or BaseScorer instance

  • threshold_rule – ThresholdRule to apply

  • labels – Optional ground truth labels

  • n_splits – Number of splits

  • min_train_size – Minimum training set size

Returns:

pandas DataFrame with columns fold, precision, recall, f1, avg_run_length
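An expanding-window backtest trains on a growing prefix of the series and tests on the block that follows. The sketch below shows one plausible way to derive the fold boundaries from `n_splits` and `min_train_size`; the exact split arithmetic used by the library is not documented here, so the equal-sized test blocks and the name `expanding_window_splits` are assumptions.

```python
def expanding_window_splits(n_samples, n_splits=5, min_train_size=10):
    """Yield (fold, train_range, test_range) with a growing training prefix."""
    # Assumption: the samples after the minimum training set are divided
    # into n_splits equal test blocks; any remainder is left unused.
    test_size = (n_samples - min_train_size) // n_splits
    if test_size < 1:
        raise ValueError("not enough samples for the requested splits")
    for fold in range(n_splits):
        train_end = min_train_size + fold * test_size
        yield fold, range(0, train_end), range(train_end, train_end + test_size)


splits = list(expanding_window_splits(60, n_splits=5, min_train_size=10))
# fold 0 trains on indices 0..9 and tests on 10..19; fold 4 trains on 0..49
```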

anomsmith.workflows.batch_predict(data_iterator: Iterator[ndarray | Series | DataFrame], detector: BaseDetector) Iterator[tuple[LabelView, ScoreView]][source]

Predict anomalies in batches for efficient processing.

Parameters:
  • data_iterator – Iterator yielding batches of time series data

  • detector – Fitted BaseDetector instance

Yields:

Tuple of (LabelView, ScoreView) for each batch

Examples

>>> detector = IsolationForestDetector(contamination=0.05)
>>> detector.fit(X_train)
>>> for labels, scores in batch_predict(data_stream(), detector):
...     process_predictions(labels, scores)
anomsmith.workflows.batch_score(data_iterator: Iterator[ndarray | Series | DataFrame], scorer: BaseScorer) Iterator[ScoreView][source]

Score anomalies in batches for efficient processing of large datasets.

Designed for stream processing (e.g., AWS Kinesis, S3 batch jobs) where data arrives in chunks.

Parameters:
  • data_iterator – Iterator yielding batches of time series data

  • scorer – Fitted BaseScorer instance

Yields:

ScoreView for each batch

Examples

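>>> import numpy as np
>>> import pandas as pd
>>> def data_stream():
...     # Yield ten consecutive 1000-hour batches starting 2024-01-01.
...     for i in range(0, 10000, 1000):
...         start = pd.Timestamp("2024-01-01") + pd.Timedelta(hours=i)
...         index = pd.date_range(start=start, periods=1000, freq="H")
...         yield pd.Series(np.random.randn(1000), index=index)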
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> for batch_scores in batch_score(data_stream(), scorer):
...     process_scores(batch_scores)
anomsmith.workflows.classify_health_from_distance(distances: Series | ndarray | SeriesLike, healthy_threshold: float, warning_threshold: float, index: Index | None = None) HealthStateView[source]

Classify health states from Mahalanobis distance thresholds.

Maps Mahalanobis distance values to health states:

  • distance <= healthy_threshold: Healthy (0)

  • healthy_threshold < distance <= warning_threshold: Warning (1)

  • distance > warning_threshold: Critical/Distress (2)

This creates probabilistic zones of “normality” based on distance from the healthy center, minimizing false positives by having a wide decision space for normal operation.

Parameters:
  • distances – Mahalanobis distance values (n_samples,)

  • healthy_threshold – Distance threshold for Healthy state

  • warning_threshold – Distance threshold for Warning state (must be > healthy_threshold)

  • index – Optional index for the health states

Returns:

HealthStateView with classified health states

Examples

>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Set thresholds based on training data (e.g., percentiles)
>>> healthy_threshold = np.percentile(distances, 75)
>>> warning_threshold = np.percentile(distances, 95)
>>> health_states = classify_health_from_distance(
...     distances, healthy_threshold, warning_threshold
... )
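The distance-to-state mapping is a pair of inclusive upper bounds. A plain-list sketch follows; `classify_health_sketch` is a hypothetical name, and raising `ValueError` for mis-ordered thresholds is an assumption about how the ordering requirement might be enforced.

```python
def classify_health_sketch(distances, healthy_threshold, warning_threshold):
    # Assumption: mis-ordered thresholds are rejected with ValueError.
    if warning_threshold <= healthy_threshold:
        raise ValueError("warning_threshold must exceed healthy_threshold")
    states = []
    for d in distances:
        if d <= healthy_threshold:
            states.append(0)  # Healthy
        elif d <= warning_threshold:
            states.append(1)  # Warning
        else:
            states.append(2)  # Critical/Distress
    return states


states = classify_health_sketch([0.5, 2.0, 2.5, 4.0, 9.0], 2.0, 4.0)
```

Values exactly on a threshold fall into the lower-severity state, matching the `<=` comparisons in the mapping.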
anomsmith.workflows.compare_survival_models(models: dict[str, CoxSurvivalModel], X_test: ndarray | DataFrame, durations_test: ndarray | Series, events_test: ndarray | Series | None = None) DataFrame[source]

Compare multiple survival models.

Evaluates multiple survival models and returns comparison metrics.

Parameters:
  • models – Dictionary mapping model names to fitted CoxSurvivalModel instances

  • X_test – Test feature matrix

  • durations_test – Test time-to-event values

  • events_test – Test event indicators, optional

Returns:

DataFrame with comparison metrics (C-index, MAE, etc.) for each model

Examples

>>> models = {
...     "CoxPH": cox_model,
...     "LogisticHazard": lhaz_model,
...     "DeepSurv": deepsurv_model
... }
>>> comparison = compare_survival_models(models, X_test, durations_test, events_test)
>>> print(comparison)
anomsmith.workflows.compute_concordance_index(durations: ndarray | Series, risk_scores: ndarray | Series, events: ndarray | Series | None = None) float[source]

Compute concordance index (C-index) for survival model evaluation.

C-index measures how well a model ranks survival times. A score of 0.5 implies random ordering; 1.0 implies perfect prediction.

Uses lifelines if available, otherwise computes manually.

Parameters:
  • durations – Actual time-to-event values (n_samples,)

  • risk_scores – Predicted risk scores (n_samples,) - higher = higher risk

  • events – Event indicators (1 = event occurred, 0 = censored), optional

Returns:

C-index between 0.0 and 1.0

Examples

>>> c_index = compute_concordance_index(true_durations, risk_scores, events)
>>> print(f"C-index: {c_index:.3f}")
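For readers unfamiliar with the metric, a manual pairwise computation in the style of Harrell's C-index can be sketched as follows. It is not necessarily the fallback the library uses: the tie handling (0.5 credit), the treatment of missing `events` as uncensored, and the 0.5 result when no pair is comparable are all assumptions, and `concordance_index_sketch` is a hypothetical name.

```python
def concordance_index_sketch(durations, risk_scores, events=None):
    n = len(durations)
    if events is None:
        events = [1] * n  # assumption: no censoring when events are omitted
    concordant = 0.0
    comparable = 0
    for i in range(n):
        if not events[i]:
            continue  # a censored sample cannot anchor a comparable pair
        for j in range(n):
            if durations[i] < durations[j]:
                comparable += 1
                if risk_scores[i] > risk_scores[j]:
                    concordant += 1.0  # earlier failure had the higher risk
                elif risk_scores[i] == risk_scores[j]:
                    concordant += 0.5  # tied risk scores get half credit
    # Assumption: with no comparable pairs, report the chance level.
    return concordant / comparable if comparable else 0.5


perfect = concordance_index_sketch([1, 2, 3, 4], [4.0, 3.0, 2.0, 1.0])
reversed_order = concordance_index_sketch([1, 2, 3, 4], [1.0, 2.0, 3.0, 4.0])
```

The double loop is quadratic in the number of samples, which is acceptable for a sketch but slow for large test sets.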
anomsmith.workflows.compute_pca_health_thresholds(X_train: ndarray | DataFrame, detector: PCADetector, healthy_percentile: float = 75.0, warning_percentile: float = 95.0) tuple[float, float][source]

Compute health state thresholds from training data.

Determines distance thresholds for health state classification based on percentiles of Mahalanobis distances in the training (healthy) data.

Parameters:
  • X_train – Training data (should be healthy operation data)

  • detector – Fitted PCADetector (must use score_method='mahalanobis')

  • healthy_percentile – Percentile for healthy threshold (default 75.0)

  • warning_percentile – Percentile for warning threshold (default 95.0)

Returns:

Tuple of (healthy_threshold, warning_threshold)

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
...     X_train, detector, healthy_percentile=75, warning_percentile=95
... )
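The percentile step can be illustrated on a list of precomputed training distances. This sketch assumes linear interpolation between order statistics (the default behaviour of `numpy.percentile`); `percentile_linear` and `health_thresholds_sketch` are hypothetical names, and the distances are synthetic.

```python
def percentile_linear(values, pct):
    """Percentile with linear interpolation between order statistics."""
    ordered = sorted(values)
    pos = (pct / 100.0) * (len(ordered) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    frac = pos - lo
    return ordered[lo] * (1.0 - frac) + ordered[hi] * frac


def health_thresholds_sketch(train_distances, healthy_percentile=75.0,
                             warning_percentile=95.0):
    return (
        percentile_linear(train_distances, healthy_percentile),
        percentile_linear(train_distances, warning_percentile),
    )


train_distances = [float(i) for i in range(101)]  # synthetic: 0.0 .. 100.0
healthy_threshold, warning_threshold = health_thresholds_sketch(train_distances)
```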
anomsmith.workflows.compute_performance_metrics(true_labels: ndarray | Series, predicted_labels: ndarray | Series, scores: ndarray | Series | None = None) dict[str, float][source]

Compute comprehensive performance metrics for model monitoring.

Returns metrics suitable for CloudWatch, Prometheus, or similar monitoring systems.

Parameters:
  • true_labels – Ground truth binary labels (0 = normal, 1 = anomaly)

  • predicted_labels – Predicted binary labels

  • scores – Optional anomaly scores (for threshold-independent metrics)

Returns:

Dictionary with metrics:

  • precision: Precision score

  • recall: Recall score

  • f1: F1 score

  • true_positives: Number of true positives

  • false_positives: Number of false positives

  • false_negatives: Number of false negatives

  • true_negatives: Number of true negatives

  • anomaly_rate: Proportion of predicted anomalies

  • avg_run_length: Average length of anomaly runs (if scores provided)

Examples

>>> metrics = compute_performance_metrics(true_labels, pred_labels, scores)
>>> # Send to CloudWatch
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=[{"MetricName": "F1", "Value": metrics["f1"]}]
... )
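The count-based metrics follow directly from the confusion matrix. A plain-Python sketch, assuming binary 0/1 labels and a value of 0.0 whenever a denominator is zero; `performance_metrics_sketch` is a hypothetical name, and `avg_run_length` is left out because its definition is not spelled out here.

```python
def performance_metrics_sketch(true_labels, predicted_labels):
    pairs = list(zip(true_labels, predicted_labels))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    # Assumption: undefined ratios are reported as 0.0.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "true_positives": tp,
        "false_positives": fp,
        "false_negatives": fn,
        "true_negatives": tn,
        "anomaly_rate": (tp + fp) / len(pairs) if pairs else 0.0,
    }


example = performance_metrics_sketch([0, 1, 1, 0, 1], [0, 1, 0, 1, 1])
```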
anomsmith.workflows.detect_anomalies(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) DataFrame[source]

Detect anomalies in a time series.

Parameters:
  • y – Time series to detect anomalies in

  • detector – BaseDetector or BaseScorer instance

  • threshold_rule – ThresholdRule to apply

Returns:

pandas DataFrame with ‘score’ and ‘flag’ columns, indexed by y’s index

anomsmith.workflows.detect_concept_drift(recent_scores: ndarray | Series, historical_scores: ndarray | Series, threshold: float = 2.0) dict[str, Any][source]

Detect concept drift in model scores.

Compares recent score distribution to historical distribution using statistical tests. Useful for triggering model retraining.

Parameters:
  • recent_scores – Recent anomaly scores (last N samples)

  • historical_scores – Historical anomaly scores (training/baseline period)

  • threshold – Threshold for drift detection (default 2.0 std devs)

Returns:

Dictionary with drift detection results:

  • drift_detected: Boolean indicating if drift detected

  • recent_mean: Mean of recent scores

  • historical_mean: Mean of historical scores

  • drift_magnitude: Difference in means normalized by historical std

  • ks_statistic: Kolmogorov-Smirnov test statistic (if scipy available)

  • p_value: P-value from KS test (if scipy available)

Examples

>>> drift_info = detect_concept_drift(
...     recent_scores=model_scores[-1000:],
...     historical_scores=baseline_scores
... )
>>> if drift_info["drift_detected"]:
...     trigger_model_retraining()
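The mean-shift part of the check can be sketched with the standard library. Assumptions: the magnitude is the absolute difference in means divided by the population standard deviation of the historical scores, and drift is flagged when it exceeds `threshold`. The KS test is omitted, and `concept_drift_sketch` is a hypothetical name.

```python
from statistics import fmean, pstdev


def concept_drift_sketch(recent_scores, historical_scores, threshold=2.0):
    recent_mean = fmean(recent_scores)
    historical_mean = fmean(historical_scores)
    # Assumption: population std; a tiny constant guards against zero spread.
    historical_std = pstdev(historical_scores)
    magnitude = abs(recent_mean - historical_mean) / (historical_std + 1e-12)
    return {
        "drift_detected": magnitude > threshold,
        "recent_mean": recent_mean,
        "historical_mean": historical_mean,
        "drift_magnitude": magnitude,
    }


baseline = [0.0, 1.0, 2.0, 3.0, 4.0]
shifted = concept_drift_sketch([6.0, 6.0, 6.0], baseline)
stable = concept_drift_sketch([2.0, 3.0], baseline)
```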
anomsmith.workflows.detect_network_edge_anomalies(edge_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag structurally unusual dyads using isolation forest on edge features.

Expects a frame such as the output of edge_features_from_edges() (numeric columns only are used by default).

Raises:

ValueError – If fewer than two edges are present.

anomsmith.workflows.detect_network_node_anomalies(node_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag structurally unusual nodes using isolation forest on feature rows.

Fits IsolationForestDetector on the numeric feature matrix and thresholds anomaly scores. Typical use: pass the output of node_features_from_edges(), optionally joining extra numeric columns before calling.

Parameters:
  • node_features – Rows are nodes (index = node id); values are features.

  • threshold_rule – Rule applied to isolation scores (higher = more anomalous).

  • feature_cols – Columns to use; default is all numeric columns in the frame.

  • contamination – Passed to IsolationForest.

  • n_estimators – Number of trees in the forest.

  • random_state – Optional RNG seed.

Returns:

DataFrame with original feature columns plus score and flag (1 = anomaly). Index matches node_features.

Raises:

ValueError – If fewer than two rows are present (isolation forest requires a batch to score relative to).

anomsmith.workflows.detect_network_temporal_node_anomalies(touch_counts_by_bin: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag nodes whose time-bin activity vectors look unlike the rest.

Rows are nodes (index from node_touch_counts_by_bin()). Columns should be numeric bin counts (any column names); by default all numeric columns are used as features.

Raises:

ValueError – If fewer than two nodes, no numeric columns, or any bin column contains non-finite values.

anomsmith.workflows.discretize_rul(rul: Series | ndarray | SeriesLike, healthy_threshold: float = 30.0, warning_threshold: float = 10.0) Series[source]

Discretize RUL values into health states.

Maps RUL values to health states:

  • RUL > healthy_threshold: Healthy (0)

  • warning_threshold < RUL <= healthy_threshold: Warning (1)

  • RUL <= warning_threshold: Distress (2)

Parameters:
  • rul – Remaining Useful Life values

  • healthy_threshold – RUL threshold for Healthy state (default 30)

  • warning_threshold – RUL threshold for Warning state (default 10)

Returns:

pandas Series with health states aligned to input index

Examples

>>> import pandas as pd
>>> import numpy as np
>>> rul = pd.Series([50, 25, 5, 0])
>>> states = discretize_rul(rul, healthy_threshold=30, warning_threshold=10)
>>> states.values
array([0, 1, 2, 2])
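The same mapping written out on a plain list, as a sketch rather than the library code (`discretize_rul_sketch` is a hypothetical name). Note that the comparison direction is the reverse of the distance-based classifier: a larger RUL means a healthier asset.

```python
def discretize_rul_sketch(rul, healthy_threshold=30.0, warning_threshold=10.0):
    states = []
    for value in rul:
        if value > healthy_threshold:
            states.append(0)  # Healthy
        elif value > warning_threshold:
            states.append(1)  # Warning
        else:
            states.append(2)  # Distress
    return states


states = discretize_rul_sketch([50, 25, 5, 0])
boundaries = discretize_rul_sketch([30, 10])  # values exactly on a threshold
```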
anomsmith.workflows.edge_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Per-edge (dyad) features derived from aggregated undirected weights.

Rows follow the u, v, weight table from aggregate_undirected_edges(). Combines each edge weight with endpoint strengths from node_features_from_edges() to highlight unusually heavy links relative to endpoint activity.

Columns:

  • weight: aggregated event count on the dyad.

  • share_of_endpoint_volume: 2 * weight / (deg(u) + deg(v)) using endpoint weighted_degree values (each edge’s weight is included in both degrees).

  • log1p_weight: log1p(weight) for scale-robust modeling.

Parameters:
  • edges – Non-empty edge list (typically aggregated counts).

  • nodes – Full node roster (same semantics as node_features_from_edges()).

  • u_col – Column names in edges.

  • v_col – Column names in edges.

  • weight_col – Column names in edges.

Returns:

DataFrame indexed by MultiIndex (u, v) with numeric feature columns.

Raises:

ValueError – If edges is empty.
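The three edge columns can be reproduced on a small example. This sketch takes (u, v, weight) tuples instead of a DataFrame and assumes every edge has a positive weight, so the endpoint degrees in the denominator are never zero; `edge_features_sketch` is a hypothetical name.

```python
import math
from collections import defaultdict


def edge_features_sketch(edges):
    """edges: list of (u, v, weight) rows, one per undirected pair."""
    if not edges:
        raise ValueError("edges must be non-empty")
    degree = defaultdict(float)
    for u, v, w in edges:
        # Each edge contributes its weight to both endpoints' weighted degree.
        degree[u] += w
        degree[v] += w
    features = {}
    for u, v, w in edges:
        features[(u, v)] = {
            "weight": w,
            "share_of_endpoint_volume": 2 * w / (degree[u] + degree[v]),
            "log1p_weight": math.log1p(w),
        }
    return features


features = edge_features_sketch([("a", "b", 2), ("a", "c", 1)])
# deg(a) = 3, deg(b) = 2, deg(c) = 1
```

An edge that is the only link for both endpoints gets a share of exactly 1.0, which is the upper bound of this ratio.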

anomsmith.workflows.evaluate_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) dict[str, float][source]

Evaluate policy performance metrics.

Parameters:
  • health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)

  • previous_states – Previous health states for transition detection (optional)

  • intervene_cost – Cost of intervention action (default 100)

  • review_cost – Cost of review action (default 30)

  • wait_cost – Cost of wait action (default 0)

  • base_risks – Base failure risks by state (healthy, warning, distress) (default (0.01, 0.1, 0.3))

  • intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)

  • review_risk_reduction – Risk reduction factor for review (default 0.75)

Returns:

Dictionary with total_cost, total_risk, interventions, reviews, waits

Examples

>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> metrics = evaluate_policy(states)
>>> metrics['total_cost']
230.0
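The total of 230.0 in the example is consistent with a policy that waits on Healthy, reviews on Warning, and intervenes on Distress: 0 + 0 + 30 + 100 + 100. The sketch below reproduces that arithmetic. The state-to-action mapping is inferred from the documented examples; the residual-risk formula (base risk times the reduction factor) is an assumption, `previous_states` is ignored, and `evaluate_policy_sketch` is a hypothetical name.

```python
def evaluate_policy_sketch(
    health_states,
    intervene_cost=100.0,
    review_cost=30.0,
    wait_cost=0.0,
    base_risks=(0.01, 0.1, 0.3),
    intervene_risk_reduction=0.5,
    review_risk_reduction=0.75,
):
    # Action codes: 0 = wait, 1 = review, 2 = intervene.
    costs = {0: wait_cost, 1: review_cost, 2: intervene_cost}
    # Assumption: residual risk = base risk * factor; waiting changes nothing.
    factors = {0: 1.0, 1: review_risk_reduction, 2: intervene_risk_reduction}
    counts = {0: 0, 1: 0, 2: 0}
    total_cost = 0.0
    total_risk = 0.0
    for state in health_states:
        action = state  # inferred mapping: action code equals state code
        counts[action] += 1
        total_cost += costs[action]
        total_risk += base_risks[state] * factors[action]
    return {
        "total_cost": total_cost,
        "total_risk": total_risk,
        "interventions": counts[2],
        "reviews": counts[1],
        "waits": counts[0],
    }


metrics = evaluate_policy_sketch([0, 0, 1, 2, 2])
```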
anomsmith.workflows.evaluate_survival_model(surv_df: DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, risk_scores: ndarray | Series | None = None) dict[str, float][source]

Evaluate survival model performance.

Computes comprehensive metrics for survival model evaluation.

Parameters:
  • surv_df – Survival function DataFrame (rows = time points, cols = samples)

  • durations – Actual time-to-event values (n_samples,)

  • events – Event indicators (1 = event occurred, 0 = censored), optional

  • risk_scores – Optional risk scores for C-index (if None, computed from survival)

Returns:

Dictionary with evaluation metrics:

  • c_index: Concordance index

  • mean_absolute_error: Mean absolute error in predicted vs actual durations

  • median_survival_error: Error in median survival predictions

Examples

>>> surv_df = model.predict_survival_function(X_test)
>>> metrics = evaluate_survival_model(surv_df, durations_test, events_test)
>>> print(f"C-index: {metrics['c_index']:.3f}")
anomsmith.workflows.fit_survival_model_for_maintenance(X: ndarray | DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, model_type: str = 'logistic_hazard', **model_kwargs) CoxSurvivalModel[source]

Fit a survival model for predictive maintenance.

Convenience function that fits a survival model with sensible defaults for predictive maintenance use cases.

Parameters:
  • X – Feature matrix (n_samples, n_features) - sensor readings

  • durations – Time-to-failure values (n_samples,)

  • events – Event indicators (1 = failure, 0 = censored), optional

  • model_type – Model type - ‘cox’ (lifelines), ‘logistic_hazard’, or ‘deepsurv’

  • **model_kwargs – Additional model parameters

Returns:

Fitted survival model

Examples

>>> model = fit_survival_model_for_maintenance(
...     X_train, durations_train, events_train,
...     model_type="logistic_hazard", n_bins=50
... )
anomsmith.workflows.node_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Per-node structural features for anomaly scoring.

Uses the same edge table shape produced by aggregate_undirected_edges() (u, v, weight). Every id in nodes appears in the index; nodes with no incident edges get zero strength, zero distinct-neighbor count, and uniform PageRank mass.

Feature columns:

  • weighted_degree: sum of incident edge weights (communication volume).

  • neighbor_count: number of distinct neighbors.

  • pagerank: undirected PageRank (numpy power iteration; no NetworkX).

Parameters:
  • edges – Edge list with endpoints and non-negative weights.

  • nodes – Complete roster of node identifiers (e.g. all team member ids).

  • u_col – Column names in edges.

  • v_col – Column names in edges.

  • weight_col – Column names in edges.

Returns:

DataFrame indexed by node id with numeric feature columns.
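The three node features, including a PageRank power iteration, can be sketched in plain Python. Assumptions: a damping factor of 0.85, a fixed 100 iterations, nodes without edges redistributing their mass uniformly, a non-empty roster, and every edge endpoint appearing in `nodes`. None of these details are confirmed by the text above, and `node_features_sketch` is a hypothetical name.

```python
from collections import defaultdict


def node_features_sketch(edges, nodes, damping=0.85, iterations=100):
    """edges: (u, v, weight) rows; nodes: full roster including isolated ids."""
    nodes = list(nodes)
    strength = {node: 0.0 for node in nodes}
    neighbors = {node: set() for node in nodes}
    weights = defaultdict(float)  # directed view: both (u, v) and (v, u)
    for u, v, w in edges:
        strength[u] += w
        strength[v] += w
        neighbors[u].add(v)
        neighbors[v].add(u)
        weights[(u, v)] += w
        weights[(v, u)] += w
    n = len(nodes)
    rank = {node: 1.0 / n for node in nodes}
    for _ in range(iterations):
        # Nodes with no edges spread their mass uniformly (dangling-node rule).
        dangling = sum(rank[node] for node in nodes if strength[node] == 0.0)
        new_rank = {node: (1.0 - damping) / n + damping * dangling / n for node in nodes}
        for (src, dst), w in weights.items():
            if strength[src] > 0:
                new_rank[dst] += damping * rank[src] * w / strength[src]
        rank = new_rank
    return {
        node: {
            "weighted_degree": strength[node],
            "neighbor_count": len(neighbors[node]),
            "pagerank": rank[node],
        }
        for node in nodes
    }


feats = node_features_sketch([("a", "b", 2.0), ("b", "c", 2.0)], ["a", "b", "c", "d"])
```

In this path graph the middle node `b` ends up with the largest PageRank, while the isolated node `d` keeps zero degree and zero neighbors.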

anomsmith.workflows.node_graph_metrics_networkx(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Graph centrality metrics via NetworkX (optional dependency).

Installs with the network extra: pip install 'anomsmith[network]'.

Builds an undirected graph: every id in nodes is a vertex; edges from edges carry combined weights (parallel edges in the table should be pre-aggregated). The centrality choices match common org-network dashboards: betweenness and closeness use topology only (unweighted hops), while eigenvector centrality uses edge weight.

Columns:

  • betweenness_centrality — NetworkX betweenness_centrality (unweighted hops).

  • closeness_centrality — NetworkX closeness_centrality.

  • eigenvector_centrality — weighted when convergence succeeds; else zeros.

Parameters:
  • edges – Aggregated u, v, weight table (may be empty).

  • nodes – Full roster; isolated members still appear with zeros.

  • u_col – Column names in edges.

  • v_col – Column names in edges.

  • weight_col – Column names in edges.

Returns:

DataFrame aligned to nodes with the three metric columns.

Raises:

ImportError – If NetworkX is not installed.

anomsmith.workflows.node_touch_counts_by_bin(communications: DataFrame, nodes: Index | list[Any] | ndarray, *, timestamp_col: str = 'timestamp', sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', freq: str = '1D', drop_self_loops: bool = True) DataFrame[source]

Count how often each node sends or receives in each time bin.

Each communication row increments both the sender and the receiver for the floored period bucket (pandas offset string, e.g. "1D", "6H").

Parameters:
  • communications – Must include timestamp and endpoint columns.

  • nodes – Full roster; bins include only these ids (other endpoints dropped).

  • timestamp_col – Parseable timestamps (pd.to_datetime).

  • sender_col – Endpoint identifiers.

  • receiver_col – Endpoint identifiers.

  • freq – Bin size passed to Series.dt.floor.

  • drop_self_loops – If True, rows where sender equals receiver are skipped.

Returns:

DataFrame with index = node id, columns = bin start (datetime64), values = integer touch counts. Missing bins are zero; nodes with no events still appear as rows of zeros when listed in nodes.
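
The counting rule above can be sketched in plain Python. This is an illustration only, not the library's implementation: the function name touch_counts_by_day, the tuple layout of events, and the fixed one-day bin are assumptions chosen to keep the sketch dependency-free.

```python
from collections import defaultdict
from datetime import datetime


def touch_counts_by_day(events, nodes, drop_self_loops=True):
    """Count sends plus receives per roster node per calendar day.

    `events` is a list of (timestamp, sender, receiver) tuples; this layout
    is an assumption standing in for the communications DataFrame.
    Returns (table, bins) where table maps node -> counts aligned to bins.
    """
    roster = list(nodes)
    counts = {node: defaultdict(int) for node in roster}
    bins = set()
    for ts, sender, receiver in events:
        if drop_self_loops and sender == receiver:
            continue
        # Floor the timestamp to the start of its day (the "1D" bucket).
        bucket = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        for endpoint in (sender, receiver):
            if endpoint in counts:  # endpoints outside the roster are dropped
                counts[endpoint][bucket] += 1
                bins.add(bucket)
    ordered_bins = sorted(bins)
    # Dense table: every roster node gets a value for every bin (zero if absent).
    table = {node: [counts[node][b] for b in ordered_bins] for node in roster}
    return table, ordered_bins


events = [
    (datetime(2024, 1, 1, 9, 30), "a", "b"),
    (datetime(2024, 1, 1, 17, 5), "b", "c"),
    (datetime(2024, 1, 2, 8, 0), "a", "a"),     # self-loop, skipped
    (datetime(2024, 1, 2, 11, 45), "c", "zz"),  # "zz" is not in the roster
]
table, bins = touch_counts_by_day(events, nodes=["a", "b", "c", "d"])
```

Node "d" has no events but still appears as a row of zeros, which mirrors the roster behavior described in the return value.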

anomsmith.workflows.predict_health_states_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, healthy_threshold: float = 30.0, warning_threshold: float = 10.0, threshold: float = 0.5) HealthStateView[source]

Predict health states from survival model.

Converts survival model predictions to health states by:

  1. Predicting RUL from survival model

  2. Discretizing RUL into health states

Parameters:
  • model – Fitted survival model

  • X – Feature matrix (n_samples, n_features)

  • healthy_threshold – RUL threshold for Healthy state (default 30)

  • warning_threshold – RUL threshold for Warning state (default 10)

  • threshold – Survival probability threshold for median RUL (default 0.5)

Returns:

HealthStateView with predicted health states

Examples

>>> health_states = predict_health_states_from_survival(
...     model, X_test, healthy_threshold=30, warning_threshold=10
... )
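
The discretization step might look like the following minimal sketch. The function name discretize_rul, the state labels (in particular "critical" for the lowest band), and the strict greater-than comparisons at the boundaries are assumptions; the library returns a HealthStateView rather than strings.

```python
def discretize_rul(rul, healthy_threshold=30.0, warning_threshold=10.0):
    """Map one RUL value to a coarse health state (labels are illustrative)."""
    if rul > healthy_threshold:
        return "healthy"
    if rul > warning_threshold:
        return "warning"
    # Hypothetical name for the band at or below warning_threshold.
    return "critical"


states = [discretize_rul(r) for r in (45, 30, 12, 10, 3)]
```
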
anomsmith.workflows.predict_rul_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, threshold: float = 0.5, index: Index | None = None) Series[source]

Predict Remaining Useful Life (RUL) from survival model.

Uses median survival time (where survival probability = threshold) as predicted RUL.

Parameters:
  • model – Fitted survival model

  • X – Feature matrix (n_samples, n_features)

  • threshold – Survival probability threshold for median (default 0.5)

  • index – Optional row index for the returned Series (defaults to X.index for DataFrame inputs, else a pandas.RangeIndex)

Returns:

Series of predicted RUL values

Examples

>>> rul_predictions = predict_rul_from_survival(survival_model, X_test)
>>> health_states = predict_health_states_from_survival(
...     survival_model, X_test, healthy_threshold=30, warning_threshold=10
... )
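
To make the "survival probability = threshold" rule concrete, here is a small sketch that reads the crossing time off a discrete survival curve. The helper name, the list-based curve, and the fallback to the last observed time when the curve never drops to the threshold are all assumptions, not documented behavior of CoxSurvivalModel.

```python
def rul_from_survival_curve(times, survival, threshold=0.5):
    """Return the first time at which survival probability is <= threshold.

    Falls back to the last observed time if the curve never drops that far
    (an assumption made for this sketch).
    """
    for t, s in zip(times, survival):
        if s <= threshold:
            return t
    return times[-1]


times = [0, 10, 20, 30, 40]
survival = [1.0, 0.9, 0.6, 0.45, 0.2]
median_rul = rul_from_survival_curve(times, survival)              # 30
censored_rul = rul_from_survival_curve(times, survival, 0.1)       # 40 (fallback)
```
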
anomsmith.workflows.process_s3_batch(s3_keys: list[str], model: BaseScorer | BaseDetector, bucket: str, s3_client=None) DataFrame[source]

Process a batch of S3 files with anomaly detection.

Designed for AWS Lambda or SageMaker batch jobs that process S3 data in batches.

Parameters:
  • s3_keys – List of S3 object keys to process

  • model – Fitted model (BaseScorer or BaseDetector)

  • bucket – S3 bucket name (required)

  • s3_client – Optional boto3 S3 client (will create if not provided)

Returns:

DataFrame with results for all processed files

Examples

>>> s3_keys = ["data/2024/01/01/file1.csv", "data/2024/01/01/file2.csv"]
>>> results = process_s3_batch(s3_keys, scorer, bucket="my-data-bucket")
anomsmith.workflows.rank_assets_by_risk(asset_health: DataFrame, top_n: int | None = None) DataFrame[source]

Rank assets by combined risk score.

Parameters:
  • asset_health – DataFrame from assess_asset_health()

  • top_n – Optional number of top assets to return (default None = all)

Returns:

DataFrame ranked by combined_risk (highest first)

anomsmith.workflows.report_detection(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) dict[str, Any][source]

Generate detection report with summary stats.

Parameters:
  • y – Time series that was analyzed

  • detector – BaseDetector or BaseScorer instance used

  • threshold_rule – ThresholdRule applied

Returns:

Dictionary with summary stats and top anomaly timestamps

anomsmith.workflows.score_anomalies(y: Series | ndarray | SeriesLike, scorer: BaseScorer) Series[source]

Score anomalies in a time series.

Parameters:
  • y – Time series to score

  • scorer – BaseScorer instance

Returns:

pandas Series of anomaly scores with same index as y

anomsmith.workflows.sweep_thresholds(y: Series | ndarray | SeriesLike, scorer: BaseScorer, threshold_values: list[float] | ndarray, labels: Series | ndarray | SeriesLike | None = None) DataFrame[source]

Evaluate multiple threshold values and return metrics.

Parameters:
  • y – Time series to score

  • scorer – BaseScorer instance

  • threshold_values – List of threshold values to evaluate

  • labels – Optional ground truth labels

Returns:

pandas DataFrame with columns threshold, precision, recall, f1 (metrics are NaN if labels not provided)
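
A threshold sweep of this kind can be sketched without pandas. The function below is an illustration under stated assumptions: a score at or above the threshold counts as a flag, and precision, recall, and f1 fall back to 0.0 when their denominators are zero. The name sweep and the list-of-dicts output are hypothetical stand-ins for the DataFrame.

```python
import math


def sweep(scores, thresholds, labels=None):
    """Return one row of metrics per threshold value."""
    rows = []
    for thr in thresholds:
        flags = [1 if s >= thr else 0 for s in scores]
        if labels is None:
            # Without ground truth the metrics are undefined.
            precision = recall = f1 = math.nan
        else:
            tp = sum(1 for f, y in zip(flags, labels) if f == 1 and y == 1)
            fp = sum(1 for f, y in zip(flags, labels) if f == 1 and y == 0)
            fn = sum(1 for f, y in zip(flags, labels) if f == 0 and y == 1)
            precision = tp / (tp + fp) if tp + fp else 0.0
            recall = tp / (tp + fn) if tp + fn else 0.0
            denom = precision + recall
            f1 = 2 * precision * recall / denom if denom else 0.0
        rows.append(
            {"threshold": thr, "precision": precision, "recall": recall, "f1": f1}
        )
    return rows


scores = [0.1, 0.4, 0.35, 0.8, 0.9]
labels = [0, 0, 1, 1, 1]
rows = sweep(scores, [0.3, 0.5], labels)
unlabeled = sweep(scores, [0.3])
```

Raising the threshold from 0.3 to 0.5 trades recall for precision in this toy data, which is the pattern a sweep is meant to expose.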

anomsmith.workflows.track_mahalanobis_distance(X: ndarray | DataFrame, detector: PCADetector, index: Index | None = None) Series[source]

Track Mahalanobis distance over time as a single metric.

Computes Mahalanobis distance from the “normal” center in PCA space for each time point. This provides a single metric that can be tracked as a time series to monitor equipment health drift.

Delegates scoring to PCADetector.score() so Mahalanobis math stays in the primitive layer (single implementation).

Parameters:
  • X – Feature matrix (n_samples, n_features) with sensor readings

  • detector – Fitted PCADetector (PCA components and mean/covariance already computed)

  • index – Optional index for the resulting Series

Returns:

pandas Series with Mahalanobis distance values, indexed by time

Examples

>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Track distance over time to detect drift
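
For intuition about the metric itself: PCA scores of the training data are uncorrelated, so in PCA space the Mahalanobis distance from the center reduces to a variance-scaled Euclidean norm. The sketch below shows that reduction only; whether PCADetector.score() computes it this way is an assumption, and the helper name is hypothetical.

```python
import math


def mahalanobis_in_pca_space(z, explained_variance):
    """Distance of one centered PCA score vector from the origin.

    z: scores along each retained component.
    explained_variance: variance of the training scores per component.
    """
    return math.sqrt(sum(zi * zi / var for zi, var in zip(z, explained_variance)))


explained_variance = [4.0, 1.0]
near = mahalanobis_in_pca_space([2.0, 0.0], explained_variance)  # one std dev out
far = mahalanobis_in_pca_space([2.0, 2.0], explained_variance)
```

A displacement of 2.0 along the high-variance component contributes less than the same displacement along the low-variance one, which is why the metric is more informative than raw Euclidean distance for drift monitoring.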

Network workflows

Network-shaped anomaly workflows.

Designed to interoperate with organizational communication graphs such as org_network_analysis (NetworkAnalyzer._to_edge_list): undirected edges aggregated by sender/receiver pair with integer weights (event counts), and a fixed member roster so isolated nodes still appear in outputs.

anomsmith.workflows.network.aggregate_undirected_edges(communications: DataFrame, *, sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', drop_self_loops: bool = True) DataFrame[source]

Aggregate communication rows into undirected weighted edges.

Mirrors the aggregation in org_network_analysis business logic: each unique unordered pair (min(a,b), max(a,b)) gets weight equal to the number of rows (communication events) between those endpoints.

Parameters:
  • communications – One row per event; must include sender/receiver columns.

  • sender_col – Column name for the sender endpoint (default sender_id).

  • receiver_col – Column name for the receiver endpoint (default receiver_id).

  • drop_self_loops – If True, rows where sender equals receiver are skipped.

Returns:

DataFrame with columns u, v, weight (integer counts), sorted by u, v.
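
The unordered-pair counting described above can be sketched with a Counter. This is a dependency-free illustration, not the library's code: aggregate_pairs and the (sender, receiver) tuple input are assumptions standing in for the DataFrame interface.

```python
from collections import Counter


def aggregate_pairs(events, drop_self_loops=True):
    """Collapse directed (sender, receiver) events into undirected weighted edges."""
    weights = Counter()
    for sender, receiver in events:
        if drop_self_loops and sender == receiver:
            continue
        # Order the endpoints so (a, b) and (b, a) share one key.
        u, v = (sender, receiver) if sender <= receiver else (receiver, sender)
        weights[(u, v)] += 1
    # Sorted by u, then v, like the documented output.
    return [(u, v, w) for (u, v), w in sorted(weights.items())]


events = [("bob", "alice"), ("alice", "bob"), ("carol", "alice"), ("bob", "bob")]
edges = aggregate_pairs(events)
```
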

anomsmith.workflows.network.detect_network_edge_anomalies(edge_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag structurally unusual dyads using isolation forest on edge features.

Expects a frame such as the output of edge_features_from_edges() (only numeric columns are used by default).

Raises:

ValueError – If fewer than two edges are present.

anomsmith.workflows.network.detect_network_node_anomalies(node_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag structurally unusual nodes using isolation forest on feature rows.

Fits IsolationForestDetector on the numeric feature matrix and thresholds anomaly scores. Typical use: pass the output of node_features_from_edges() (optionally join extra numeric columns before calling).

Parameters:
  • node_features – Rows are nodes (index = node id); values are features.

  • threshold_rule – Rule applied to isolation scores (higher = more anomalous).

  • feature_cols – Columns to use; default is all numeric columns in the frame.

  • contamination – Passed to IsolationForest.

  • n_estimators – Number of trees in the forest.

  • random_state – Optional RNG seed.

Returns:

DataFrame with original feature columns plus score and flag (1 = anomaly). Index matches node_features.

Raises:

ValueError – If fewer than two rows are present (isolation forest requires a batch to score relative to).

anomsmith.workflows.network.detect_network_temporal_node_anomalies(touch_counts_by_bin: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]

Flag nodes whose time-bin activity vectors look unlike the rest.

Rows are nodes (index from node_touch_counts_by_bin()). Columns should be numeric bin counts (any column names); by default all numeric columns are used as features.

Raises:

ValueError – If there are fewer than two nodes, there are no numeric columns, or any bin column contains non-finite values.

anomsmith.workflows.network.edge_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Per-edge (dyad) features derived from aggregated undirected weights.

Rows follow the u, v, weight table from aggregate_undirected_edges(). Combines each edge weight with endpoint strengths from node_features_from_edges() to highlight unusually heavy links relative to endpoint activity.

Columns:

  • weight: aggregated event count on the dyad.

  • share_of_endpoint_volume: 2 * weight / (deg(u) + deg(v)) using endpoint weighted_degree values (each edge’s weight is included in both degrees).

  • log1p_weight: log1p(weight) for scale-robust modeling.

Parameters:
  • edges – Non-empty edge list (typically aggregated counts).

  • nodes – Full node roster (same semantics as node_features_from_edges()).

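  • u_col – Name of the column in edges that holds the first endpoint.

  • v_col – Name of the column in edges that holds the second endpoint.

  • weight_col – Name of the column in edges that holds the edge weight.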

Returns:

DataFrame indexed by MultiIndex (u, v) with numeric feature columns.

Raises:

ValueError – If edges is empty.
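
The three documented columns can be reproduced by hand for a small edge list. The sketch below assumes strictly positive weights and a list of (u, v, weight) tuples; the function name and dict output are hypothetical stand-ins for the DataFrame interface.

```python
import math
from collections import defaultdict


def edge_features(edges):
    """Compute weight, share_of_endpoint_volume, and log1p_weight per dyad."""
    # Weighted degree: each edge weight counts toward both endpoints.
    degree = defaultdict(float)
    for u, v, w in edges:
        degree[u] += w
        degree[v] += w
    features = {}
    for u, v, w in edges:
        features[(u, v)] = {
            "weight": w,
            "share_of_endpoint_volume": 2 * w / (degree[u] + degree[v]),
            "log1p_weight": math.log1p(w),
        }
    return features


feats = edge_features([("a", "b", 3), ("a", "c", 1)])
only_link = edge_features([("x", "y", 5)])
```

The share equals 1.0 when an edge is the only link of both endpoints and shrinks as the endpoints spread their volume across other dyads.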

anomsmith.workflows.network.node_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Per-node structural features for anomaly scoring.

Uses the same edge table shape produced by aggregate_undirected_edges() (u, v, weight). Every id in nodes appears in the index; nodes with no incident edges get zero strength, zero distinct-neighbor count, and uniform PageRank mass.

Feature columns:

  • weighted_degree: sum of incident edge weights (communication volume).

  • neighbor_count: number of distinct neighbors.

  • pagerank: undirected PageRank (numpy power iteration; no NetworkX).

Parameters:
  • edges – Edge list with endpoints and non-negative weights.

  • nodes – Complete roster of node identifiers (e.g. all team member ids).

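  • u_col – Name of the column in edges that holds the first endpoint.

  • v_col – Name of the column in edges that holds the second endpoint.

  • weight_col – Name of the column in edges that holds the edge weight.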

Returns:

DataFrame indexed by node id with numeric feature columns.
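
A minimal sketch of the three features, assuming positive weights, no self-loops, and endpoints that all appear in the roster. The damping factor of 0.85, the fixed iteration count, and the choice to spread the rank of isolated nodes uniformly are assumptions; the library's power iteration may differ in these details.

```python
def node_features(edges, nodes, damping=0.85, iterations=100):
    """Weighted degree, neighbor count, and undirected PageRank per node."""
    roster = list(nodes)
    strength = {node: 0.0 for node in roster}
    neighbors = {node: set() for node in roster}
    for u, v, w in edges:
        strength[u] += w
        strength[v] += w
        neighbors[u].add(v)
        neighbors[v].add(u)

    n = len(roster)
    rank = {node: 1.0 / n for node in roster}
    for _ in range(iterations):
        # Isolated nodes have nowhere to send rank, so it is shared by everyone.
        dangling = sum(rank[node] for node in roster if strength[node] == 0.0)
        base = (1.0 - damping) / n + damping * dangling / n
        nxt = {node: base for node in roster}
        for u, v, w in edges:
            # Each endpoint passes rank along the edge in proportion to weight.
            nxt[v] += damping * rank[u] * w / strength[u]
            nxt[u] += damping * rank[v] * w / strength[v]
        rank = nxt

    return {
        node: {
            "weighted_degree": strength[node],
            "neighbor_count": len(neighbors[node]),
            "pagerank": rank[node],
        }
        for node in roster
    }


feats = node_features([("a", "b", 2), ("b", "c", 1)], nodes=["a", "b", "c", "d"])
```

Node "d" is isolated: it keeps zero degree and zero neighbors but still receives the baseline share of rank.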

anomsmith.workflows.network.node_graph_metrics_networkx(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]

Graph centrality metrics via NetworkX (optional dependency).

Install with the network extra: pip install 'anomsmith[network]'.

Builds an undirected graph: every id in nodes is a vertex; edges from edges carry combined weights (parallel edges in the table should be pre-aggregated). Centrality follows the convention of common org-network dashboards: betweenness and closeness use topology only (unweighted hops), while eigenvector centrality uses edge weights.

Columns:

  • betweenness_centrality — NetworkX betweenness_centrality (unweighted hops).

  • closeness_centrality — NetworkX closeness_centrality.

  • eigenvector_centrality — weighted when convergence succeeds; else zeros.

Parameters:
  • edges – Aggregated u, v, weight table (may be empty).

  • nodes – Full roster; isolated members still appear with zeros.

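  • u_col – Name of the column in edges that holds the first endpoint.

  • v_col – Name of the column in edges that holds the second endpoint.

  • weight_col – Name of the column in edges that holds the edge weight.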

Returns:

DataFrame aligned to nodes with the three metric columns.

Raises:

ImportError – If NetworkX is not installed.

anomsmith.workflows.network.node_touch_counts_by_bin(communications: DataFrame, nodes: Index | list[Any] | ndarray, *, timestamp_col: str = 'timestamp', sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', freq: str = '1D', drop_self_loops: bool = True) DataFrame[source]

Count how often each node sends or receives in each time bin.

Each communication row increments both the sender and the receiver for the floored period bucket (pandas offset string, e.g. "1D", "6H").

Parameters:
  • communications – Must include timestamp and endpoint columns.

  • nodes – Full roster; bins include only these ids (other endpoints dropped).

  • timestamp_col – Parseable timestamps (pd.to_datetime).

  • sender_col – Endpoint identifiers.

  • receiver_col – Endpoint identifiers.

  • freq – Bin size passed to Series.dt.floor.

  • drop_self_loops – If True, rows where sender equals receiver are skipped.

Returns:

DataFrame with index = node id, columns = bin start (datetime64), values = integer touch counts. Missing bins are zero; nodes with no events still appear as rows of zeros when listed in nodes.

Platform (predictive maintenance)

Predictive maintenance platform layer (features, RUL helpers, alerts, ingestion).

This package consolidates the former standalone Anomaly Detection Toolkit workflow code into the single anomsmith distribution. All detector primitives (PCA, isolation forest, scorers, etc.) live under anomsmith.primitives; platform holds orchestration, dataset utilities, evaluation helpers, and optional matplotlib visualizations that sit on top of those primitives.

class anomsmith.platform.Alert(timestamp: datetime, level: AlertLevel, message: str, feature: str, value: float, threshold: float, asset_id: str | None = None, metadata: dict[str, ~typing.Any]=<factory>)[source]

Bases: object

Represents a predictive maintenance alert.

asset_id: str | None = None
feature: str
level: AlertLevel
message: str
metadata: dict[str, Any]
threshold: float
timestamp: datetime
value: float
class anomsmith.platform.AlertLevel(*values)[source]

Bases: Enum

Alert severity levels.

CRITICAL = 'critical'
FAILURE = 'failure'
INFO = 'info'
WARNING = 'warning'
class anomsmith.platform.AlertSystem(thresholds: dict[str, dict[str, float]] | None = None, escalation_rules: dict[str, dict[str, Any]] | None = None)[source]

Bases: object

Alert system for predictive maintenance with escalation rules.

alert_counts: dict[str, int]
alert_history: deque
check_thresholds(features: ndarray | DataFrame | Series, feature_names: list[str] | None = None, timestamp: datetime | None = None, asset_id: str | None = None) list[Alert][source]

Check features against thresholds and generate alerts.

Parameters

features : array-like

Feature values to check. Can be single value, array, or DataFrame.

feature_names : list of str, optional

Names of features. Required if features is array.

timestamp : datetime, optional

Timestamp for alerts. Defaults to current time.

asset_id : str, optional

Asset identifier.

Returns

alerts : list of Alert

List of generated alerts.
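
One plausible reading of the thresholds argument (dict[str, dict[str, float]]) is a mapping from feature name to per-level limits. The sketch below illustrates that reading only. The layout, the severity ordering, the "value at or above limit" comparison, and the helper name check_value are all assumptions and are not confirmed by this reference.

```python
# Assumed severity ordering, least to most severe (not stated in the reference).
SEVERITY_ORDER = ["info", "warning", "critical", "failure"]


def check_value(feature, value, thresholds):
    """Return (level, limit) for the most severe limit that value reaches, else None."""
    limits = thresholds.get(feature, {})
    triggered = None
    for level in SEVERITY_ORDER:
        limit = limits.get(level)
        if limit is not None and value >= limit:
            triggered = (level, limit)
    return triggered


# Hypothetical configuration: feature name -> level name -> limit.
thresholds = {"temperature": {"warning": 80.0, "critical": 95.0}}

hot = check_value("temperature", 97.2, thresholds)
warm = check_value("temperature", 85.0, thresholds)
normal = check_value("temperature", 60.0, thresholds)
unknown = check_value("vibration", 1.0, thresholds)
```
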

get_recent_alerts(n: int = 10, level: AlertLevel | None = None, asset_id: str | None = None) list[Alert][source]

Get recent alerts.

Parameters

n : int, default=10

Number of recent alerts to return.

level : AlertLevel, optional

Filter by alert level.

asset_id : str, optional

Filter by asset ID.

Returns

alerts : list of Alert

Recent alerts matching criteria.

class anomsmith.platform.DashboardVisualizer(figsize: tuple[int, int] = (15, 10))[source]

Bases: object

Dashboard visualization utilities for predictive maintenance monitoring.

create_dashboard(results_history: dict[str, list[dict[str, Any]]], sensor_data: dict[str, DataFrame] | None = None, save_path: str | None = None)[source]

Create comprehensive dashboard visualization.

Parameters

results_history : dict

Dictionary mapping asset_id to list of processing results.

sensor_data : dict, optional

Dictionary mapping asset_id to DataFrame with sensor readings.

save_path : str, optional

Path to save the dashboard figure.

Returns

fig : matplotlib.figure.Figure

Dashboard figure.

create_summary_dashboard(results_history: dict[str, list[dict[str, Any]]], save_path: str | None = None)[source]

Create summary dashboard with key metrics.

Parameters

results_history : dict

Dictionary mapping asset_id to list of processing results.

save_path : str, optional

Path to save the dashboard figure.

Returns

fig : matplotlib.figure.Figure

Summary dashboard figure.

class anomsmith.platform.FailureClassifier(n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]

Bases: object

Classify normal vs. failure states.

fit(X: ndarray | DataFrame, y: ndarray | Series)[source]

Fit the failure classifier.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

y : array-like of shape (n_samples,)

Binary labels: 0 for normal, 1 for failure.

model_: RandomForestClassifier | None
predict(X: ndarray | DataFrame) ndarray[source]

Predict failure states.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

predictions : ndarray of shape (n_samples,)

Binary predictions: 0 for normal, 1 for failure.

predict_proba(X: ndarray | DataFrame) ndarray[source]

Predict failure probabilities.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

probabilities : ndarray of shape (n_samples, 2)

Probability of [normal, failure] for each sample.

scaler_: StandardScaler | None
class anomsmith.platform.FeatureExtractor(rolling_windows: list[int] | None = None, frequency_features: bool = True, change_detection: bool = True)[source]

Bases: object

Extract predictive maintenance features from time series data.

extract(data: ndarray | Series | DataFrame, columns: list[str] | None = None) DataFrame[source]

Extract features from time series data.

Parameters

data : array-like

Time series data. Can be 1D array, Series, or DataFrame.

columns : list of str, optional

Column names if data is a DataFrame. If None, uses 'value' for 1D data.

Returns

features : DataFrame

Extracted features with named columns.

feature_names_: list[str]
class anomsmith.platform.PredictiveMaintenanceSystem(feature_extractor: FeatureExtractor | None = None, rul_estimator: RULEstimator | None = None, failure_classifier: FailureClassifier | None = None, alert_system: AlertSystem | None = None, anomaly_detector: BaseDetector | None = None)[source]

Bases: object

Complete predictive maintenance system integrating all components.

process(data: ndarray | Series | DataFrame, timestamp: datetime | None = None, asset_id: str | None = None, return_features: bool = False) dict[str, Any][source]

Process new data and generate predictions/alerts.

Parameters

data : array-like

Time series data to process.

timestamp : datetime, optional

Timestamp for the data.

asset_id : str, optional

Asset identifier.

return_features : bool, default=False

Whether to return extracted features.

Returns

results : dict

Dictionary containing:

  • 'features': extracted features (if return_features=True)

  • 'rul': predicted RUL

  • 'failure_probability': probability of failure

  • 'failure_prediction': binary failure prediction

  • 'anomaly_score': anomaly score from anomsmith.primitives.base.BaseDetector.score()

  • 'anomaly_prediction': 0 (normal) or 1 (anomaly) from LabelView labels

  • 'alerts': list of alerts

class anomsmith.platform.RULEstimator(method: str = 'regression', n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]

Bases: object

Estimate Remaining Useful Life (RUL) for assets.

fit(X: ndarray | DataFrame, y: ndarray | Series, degradation_threshold: float | None = None)[source]

Fit the RUL estimator.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix (e.g., from FeatureExtractor).

y : array-like of shape (n_samples,)

RUL values (time until failure) or degradation values.

degradation_threshold : float, optional

Threshold for degradation-based method. If provided, converts degradation values to RUL.

model_: RandomForestRegressor | None
predict(X: ndarray | DataFrame) ndarray[source]

Predict RUL for new data.

Parameters

X : array-like of shape (n_samples, n_features)

Feature matrix.

Returns

rul : ndarray of shape (n_samples,)

Predicted RUL values.

scaler_: StandardScaler | None
class anomsmith.platform.RealTimeIngestion(pm_system: PredictiveMaintenanceSystem, window_size: int = 100, update_frequency: int | None = None)[source]

Bases: object

Real-time data ingestion system for predictive maintenance.

data_buffers: dict[str, deque]
get_all_assets() list[str][source]

Get list of all asset IDs being monitored.

get_latest_results(asset_id: str, n: int = 1) list[dict[str, Any]][source]

Get latest processing results for an asset.

Parameters

asset_id : str

Asset identifier.

n : int, default=1

Number of latest results to return.

Returns

results : list of dict

Latest results.

ingest(data: float | ndarray | Series, asset_id: str, timestamp: datetime | None = None, sensor_name: str | None = None) dict[str, Any][source]

Ingest new data point and process if window is full.

Parameters

data : float, array-like, or Series

New sensor reading(s).

asset_id : str

Asset identifier.

timestamp : datetime, optional

Timestamp for the data. Defaults to current time.

sensor_name : str, optional

Name of sensor/feature. Required if data is scalar.

Returns

results : dict

Processing results if window is processed, else None.
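
The "process if window is full" behavior can be pictured as a bounded per-asset buffer. The class below is a simplified sketch, not RealTimeIngestion itself: WindowBuffer is a hypothetical name, the window mean stands in for the real call into the maintenance system, and update_frequency is ignored.

```python
from collections import deque


class WindowBuffer:
    """Keep the latest window_size readings per asset; emit a result once full."""

    def __init__(self, window_size=100):
        self.window_size = window_size
        self.buffers = {}

    def ingest(self, value, asset_id):
        # A deque with maxlen drops the oldest reading automatically.
        buf = self.buffers.setdefault(asset_id, deque(maxlen=self.window_size))
        buf.append(value)
        if len(buf) < self.window_size:
            return None  # not enough history yet
        # Placeholder for real processing: summarize the current window.
        return {"asset_id": asset_id, "window_mean": sum(buf) / len(buf)}


wb = WindowBuffer(window_size=3)
outputs = [wb.ingest(v, "pump-1") for v in [1.0, 2.0, 3.0, 4.0]]
```

The first two calls return None; from the third call onward each new reading slides the window forward by one.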

process_window(asset_id: str) dict[str, Any][source]

Process current window for an asset.

Parameters

asset_id : str

Asset identifier.

Returns

results : dict

Processing results.

results_history: dict[str, list[dict[str, Any]]]
timestamp_buffers: dict[str, deque]
anomsmith.platform.add_degradation_rates(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', periods: list[int] | None = None) DataFrame[source]

Add degradation rate features (rate of change) for feature columns.

Parameters

df : DataFrame

DataFrame with asset and feature columns.

feature_cols : list of str

Feature column names to compute degradation rates for.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

periods : list of int, optional

Periods for rate of change calculation. Default: [1, 3, 5].

Returns

df : DataFrame

DataFrame with added degradation rate columns.

anomsmith.platform.add_rolling_statistics(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', window: int = 5, stats: list[str] | None = None) DataFrame[source]

Add rolling window statistics for feature columns, grouped by asset.

Parameters

df : DataFrame

DataFrame with asset and feature columns.

feature_cols : list of str

Feature column names to compute rolling statistics for.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step (used for sorting).

window : int, default=5

Rolling window size.

stats : list of str, optional

Statistics to compute. Default: ['mean', 'std', 'min', 'max'].

Returns

df : DataFrame

DataFrame with added rolling statistic columns.

anomsmith.platform.calculate_confusion_matrix_metrics(predictions: ndarray, y_true: ndarray) dict[str, int][source]

Confusion matrix counts with 1 = predicted / true anomaly.
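
As a quick illustration of the counting convention (1 = anomaly in both arrays), the sketch below tallies the four cells by hand. The key names tp, fp, tn, and fn are assumptions; the reference does not list the keys of the returned dictionary.

```python
def confusion_counts(predictions, y_true):
    """Tally true/false positives and negatives with 1 meaning anomaly."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for pred, truth in zip(predictions, y_true):
        if pred == 1 and truth == 1:
            counts["tp"] += 1
        elif pred == 1 and truth == 0:
            counts["fp"] += 1
        elif pred == 0 and truth == 0:
            counts["tn"] += 1
        else:
            counts["fn"] += 1
    return counts


counts = confusion_counts([1, 0, 1, 1, 0], [1, 0, 0, 1, 1])
```
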

anomsmith.platform.calculate_lead_time(predictions: ndarray, true_labels: ndarray, timestamps: ndarray | None = None) dict[str, float | int][source]

Lead time between anomaly detections and failure events.

Parameters:
  • predictions – Detector labels (1 = anomaly, 0 = normal).

  • true_labels – Ground truth (1 = anomaly, 0 = normal).

  • timestamps – Optional timestamps aligned to predictions.

Returns:

Dictionary with mean/median/min/max lead time and early/late detection counts.

anomsmith.platform.calculate_rul(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', failure_cycle_col: str | None = None) Series[source]

Calculate Remaining Useful Life (RUL) for each record.

RUL is calculated as: max_cycle - current_cycle for each asset.

Parameters

df : DataFrame

DataFrame with asset_id and cycle columns.

asset_id_col : str, default='asset_id'

Column name for asset/equipment identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

failure_cycle_col : str, optional

Column name for failure cycle. If provided, uses this instead of max cycle.

Returns

rul : Series

Remaining Useful Life for each record.
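
The max_cycle - current_cycle rule can be checked on a toy run-to-failure log. This sketch covers only the default case (no failure_cycle_col) and uses a list of (asset_id, cycle) tuples in place of the DataFrame; the function name is hypothetical.

```python
def remaining_useful_life(records):
    """Return RUL per record as (last observed cycle of the asset) - (cycle)."""
    last_cycle = {}
    for asset_id, cycle in records:
        last_cycle[asset_id] = max(cycle, last_cycle.get(asset_id, cycle))
    return [last_cycle[asset_id] - cycle for asset_id, cycle in records]


records = [
    ("pump-1", 1), ("pump-1", 2), ("pump-1", 3),
    ("pump-2", 1), ("pump-2", 2),
]
rul = remaining_useful_life(records)
```

Each asset counts down to zero at its own final cycle, independent of how long other assets ran.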

anomsmith.platform.compare_detectors(detectors: dict[str, BaseDetector], X: ndarray | DataFrame, y_true: ndarray, timestamps: ndarray | None = None) DataFrame[source]

Compare multiple fitted detectors side-by-side.

anomsmith.platform.create_rul_labels(df: DataFrame, rul_col: str = 'RUL', warning_threshold: int = 30, critical_threshold: int = 15) DataFrame[source]

Create health status labels based on RUL values.

Parameters

df : DataFrame

DataFrame with RUL column.

rul_col : str, default='RUL'

Column name for RUL values.

warning_threshold : int, default=30

RUL threshold for warning state.

critical_threshold : int, default=15

RUL threshold for critical state.

Returns

df : DataFrame

DataFrame with added columns:

  • health_status: 'healthy', 'warning', 'critical', 'failed'

  • binary_label: 0 (healthy) or 1 (failure/warning/critical)

  • multi_class_label: 0 (healthy), 1 (warning), 2 (critical), 3 (failed)
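
The labeling scheme could be sketched for a single RUL value as follows. The boundary handling is an assumption: here "failed" means RUL at or below zero, and the warning and critical thresholds are inclusive. The library may place the boundaries differently.

```python
def label_rul(rul, warning_threshold=30, critical_threshold=15):
    """Derive the three label columns for one RUL value (boundaries assumed)."""
    if rul <= 0:
        status, multi = "failed", 3
    elif rul <= critical_threshold:
        status, multi = "critical", 2
    elif rul <= warning_threshold:
        status, multi = "warning", 1
    else:
        status, multi = "healthy", 0
    return {
        "health_status": status,
        "binary_label": int(multi > 0),  # anything other than healthy is 1
        "multi_class_label": multi,
    }


labels = [label_rul(r)["health_status"] for r in (40, 30, 15, 0)]
```
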

anomsmith.platform.evaluate_detector(detector: BaseDetector, X: ndarray | DataFrame, y_true: ndarray, scores: ndarray | None = None, timestamps: ndarray | None = None) dict[str, float | int][source]

Evaluate a fitted anomsmith detector on tabular test data.

anomsmith.platform.plot_comparison_metrics(comparison_df: DataFrame, metrics: list[str] | None = None, save_path: str | None = None)[source]

Create comparison chart for multiple detectors.

Parameters

comparison_df : DataFrame

DataFrame from compare_detectors().

metrics : list of str, optional

Metrics to plot. Default: ['precision', 'recall', 'f1'].

save_path : str, optional

Path to save the figure.

anomsmith.platform.plot_pca_boundary(detector: PCADetector, X: ndarray | DataFrame, y_true: ndarray | None = None, n_components_plot: int = 2, save_path: str | None = None)[source]

Visualize PCA boundary in 2D projection (anomsmith PCADetector).

anomsmith.platform.plot_reconstruction_error(detector, X: ndarray | DataFrame, y_true: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]

Plot reconstruction error over time for an LSTM or PCA detector.

Parameters

detector : BaseDetector

Fitted detector (PCA or LSTM).

X : array-like

Data to plot.

y_true : ndarray, optional

True labels for marking actual anomalies.

timestamps : ndarray, optional

Timestamps for x-axis.

save_path : str, optional

Path to save the figure.

anomsmith.platform.plot_sensor_drift(sensor_data: ndarray | Series, predictions: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]

Visualize sensor drift with anomaly flags.

Parameters

sensor_data : array-like

Sensor readings over time.

predictions : ndarray, optional

Anomaly predictions (1 for anomaly, 0 for normal).

timestamps : ndarray, optional

Timestamps for x-axis.

save_path : str, optional

Path to save the figure.

anomsmith.platform.prepare_pm_features(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', feature_cols: list[str] | None = None, calculate_rul_flag: bool = True, add_labels: bool = True, add_rolling_stats: bool = True, include_degradation_rates: bool = False, rolling_window: int = 5, warning_threshold: int = 30, critical_threshold: int = 15, failure_cycle_col: str | None = None) DataFrame[source]

Prepare predictive maintenance features from raw sensor data.

This is a convenience function that combines:

  • RUL calculation

  • Health status labeling

  • Rolling statistics

  • Degradation rates

Parameters

df : DataFrame

Input DataFrame with asset_id, cycle, and sensor/feature columns.

asset_id_col : str, default='asset_id'

Column name for asset identifier.

cycle_col : str, default='cycle'

Column name for cycle/time step.

feature_cols : list of str, optional

Feature column names. If None, auto-detects (excludes asset_id, cycle, RUL, etc.).

calculate_rul_flag : bool, default=True

Whether to calculate RUL.

add_labels : bool, default=True

Whether to add health status labels.

add_rolling_stats : bool, default=True

Whether to add rolling statistics.

include_degradation_rates : bool, default=False

Whether to add degradation rate features.

rolling_window : int, default=5

Window size for rolling statistics.

warning_threshold : int, default=30

RUL threshold for warning state.

critical_threshold : int, default=15

RUL threshold for critical state.

failure_cycle_col : str, optional

Column name for failure cycle (if available).

Returns

df : DataFrame

DataFrame with all engineered features.