API Reference
Anomsmith: Anomaly detection workflows that turn time series signals into actionable decisions.
- class anomsmith.ARIMADriftDetector(order: tuple[int, int, int] = (1, 1, 1), threshold_std: float = 2.0, random_state: int | None = None)
Bases: BaseDetector
ARIMA-based drift detector for time series.
Uses ARIMA forecasting to detect drift. If actual values diverge from the forecasts by more than threshold_std standard deviations, the series is flagged as drifting.
- Parameters:
order – ARIMA order (p, d, q). Default (1, 1, 1)
threshold_std – Number of standard deviations for drift threshold (default 2.0)
random_state – Random state for reproducibility (not used, kept for compatibility)
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) → ARIMADriftDetector
Fit the ARIMA model on training data.
- Parameters:
y – Training time series (1D)
X – Optional features (not used for ARIMA)
- Returns:
Self for method chaining
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict drift labels.
- Parameters:
y – Time series to detect drift in
- Returns:
LabelView with binary labels (1 = drift, 0 = normal)
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Score drift using ARIMA residuals.
- Parameters:
y – Time series to score
- Returns:
ScoreView with drift scores (residual magnitudes)
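The residual test behind ARIMADriftDetector can be sketched without fitting a model. In this minimal sketch the forecasts are passed in directly (in the detector they would come from the fitted ARIMA model), and a point is flagged when its absolute residual exceeds threshold_std times the standard deviation of the residuals. The helper name flag_drift and this exact rule are assumptions for illustration, not anomsmith's implementation.

```python
import statistics


def flag_drift(actual: list[float], forecast: list[float], threshold_std: float = 2.0) -> list[int]:
    """Hypothetical helper: 1 where |actual - forecast| > threshold_std * std(residuals)."""
    if len(actual) != len(forecast):
        raise ValueError("actual and forecast must have the same length")
    residuals = [a - f for a, f in zip(actual, forecast)]
    sigma = statistics.pstdev(residuals)  # spread of the forecast errors
    if sigma == 0:
        return [0] * len(residuals)  # forecasts match exactly: nothing to flag
    return [1 if abs(r) > threshold_std * sigma else 0 for r in residuals]


actual = [10.0, 10.2, 9.9, 10.1, 10.0, 10.1, 9.9, 10.0, 10.2, 15.0]
forecast = [10.0] * 10  # stand-in for ARIMA one-step forecasts
print(flag_drift(actual, forecast))  # only the final jump is flagged
```

One design caveat: estimating sigma on the series being scored lets a large jump inflate its own threshold; estimating it from training residuals avoids that.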
- class anomsmith.Alert(timestamp: datetime, level: AlertLevel, message: str, feature: str, value: float, threshold: float, asset_id: str | None = None, metadata: dict[str, Any] = <factory>)
Bases: object
Represents a predictive maintenance alert.
- level: AlertLevel
- class anomsmith.AlertLevel(*values)
Bases: Enum
Alert severity levels.
- CRITICAL = 'critical'
- FAILURE = 'failure'
- INFO = 'info'
- WARNING = 'warning'
- class anomsmith.AlertSystem(thresholds: dict[str, dict[str, float]] | None = None, escalation_rules: dict[str, dict[str, Any]] | None = None)
Bases: object
Alert system for predictive maintenance with escalation rules.
- check_thresholds(features: ndarray | DataFrame | Series, feature_names: list[str] | None = None, timestamp: datetime | None = None, asset_id: str | None = None) → list[Alert]
Check features against thresholds and generate alerts.
Parameters
- features : array-like
Feature values to check. Can be a single value, an array, or a DataFrame.
- feature_names : list of str, optional
Names of features. Required if features is an array.
- timestamp : datetime, optional
Timestamp for alerts. Defaults to the current time.
- asset_id : str, optional
Asset identifier.
Returns
- alerts : list of Alert
List of generated alerts.
- get_recent_alerts(n: int = 10, level: AlertLevel | None = None, asset_id: str | None = None) → list[Alert]
Get recent alerts.
Parameters
- n : int, default=10
Number of recent alerts to return.
- level : AlertLevel, optional
Filter by alert level.
- asset_id : str, optional
Filter by asset ID.
Returns
- alerts : list of Alert
Recent alerts matching criteria.
- class anomsmith.BaseDetector(**params: Any)
Bases: BaseEstimator
Base class for anomaly detectors.
Detectors produce both scores and binary labels.
- abstractmethod predict(y: ndarray | Series | SeriesLike) → LabelView
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary anomaly labels
- abstractmethod score(y: ndarray | Series | SeriesLike) → ScoreView
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.BaseScorer(**params: Any)
Bases: BaseEstimator
Base class for anomaly scorers.
Scorers assign anomaly scores to time series points. Higher scores indicate more anomalous points.
- abstractmethod score(y: ndarray | Series | SeriesLike) → ScoreView
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.DashboardVisualizer(figsize: tuple[int, int] = (15, 10))
Bases: object
Dashboard visualization utilities for predictive maintenance monitoring.
- create_dashboard(results_history: dict[str, list[dict[str, Any]]], sensor_data: dict[str, DataFrame] | None = None, save_path: str | None = None)
Create comprehensive dashboard visualization.
Parameters
- results_history : dict
Dictionary mapping asset_id to list of processing results.
- sensor_data : dict, optional
Dictionary mapping asset_id to DataFrame with sensor readings.
- save_path : str, optional
Path to save the dashboard figure.
Returns
- fig : matplotlib.figure.Figure
Dashboard figure.
- create_summary_dashboard(results_history: dict[str, list[dict[str, Any]]], save_path: str | None = None)
Create summary dashboard with key metrics.
Parameters
- results_history : dict
Dictionary mapping asset_id to list of processing results.
- save_path : str, optional
Path to save the dashboard figure.
Returns
- fig : matplotlib.figure.Figure
Summary dashboard figure.
- anomsmith.EnsembleDetector
alias of ScoreCombiningEnsembleDetector
- class anomsmith.FailureClassifier(n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)
Bases: object
Classify normal vs. failure states.
- fit(X: ndarray | DataFrame, y: ndarray | Series)
Fit the failure classifier.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
- y : array-like of shape (n_samples,)
Binary labels: 0 for normal, 1 for failure.
- model_: RandomForestClassifier | None
- predict(X: ndarray | DataFrame) → ndarray
Predict failure states.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- predictions : ndarray of shape (n_samples,)
Binary predictions: 0 for normal, 1 for failure.
- predict_proba(X: ndarray | DataFrame) → ndarray
Predict failure probabilities.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- probabilities : ndarray of shape (n_samples, 2)
Probability of [normal, failure] for each sample.
- scaler_: StandardScaler | None
- class anomsmith.FeatureExtractor(rolling_windows: list[int] | None = None, frequency_features: bool = True, change_detection: bool = True)
Bases: object
Extract predictive maintenance features from time series data.
- extract(data: ndarray | Series | DataFrame, columns: list[str] | None = None) → DataFrame
Extract features from time series data.
Parameters
- data : array-like
Time series data. Can be 1D array, Series, or DataFrame.
- columns : list of str, optional
Column names if data is a DataFrame. If None, uses ‘value’ for 1D data.
Returns
- features : DataFrame
Extracted features with named columns.
- class anomsmith.IQRScorer(factor: float = 1.5, random_state: int | None = None)
Bases: BaseScorer
Interquartile Range (IQR) based outlier scorer.
Computes outlier scores based on IQR bounds. Higher scores indicate more anomalous points.
- Parameters:
factor – IQR multiplier for outlier bounds (default: 1.5)
random_state – Random state for reproducibility (not used, kept for compatibility)
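The IQR bounds are Q1 - factor·IQR and Q3 + factor·IQR. The minimal sketch below shows one plausible scoring rule, assuming a point's score is its distance outside those bounds divided by the IQR (zero inside them); anomsmith's exact formula may differ, and iqr_scores is a hypothetical name.

```python
import statistics


def iqr_scores(values: list[float], factor: float = 1.5) -> list[float]:
    # Quartiles with linear interpolation (method="inclusive" matches NumPy's default).
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - factor * iqr, q3 + factor * iqr
    scale = iqr if iqr > 0 else 1.0  # avoid division by zero on constant data
    scores = []
    for v in values:
        # Distance outside [lower, upper] in units of IQR; 0.0 inside the bounds.
        excess = max(lower - v, v - upper, 0.0)
        scores.append(excess / scale)
    return scores


data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 100.0]
print(iqr_scores(data))  # only 100.0 gets a non-zero score
```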
- class anomsmith.IsolationForestDetector(contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None, n_jobs: int = -1)
Bases: BaseDetector
Isolation Forest anomaly detector.
Isolation Forest is an ensemble of random trees that isolates points by repeatedly selecting a random feature and a random split value between that feature's minimum and maximum; anomalies need fewer splits to isolate.
- Parameters:
contamination – Expected proportion of outliers in the data
n_estimators – Number of base estimators
random_state – Random state for reproducibility
n_jobs – Number of jobs to run in parallel
- fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) → IsolationForestDetector
Fit the Isolation Forest detector.
- Parameters:
y – Training data (target)
X – Optional features (if None, uses y)
- Returns:
Self for method chaining
- class anomsmith.LOFDetector(contamination: float = 0.05, n_neighbors: int = 20, random_state: int | None = None, n_jobs: int = -1)
Bases: BaseDetector
Local Outlier Factor (LOF) anomaly detector.
LOF measures the local deviation of density of a given sample with respect to its neighbors.
- Parameters:
contamination – Expected proportion of outliers in the data
n_neighbors – Number of neighbors to use
random_state – Random state for reproducibility
n_jobs – Number of jobs to run in parallel
- fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) → LOFDetector
Fit the LOF detector.
- Parameters:
y – Training data (target)
X – Optional features (if None, uses y)
- Returns:
Self for method chaining
- class anomsmith.LSTMAutoencoderDetector(window_size: int = 20, lstm_units: list[int] | None = None, contamination: float = 0.05, threshold_std: float = 3.0, epochs: int = 50, batch_size: int = 32, random_state: int | None = None)
Bases: BaseDetector
LSTM autoencoder detector: points with high reconstruction error are flagged as anomalies (univariate series only).
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None) → LSTMAutoencoderDetector
Fit the estimator.
- Parameters:
y – Target values
X – Optional features
- Returns:
Self for method chaining
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary anomaly labels
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.ModelPerformanceTracker(window_size: int = 1000, model_name: str | None = None)
Bases: object
Track model performance over time for monitoring and alerting.
Maintains a rolling window of performance metrics and can detect degradation or drift.
- window_size
Number of recent predictions to keep in window
- metrics_history
DataFrame with historical metrics
- detect_degradation(baseline_metrics: dict[str, float], threshold: float = 0.1) → bool
Detect if performance has degraded compared to baseline.
- Parameters:
baseline_metrics – Baseline metrics (e.g., from training)
threshold – Relative degradation threshold (default 0.1 = 10%)
- Returns:
True if degradation detected
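A relative degradation check can be sketched in a few lines. This sketch assumes every tracked metric is "higher is better" (such as precision or F1) and that degradation means a metric falling more than threshold × its baseline value below the baseline; has_degraded is a hypothetical name, and detect_degradation may treat error-style metrics differently.

```python
def has_degraded(current: dict[str, float], baseline: dict[str, float], threshold: float = 0.1) -> bool:
    """Hypothetical check: True if any shared metric fell more than `threshold` (relative) below baseline."""
    for name, base in baseline.items():
        if name not in current or base == 0:
            continue  # skip metrics that cannot be compared
        relative_drop = (base - current[name]) / abs(base)
        if relative_drop > threshold:
            return True
    return False


baseline = {"precision": 0.90, "recall": 0.80}
print(has_degraded({"precision": 0.88, "recall": 0.78}, baseline))  # False: drops of about 2-3%
print(has_degraded({"precision": 0.70, "recall": 0.79}, baseline))  # True: precision fell about 22%
```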
- get_current_metrics() → dict[str, float]
Get current performance metrics.
- Returns:
Dictionary with latest metrics
- update(scores: ndarray | Series, predicted_labels: ndarray | Series, true_labels: ndarray | Series | None = None, timestamp: datetime | None = None) → dict[str, float]
Update tracker with new predictions.
- Parameters:
scores – Anomaly scores
predicted_labels – Predicted binary labels
true_labels – Optional ground truth labels
timestamp – Optional timestamp for this update
- Returns:
Current performance metrics
- class anomsmith.PCADetector(n_components: float | int = 0.95, score_method: Literal['reconstruction', 'mahalanobis', 'both'] = 'reconstruction', contamination: float = 0.05, random_state: int | None = None)
Bases: BaseDetector
PCA-based anomaly detector.
Uses Principal Component Analysis to model healthy operation boundaries. Anomalies are detected using either:
  - Mahalanobis distance in the principal component space
  - Reconstruction error (difference between original and reconstructed data)
- Parameters:
n_components – Number of components to keep. If 0 < n_components < 1, select the number of components such that the amount of variance that needs to be explained is greater than the percentage specified.
score_method – Method for computing anomaly scores:
  - ‘reconstruction’: use reconstruction error
  - ‘mahalanobis’: use Mahalanobis distance in PC space
  - ‘both’: use both and return the average
contamination – Expected proportion of outliers in the data (used for threshold)
random_state – Random state for reproducibility
- fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) → PCADetector
Fit the PCA detector on healthy operation data.
- Parameters:
y – Training data (target)
X – Optional features (if None, uses y)
- Returns:
Self for method chaining
- class anomsmith.PanelLike(*args, **kwargs)
Bases: Protocol
Protocol for panel-like data: DataFrame with entity key plus time index.
Can be a DataFrame with MultiIndex (entity, time) or a regular DataFrame with an entity column and time index.
- columns: Index
- index: DatetimeIndex | MultiIndex | Index
- class anomsmith.PredictiveMaintenanceSystem(feature_extractor: FeatureExtractor | None = None, rul_estimator: RULEstimator | None = None, failure_classifier: FailureClassifier | None = None, alert_system: AlertSystem | None = None, anomaly_detector: BaseDetector | None = None)
Bases: object
Complete predictive maintenance system integrating all components.
- process(data: ndarray | Series | DataFrame, timestamp: datetime | None = None, asset_id: str | None = None, return_features: bool = False) → dict[str, Any]
Process new data and generate predictions and alerts.
Parameters
- data : array-like
Time series data to process.
- timestamp : datetime, optional
Timestamp for the data.
- asset_id : str, optional
Asset identifier.
- return_features : bool, default=False
Whether to return extracted features.
Returns
- results : dict
Dictionary containing:
  - ‘features’: extracted features (if return_features=True)
  - ‘rul’: predicted RUL
  - ‘failure_probability’: probability of failure
  - ‘failure_prediction’: binary failure prediction
  - ‘anomaly_score’: anomaly score from anomsmith.primitives.base.BaseDetector.score()
  - ‘anomaly_prediction’: 0 (normal) or 1 (anomaly) from LabelView labels
  - ‘alerts’: list of alerts
- class anomsmith.PyTorchAutoencoderDetector(window_size: int = 24, hidden_dims: list[int] | None = None, learning_rate: float = 0.001, epochs: int = 200, batch_size: int = 32, threshold_std: float = 3.0, random_state: int | None = None)
Bases: BaseDetector
Feedforward autoencoder on sliding windows (PyTorch, univariate only).
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | None = None) → PyTorchAutoencoderDetector
Fit the estimator.
- Parameters:
y – Target values
X – Optional features
- Returns:
Self for method chaining
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary anomaly labels
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.RULEstimator(method: str = 'regression', n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)
Bases: object
Estimate Remaining Useful Life (RUL) for assets.
- fit(X: ndarray | DataFrame, y: ndarray | Series, degradation_threshold: float | None = None)
Fit the RUL estimator.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix (e.g., from FeatureExtractor).
- y : array-like of shape (n_samples,)
RUL values (time until failure) or degradation values.
- degradation_threshold : float, optional
Threshold for degradation-based method. If provided, converts degradation values to RUL.
- model_: RandomForestRegressor | None
- predict(X: ndarray | DataFrame) → ndarray
Predict RUL for new data.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- rul : ndarray of shape (n_samples,)
Predicted RUL values.
- scaler_: StandardScaler | None
- class anomsmith.RealTimeIngestion(pm_system: PredictiveMaintenanceSystem, window_size: int = 100, update_frequency: int | None = None)
Bases: object
Real-time data ingestion system for predictive maintenance.
- get_latest_results(asset_id: str, n: int = 1) → list[dict[str, Any]]
Get latest processing results for an asset.
Parameters
- asset_id : str
Asset identifier.
- n : int, default=1
Number of latest results to return.
Returns
- results : list of dict
Latest results.
- ingest(data: float | ndarray | Series, asset_id: str, timestamp: datetime | None = None, sensor_name: str | None = None) → dict[str, Any]
Ingest a new data point and process the window once it is full.
Parameters
- data : float, array-like, or Series
New sensor reading(s).
- asset_id : str
Asset identifier.
- timestamp : datetime, optional
Timestamp for the data. Defaults to the current time.
- sensor_name : str, optional
Name of sensor/feature. Required if data is a scalar.
Returns
- results : dict or None
Processing results if the window is processed, else None.
- class anomsmith.RobustCovarianceDetector(contamination: float = 0.05, support_fraction: float = 0.8, random_state: int | None = None)
Bases: BaseDetector
Robust Covariance (Elliptic Envelope) anomaly detector.
Assumes that the data is Gaussian distributed and fits an elliptic envelope to the data.
- Parameters:
contamination – Expected proportion of outliers in the data
support_fraction – Proportion of points to be used as support
random_state – Random state for reproducibility
- fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) → RobustCovarianceDetector
Fit the Robust Covariance detector.
- Parameters:
y – Training data (target)
X – Optional features (if None, uses y)
- Returns:
Self for method chaining
- class anomsmith.RobustZScoreScorer(epsilon: float = 1e-08)
Bases: BaseScorer
Robust Z-Score anomaly scorer.
Uses median and MAD for robust scaling, then computes absolute z-scores. Higher scores indicate more anomalous points.
- fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) → RobustZScoreScorer
Fit the scorer (no-op for this scorer).
- Parameters:
y – Target values (not used, kept for interface compatibility)
X – Optional features (not used)
- Returns:
Self for method chaining
- score(y: ndarray | Series | SeriesLike) → ScoreView
Score anomalies using robust z-scores.
- Parameters:
y – Time series to score
- Returns:
ScoreView with absolute robust z-scores
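Robust z-scores replace the mean with the median and the standard deviation with the median absolute deviation (MAD), so a few extreme points barely move the baseline. This minimal sketch assumes the score is |x - median| / (MAD + epsilon), with no 1.4826 (or 0.6745) consistency constant; anomsmith may scale differently, and robust_z_scores is a hypothetical name.

```python
import statistics


def robust_z_scores(values: list[float], epsilon: float = 1e-8) -> list[float]:
    median = statistics.median(values)
    # Median absolute deviation: robust counterpart of the standard deviation.
    mad = statistics.median(abs(v - median) for v in values)
    # epsilon keeps the division finite when MAD is 0 (e.g. constant data).
    return [abs(v - median) / (mad + epsilon) for v in values]


scores = robust_z_scores([10.0, 11.0, 9.0, 10.0, 50.0])
print([round(s, 2) for s in scores])  # [0.0, 1.0, 1.0, 0.0, 40.0]
```

With a plain z-score, the single value 50.0 would also inflate the mean and standard deviation; here the median (10.0) and MAD (1.0) ignore it.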
- class anomsmith.ScoreCombiningEnsembleDetector(detectors: list[BaseDetector | BaseScorer], combination_method: str = 'mean', score_percentile: float = 95.0, random_state: int | None = None)
Bases: BaseDetector
Combine scores from multiple detectors/scorers (mean, max, min, or median).
Replaces the former toolkit EnsembleDetector score-combination path: labels are produced by thresholding the combined score at a fixed percentile (score_percentile, 95.0 by default). For hard voting over member predictions, use VotingEnsembleDetector instead.
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) → ScoreCombiningEnsembleDetector
Fit the estimator.
- Parameters:
y – Target values
X – Optional features
- Returns:
Self for method chaining
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary anomaly labels
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
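The score-combination path can be sketched in plain Python: per-point scores from each member are combined with mean, max, min, or median, and points whose combined score lies strictly above the score_percentile-th percentile are labeled 1. Treat this as an illustration under two assumptions: member scores are already on comparable scales (the real detector may normalize them first), and the percentile uses linear interpolation. The helper names are hypothetical.

```python
import statistics

# Combination functions keyed by the documented combination_method names.
COMBINERS = {
    "mean": statistics.fmean,
    "max": max,
    "min": min,
    "median": statistics.median,
}


def percentile(values: list[float], pct: float) -> float:
    # Linear interpolation between the two nearest ranks (NumPy's default method).
    ordered = sorted(values)
    pos = (len(ordered) - 1) * pct / 100.0
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def combine_and_label(member_scores: list[list[float]], combination_method: str = "mean",
                      score_percentile: float = 95.0) -> tuple[list[float], list[int]]:
    combine = COMBINERS[combination_method]
    # zip(*member_scores) groups the i-th score of every member.
    combined = [combine(point) for point in zip(*member_scores)]
    threshold = percentile(combined, score_percentile)
    labels = [1 if s > threshold else 0 for s in combined]
    return combined, labels


scores_a = [0.1, 0.2, 0.1, 0.9, 0.2]
scores_b = [0.3, 0.1, 0.2, 0.7, 0.1]
combined, labels = combine_and_label([scores_a, scores_b], "mean", score_percentile=80.0)
print(labels)  # [0, 0, 0, 1, 0]
```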
- class anomsmith.SeasonalBaselineScorer(seasonality: Literal['week', 'month', 'day', 'hour'] = 'week', random_state: int | None = None)
Bases: BaseScorer
Seasonal baseline anomaly scorer.
Calculates seasonal baselines (e.g., weekly, monthly) and scores points that deviate significantly from expected seasonal patterns.
- Parameters:
seasonality – Seasonality to use. Options: ‘week’, ‘month’, ‘day’, ‘hour’.
random_state – Random state for reproducibility (not used, kept for compatibility)
- fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | PanelLike | None = None) → SeasonalBaselineScorer
Fit the scorer by computing seasonal baselines.
- Parameters:
y – Time series with datetime index
X – Optional features (not used)
- Returns:
Self for method chaining
- score(y: ndarray | Series | SeriesLike) → ScoreView
Score anomalies using seasonal baseline.
- Parameters:
y – Time series to score
- Returns:
ScoreView with seasonal z-scores
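For an hourly pattern, the fit/score split can be sketched as follows: fit learns a mean and standard deviation for each hour of the day, and score reports each point's z-score against the baseline of its own hour. Grouping by hour of day, using the population standard deviation, and treating unseen hours as non-anomalous are assumptions of this sketch (the 'week', 'day', and 'month' options would group by a different calendar key); the function names are hypothetical.

```python
import statistics
from datetime import datetime, timedelta


def fit_hourly_baseline(timestamps: list[datetime], values: list[float]) -> dict[int, tuple[float, float]]:
    by_hour: dict[int, list[float]] = {}
    for ts, v in zip(timestamps, values):
        by_hour.setdefault(ts.hour, []).append(v)
    # Mean and population standard deviation per hour of day.
    return {h: (statistics.fmean(vs), statistics.pstdev(vs)) for h, vs in by_hour.items()}


def score_seasonal(timestamps: list[datetime], values: list[float],
                   baseline: dict[int, tuple[float, float]], epsilon: float = 1e-8) -> list[float]:
    scores = []
    for ts, v in zip(timestamps, values):
        mean, std = baseline.get(ts.hour, (v, 0.0))  # unseen hour: score 0
        scores.append(abs(v - mean) / (std + epsilon))
    return scores


start = datetime(2024, 1, 1)
train_ts = [start + timedelta(hours=i) for i in range(72)]  # three days, hourly
offsets = [0.0, 1.0, -1.0]  # day-to-day variation around the hourly profile
train_vals = [float(ts.hour) + offsets[i // 24] for i, ts in enumerate(train_ts)]
baseline = fit_hourly_baseline(train_ts, train_vals)

test_ts = [datetime(2024, 1, 4, 3), datetime(2024, 1, 4, 4)]
print(score_seasonal(test_ts, [3.0, 14.0], baseline))  # second point is far above its hour's baseline
```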
- class anomsmith.SeriesLike(*args, **kwargs)
Bases: Protocol
Protocol for series-like data: pandas Series or single-column DataFrame.
Must have a datetime or integer index.
- index: DatetimeIndex | Index
- class anomsmith.ThresholdRule(method: Literal['absolute', 'quantile'], value: float, quantile: float | None = None)
Bases: object
Rule for thresholding anomaly scores.
- method
‘absolute’ (use value directly) or ‘quantile’ (use quantile)
- Type:
Literal[‘absolute’, ‘quantile’]
- anomsmith.VotingEnsemble
alias of VotingEnsembleDetector
- class anomsmith.VotingEnsembleDetector(detectors: list[BaseDetector | BaseScorer], voting_threshold: int = 2, random_state: int | None = None)
Bases: BaseDetector
Voting ensemble that combines predictions from multiple detectors.
An anomaly is flagged if at least voting_threshold detectors agree.
- Parameters:
detectors – List of anomaly detectors or scorers to ensemble
voting_threshold – Minimum number of detectors that must flag a sample as anomalous
random_state – Random state for reproducibility (not used, kept for compatibility)
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) → VotingEnsembleDetector
Fit all detectors in the ensemble.
- Parameters:
y – Training time series
X – Optional features (not used)
- Returns:
Self for method chaining
- get_vote_counts(y: np.ndarray | pd.Series | SeriesLike) → np.ndarray
Get vote counts for each sample.
- Parameters:
y – Time series to analyze
- Returns:
Array of vote counts (number of detectors that flagged each sample as anomalous)
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict anomalies using voting.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary labels
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Compute ensemble scores as mean of individual detector scores.
- Parameters:
y – Time series to score
- Returns:
ScoreView with average anomaly scores
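get_vote_counts and predict can be sketched from per-member binary labels: the vote count for a sample is the number of members that flagged it, and the sample is anomalous when that count reaches voting_threshold. The sketch takes precomputed label lists instead of fitted detectors (how a BaseScorer member is turned into labels is not covered), and the helper names are hypothetical.

```python
def vote_counts(member_labels: list[list[int]]) -> list[int]:
    # Sum the 0/1 labels column-wise: one count per sample.
    return [sum(votes) for votes in zip(*member_labels)]


def vote_predict(member_labels: list[list[int]], voting_threshold: int = 2) -> list[int]:
    return [1 if c >= voting_threshold else 0 for c in vote_counts(member_labels)]


labels_a = [0, 1, 0, 1, 0]
labels_b = [0, 1, 1, 0, 0]
labels_c = [0, 1, 0, 1, 1]
members = [labels_a, labels_b, labels_c]
print(vote_counts(members))   # [0, 3, 1, 2, 1]
print(vote_predict(members))  # [0, 1, 0, 1, 0]
```

With three members, the default voting_threshold of 2 amounts to a majority vote; raising it to 3 requires unanimity.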
- class anomsmith.WaveletDenoiser(wavelet: str = 'db4', threshold_mode: str = 'soft', level: int = 5)
Bases: object
Wavelet soft/hard thresholding denoising (requires PyWavelets).
Useful as a preprocessing step before scoring or for visualization. This is not a BaseDetector; it only returns a denoised array.
- class anomsmith.WaveletDetector(wavelet: str = 'db4', threshold_factor: float = 3.0, level: int = 5, random_state: int | None = None)
Bases: BaseDetector
Wavelet-based anomaly detector for time series.
Detects anomalies by identifying large coefficients in wavelet detail levels, which indicate sudden changes or anomalies.
- Parameters:
wavelet – Wavelet type (e.g., ‘db4’, ‘haar’, ‘bior2.2’). Default ‘db4’.
threshold_factor – Threshold factor for anomaly detection (in terms of MAD). Default 3.0.
level – Decomposition level. Default 5.
random_state – Random state for reproducibility (not used, kept for compatibility)
- fit(y: np.ndarray | pd.Series | SeriesLike, X: np.ndarray | pd.DataFrame | PanelLike | None = None) → WaveletDetector
Fit the wavelet detector.
- Parameters:
y – Time series data (1D)
X – Optional features (not used)
- Returns:
Self for method chaining
- predict(y: np.ndarray | pd.Series | SeriesLike) → LabelView
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary labels
- score(y: np.ndarray | pd.Series | SeriesLike) → ScoreView
Score anomalies using wavelet decomposition.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
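A single-level Haar transform is enough to show the idea: detail coefficients are large where the signal jumps, and a coefficient is flagged when its deviation from the median coefficient exceeds threshold_factor times the MAD of the coefficients. This pure-Python sketch swaps in Haar at one level for the configurable wavelet and level (the class itself uses wavelet names such as 'db4' via PyWavelets), and the exact MAD rule and helper names are assumptions.

```python
import math
import statistics


def haar_detail(values: list[float]) -> list[float]:
    # One level of the orthonormal Haar transform: scaled differences of adjacent pairs.
    return [(values[i] - values[i + 1]) / math.sqrt(2) for i in range(0, len(values) - 1, 2)]


def wavelet_flags(values: list[float], threshold_factor: float = 3.0) -> list[int]:
    details = haar_detail(values)
    median = statistics.median(details)
    mad = statistics.median(abs(d - median) for d in details)
    threshold = threshold_factor * mad  # threshold expressed in units of MAD
    labels = [0] * len(values)
    for k, d in enumerate(details):
        if abs(d - median) > threshold:
            # Coefficient k covers samples 2k and 2k + 1; flag both.
            labels[2 * k] = labels[2 * k + 1] = 1
    return labels


signal = [10.0, 12.0, 10.0, 11.0, 10.0, 13.0, 10.0, 12.0, 10.0, 30.0, 10.0, 11.0, 10.0, 12.0]
print(wavelet_flags(signal))  # the pair containing the spike (30.0) is flagged
```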
- class anomsmith.ZScoreScorer(n_std: float = 3.0, random_state: int | None = None)
Bases: BaseScorer
Z-score based anomaly scorer.
Computes absolute Z-scores relative to mean and standard deviation. Higher scores indicate more anomalous points.
- Parameters:
n_std – Number of standard deviations (used for thresholding, not scoring)
random_state – Random state for reproducibility (not used, kept for compatibility)
- fit(y: ndarray | Series, X: ndarray | DataFrame | None = None) → ZScoreScorer
Fit the scorer by computing mean and standard deviation.
- Parameters:
y – Training data
X – Optional features (not used)
- Returns:
Self for method chaining
- score(y: ndarray | Series | SeriesLike) → ScoreView
Score anomalies using Z-scores.
- Parameters:
y – Time series to score
- Returns:
ScoreView with absolute Z-scores
- anomsmith.add_degradation_rates(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', periods: list[int] | None = None) → DataFrame
Add degradation rate features (rate of change) for feature columns.
Parameters
- df : DataFrame
DataFrame with asset and feature columns.
- feature_cols : list of str
Feature column names to compute degradation rates for.
- asset_id_col : str, default='asset_id'
Column name for asset identifier.
- cycle_col : str, default='cycle'
Column name for cycle/time step.
- periods : list of int, optional
Periods for rate of change calculation. Default: [1, 3, 5].
Returns
- df : DataFrame
DataFrame with added degradation rate columns.
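Per-asset rates of change can be sketched without pandas: rows are grouped by asset and ordered by cycle, and for each period p the rate at position t is (x_t - x_{t-p}) / p, left as None for the first p cycles of each asset. Dividing the difference by p, assuming consecutive cycles, and the <feature>_rate_<p> column naming are all assumptions; anomsmith's function operates on a DataFrame and may define the rate differently (for example, as a plain difference).

```python
from collections import defaultdict
from typing import Any


def add_rates(rows: list[dict[str, Any]], feature_cols: list[str], periods: tuple[int, ...] = (1, 3, 5),
              asset_id_col: str = "asset_id", cycle_col: str = "cycle") -> list[dict[str, Any]]:
    by_asset: dict[Any, list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        by_asset[row[asset_id_col]].append(dict(row))  # copy so the input is untouched
    out = []
    for asset_rows in by_asset.values():
        asset_rows.sort(key=lambda r: r[cycle_col])
        for i, row in enumerate(asset_rows):
            for col in feature_cols:
                for p in periods:
                    # Average change per cycle over the last p cycles; None until p cycles exist.
                    row[f"{col}_rate_{p}"] = (row[col] - asset_rows[i - p][col]) / p if i >= p else None
            out.append(row)
    return out


rows = [
    {"asset_id": "A", "cycle": 1, "temp": 50.0},
    {"asset_id": "A", "cycle": 2, "temp": 52.0},
    {"asset_id": "A", "cycle": 3, "temp": 56.0},
    {"asset_id": "B", "cycle": 1, "temp": 40.0},
    {"asset_id": "B", "cycle": 2, "temp": 40.5},
]
for r in add_rates(rows, ["temp"], periods=(1, 2)):
    print(r)
```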
- anomsmith.add_rolling_statistics(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', window: int = 5, stats: list[str] | None = None) → DataFrame
Add rolling window statistics for feature columns, grouped by asset.
Parameters
- df : DataFrame
DataFrame with asset and feature columns.
- feature_cols : list of str
Feature column names to compute rolling statistics for.
- asset_id_col : str, default='asset_id'
Column name for asset identifier.
- cycle_col : str, default='cycle'
Column name for cycle/time step (used for sorting).
- window : int, default=5
Rolling window size.
- stats : list of str, optional
Statistics to compute. Default: [‘mean’, ‘std’, ‘min’, ‘max’].
Returns
- df : DataFrame
DataFrame with added rolling statistic columns.
- anomsmith.aggregate_metrics_for_cloudwatch(metrics_list: list[dict[str, float]], namespace: str = 'AnomalyDetection', model_name: str | None = None, timestamp: datetime | None = None) → list[dict[str, Any]]
Format metrics for AWS CloudWatch PutMetricData API.
Aggregates multiple metric dictionaries into CloudWatch format.
- Parameters:
metrics_list – List of metric dictionaries from compute_performance_metrics
namespace – CloudWatch namespace (default “AnomalyDetection”)
model_name – Optional model name for dimension
timestamp – Optional timestamp (default: now)
- Returns:
List of CloudWatch metric data dictionaries
Examples
>>> import boto3
>>> cloudwatch = boto3.client("cloudwatch")
>>> metrics = [compute_performance_metrics(y1, pred1), ...]
>>> cw_metrics = aggregate_metrics_for_cloudwatch(metrics, model_name="IsolationForest")
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=cw_metrics
... )
- anomsmith.apply_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) → DataFrame
Apply decision policy to health states.
- Parameters:
health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)
previous_states – Previous health states for transition detection (optional)
intervene_cost – Cost of intervention action (default 100)
review_cost – Cost of review action (default 30)
wait_cost – Cost of wait action (default 0)
base_risks – Base failure risks by state (healthy, warning, distress) (default (0.01, 0.1, 0.3))
intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)
review_risk_reduction – Risk reduction factor for review (default 0.75)
- Returns:
pandas DataFrame with health_states, actions, costs, and risks
Examples
>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> result = apply_policy(states)
>>> result['action'].values
array([0, 0, 1, 2, 2])
- anomsmith.assess_asset_health(sensor_data: DataFrame, asset_ids: Series | None = None, feature_cols: list[str] | None = None, failure_labels: Series | ndarray | None = None, use_classification: bool = True, use_anomaly_detection: bool = True, contamination: float = 0.05, n_estimators: int = 100, isolation_n_estimators: int = 200, random_state: int | None = None, *, risk_proba_warning_threshold: float = 0.5, risk_proba_distress_threshold: float = 0.8, classification_weight: float = 0.6, anomaly_weight: float = 0.4) → DataFrame
Assess asset health using classification and anomaly detection.
Combines failure risk classification with anomaly detection to provide comprehensive asset health assessment. Results can be used to prioritize maintenance actions.
- Parameters:
sensor_data – DataFrame with sensor readings (columns are features, rows are assets)
asset_ids – Optional Series of asset IDs (defaults to sensor_data index)
feature_cols – Optional list of feature column names (defaults to all numeric columns)
failure_labels – Optional binary labels for training classifier (1 = failure, 0 = healthy)
use_classification – Whether to use failure risk classification (default True)
use_anomaly_detection – Whether to use anomaly detection (default True)
contamination – Expected proportion of anomalies (see DEFAULT_OUTLIER_CONTAMINATION)
n_estimators – Number of trees for Random Forest (see DEFAULT_RANDOM_FOREST_N_ESTIMATORS)
isolation_n_estimators – Number of trees for Isolation Forest when anomaly detection is on (see DEFAULT_ISOLATION_FOREST_N_ESTIMATORS)
random_state – Random state for reproducibility
risk_proba_warning_threshold – Min failure probability for the Warning health state when using classification (default from anomsmith.constants)
risk_proba_distress_threshold – Min failure probability for the Distress state (must exceed the warning threshold; enforced by FailureRiskClassifier)
classification_weight – Weight on normalized classification risk in combined_risk when both classification and anomaly detection run (must sum to 1 with anomaly_weight)
anomaly_weight – Weight on normalized anomaly score in combined_risk
- Returns:
DataFrame with columns:
  - asset_id: Asset identifier
  - failure_risk: Probability of failure (if classification used)
  - health_state: Predicted health state (0=Healthy, 1=Warning, 2=Distress)
  - is_anomaly: Binary anomaly flag (if anomaly detection used)
  - anomaly_score: Anomaly score (if anomaly detection used)
  - combined_risk: Combined risk score (higher = more urgent)
- Return type:
DataFrame
Examples
>>> import pandas as pd
>>> import numpy as np
>>> sensor_data = pd.DataFrame({
...     'temperature': [60, 65, 70, 80],
...     'vibration': [0.2, 0.25, 0.3, 0.4],
...     'pressure': [25, 24, 23, 20]
... })
>>> result = assess_asset_health(sensor_data)
>>> result.head()
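When both classification and anomaly detection run, combined_risk weights a normalized failure probability and a normalized anomaly score (0.6 and 0.4 by default). The sketch below assumes min-max normalization across the assessed assets and shows how the default risk_proba thresholds (0.5 for Warning, 0.8 for Distress) could map failure probabilities to health states; the normalization, the >= comparisons, and all helper names are assumptions rather than anomsmith's implementation.

```python
def min_max(values: list[float]) -> list[float]:
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]  # no spread: every asset gets zero normalized risk
    return [(v - lo) / (hi - lo) for v in values]


def combined_risk(failure_proba: list[float], anomaly_scores: list[float],
                  classification_weight: float = 0.6, anomaly_weight: float = 0.4) -> list[float]:
    if abs(classification_weight + anomaly_weight - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    p_norm, a_norm = min_max(failure_proba), min_max(anomaly_scores)
    return [classification_weight * p + anomaly_weight * a for p, a in zip(p_norm, a_norm)]


def health_state(proba: float, warning: float = 0.5, distress: float = 0.8) -> int:
    # 0 = Healthy, 1 = Warning, 2 = Distress
    if proba >= distress:
        return 2
    if proba >= warning:
        return 1
    return 0


proba = [0.1, 0.55, 0.9, 0.3]    # hypothetical classifier outputs for four assets
anomaly = [0.2, 0.4, 1.0, 0.8]   # hypothetical anomaly scores for the same assets
print([round(r, 4) for r in combined_risk(proba, anomaly)])
print([health_state(p) for p in proba])
```

In this toy data the fourth asset ends up riskier than the second despite a lower failure probability, because its anomaly score is much higher.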
- anomsmith.assess_health_with_pca(X: ndarray | DataFrame, detector: PCADetector, healthy_threshold: float, warning_threshold: float, index: Index | None = None) → DataFrame
Assess equipment health using PCA and Mahalanobis distance.
Complete workflow for PCA-based predictive maintenance:
1. Compute Mahalanobis distance from the healthy center
2. Classify health states based on distance thresholds
3. Return results as a DataFrame for easy tracking
- Parameters:
X – Feature matrix (n_samples, n_features) with sensor readings
detector – Fitted PCADetector (must use score_method=‘mahalanobis’)
healthy_threshold – Distance threshold for Healthy state
warning_threshold – Distance threshold for Warning state
index – Optional index for the results
- Returns:
DataFrame with columns ‘mahalanobis_distance’ and ‘health_state’
- Return type:
DataFrame
Examples
>>> import numpy as np
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> # Set thresholds based on training data
>>> healthy_threshold = np.percentile(detector.score(X_train).scores, 75)
>>> warning_threshold = np.percentile(detector.score(X_train).scores, 95)
>>> health_df = assess_health_with_pca(
...     X_monitor, detector, healthy_threshold, warning_threshold
... )
>>> # Track health over time
>>> critical_units = health_df[health_df['health_state'] == 2]
- anomsmith.backtest_detector(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule, labels: Series | ndarray | SeriesLike | None = None, n_splits: int = 5, min_train_size: int = 10) → DataFrame
Run backtest of detector across expanding windows.
- Parameters:
y – Time series to backtest on
detector – BaseDetector or BaseScorer instance
threshold_rule – ThresholdRule to apply
labels – Optional ground truth labels
n_splits – Number of splits
min_train_size – Minimum training set size
- Returns:
pandas DataFrame with columns fold, precision, recall, f1, avg_run_length
- Return type:
pandas.DataFrame
- anomsmith.batch_predict(data_iterator: Iterator[ndarray | Series | DataFrame], detector: BaseDetector) Iterator[tuple[LabelView, ScoreView]][source]
Predict anomalies in batches for efficient processing.
- Parameters:
data_iterator – Iterator yielding batches of time series data
detector – Fitted BaseDetector instance
- Yields:
Tuple of (LabelView, ScoreView) for each batch
Examples
>>> detector = IsolationForestDetector(contamination=0.05)
>>> detector.fit(X_train)
>>> # data_stream() and process_predictions() are user-supplied
>>> for labels, scores in batch_predict(data_stream(), detector):
...     process_predictions(labels, scores)
- anomsmith.batch_score(data_iterator: Iterator[ndarray | Series | DataFrame], scorer: BaseScorer) Iterator[ScoreView][source]
Score anomalies in batches for efficient processing of large datasets.
Designed for stream processing (e.g., AWS Kinesis, S3 batch jobs) where data arrives in chunks.
- Parameters:
data_iterator – Iterator yielding batches of time series data
scorer – Fitted BaseScorer instance
- Yields:
ScoreView for each batch
Examples
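>>> import numpy as np
>>> import pandas as pd
>>> from anomsmith.primitives.scorers.robust_zscore import RobustZScoreScorer
>>> def data_stream():
...     for i in range(0, 10000, 1000):
...         start = pd.Timestamp("2024-01-01") + pd.Timedelta(hours=i)
...         index = pd.date_range(start=start, periods=1000, freq="h")
...         yield pd.Series(np.random.randn(1000), index=index)
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> for batch_scores in batch_score(data_stream(), scorer):
...     process_scores(batch_scores)  # user-supplied handler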
- anomsmith.calculate_confusion_matrix_metrics(predictions: ndarray, y_true: ndarray) dict[str, int][source]
Compute confusion-matrix counts (true/false positives and negatives), treating 1 as the anomaly class in both predictions and y_true.
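A minimal sketch of these counts, assuming 0/1 arrays of equal length: each cell of the confusion matrix is a sum over a boolean mask. The helper name confusion_counts and its dictionary keys are illustrative (borrowed from the count names listed under compute_performance_metrics) and may not match the keys this function actually returns.

```python
import numpy as np


def confusion_counts(predictions: np.ndarray, y_true: np.ndarray) -> dict[str, int]:
    """Hypothetical helper: count TP/FP/FN/TN with 1 as the anomaly class."""
    pred = np.asarray(predictions).astype(bool)
    true = np.asarray(y_true).astype(bool)
    if pred.shape != true.shape:
        raise ValueError("predictions and y_true must have the same shape")
    return {
        "true_positives": int(np.sum(pred & true)),    # flagged and truly anomalous
        "false_positives": int(np.sum(pred & ~true)),  # flagged but normal
        "false_negatives": int(np.sum(~pred & true)),  # missed anomalies
        "true_negatives": int(np.sum(~pred & ~true)),  # correctly left unflagged
    }


counts = confusion_counts(np.array([1, 0, 1, 0, 1]), np.array([1, 0, 0, 0, 0]))
```

Precision, recall, and F1 follow from these four numbers, which is why the counts are useful on their own for dashboards.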
- anomsmith.calculate_lead_time(predictions: ndarray, true_labels: ndarray, timestamps: ndarray | None = None) dict[str, float | int][source]
Compute the lead time between anomaly detections and failure events.
- Parameters:
predictions – Detector labels (1 = anomaly, 0 = normal).
true_labels – Ground truth labels (1 = anomaly, 0 = normal).
timestamps – Optional timestamps aligned to predictions.
- Returns:
Dictionary with mean/median/min/max lead time and early/late detection counts.
- anomsmith.calculate_rul(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', failure_cycle_col: str | None = None) Series[source]
Calculate Remaining Useful Life (RUL) for each record.
RUL is calculated as: max_cycle - current_cycle for each asset.
Parameters
- df : DataFrame
DataFrame with asset_id and cycle columns.
- asset_id_col : str, default='asset_id'
Column name for asset/equipment identifier.
- cycle_col : str, default='cycle'
Column name for cycle/time step.
- failure_cycle_col : str, optional
Column name for failure cycle. If provided, this column is used instead of the maximum cycle.
Returns
- rul : Series
Remaining Useful Life for each record.
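The formula above maps directly onto a pandas groupby. The following sketch follows that reading but is not the library's implementation. rul_sketch is a hypothetical name, and it assumes each asset's last observed cycle is its failure cycle unless failure_cycle_col is given.

```python
import pandas as pd


def rul_sketch(df, asset_id_col="asset_id", cycle_col="cycle", failure_cycle_col=None):
    """Hypothetical helper: RUL = end_cycle - current_cycle for each row."""
    if failure_cycle_col is not None:
        # Use the known failure cycle when it is available.
        end_cycle = df[failure_cycle_col]
    else:
        # Otherwise broadcast each asset's maximum observed cycle back to its rows.
        end_cycle = df.groupby(asset_id_col)[cycle_col].transform("max")
    return (end_cycle - df[cycle_col]).rename("RUL")


df = pd.DataFrame({"asset_id": ["A", "A", "A", "B", "B"], "cycle": [1, 2, 3, 1, 2]})
rul = rul_sketch(df)
```

Using transform("max") rather than agg keeps the result aligned row-for-row with the input, so it can be assigned straight back as a column.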
- anomsmith.classify_health_from_distance(distances: Series | ndarray | SeriesLike, healthy_threshold: float, warning_threshold: float, index: Index | None = None) HealthStateView[source]
Classify health states from Mahalanobis distance thresholds.
Maps Mahalanobis distance values to health states:
- distance <= healthy_threshold: Healthy (0)
- healthy_threshold < distance <= warning_threshold: Warning (1)
- distance > warning_threshold: Critical/Distress (2)
The thresholds partition distance from the healthy center into zones of "normality". Choosing a generous healthy_threshold (for example, a high percentile of training distances) widens the Healthy zone and so keeps false positives low during normal operation.
- Parameters:
distances – Mahalanobis distance values (n_samples,)
healthy_threshold – Distance threshold for Healthy state
warning_threshold – Distance threshold for Warning state (must be > healthy_threshold)
index – Optional index for the health states
- Returns:
HealthStateView with classified health states
Examples
>>> import numpy as np
>>> # Set thresholds from training (healthy) data, e.g., percentiles
>>> train_distances = track_mahalanobis_distance(X_train, detector)
>>> healthy_threshold = np.percentile(train_distances, 75)
>>> warning_threshold = np.percentile(train_distances, 95)
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> health_states = classify_health_from_distance(
...     distances, healthy_threshold, warning_threshold
... )
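The three zones above can be expressed with numpy.digitize using right-closed bins, which reproduces the <= boundaries exactly. This is an illustrative sketch: classify_distance is a hypothetical helper that returns a plain array rather than a HealthStateView.

```python
import numpy as np


def classify_distance(distances, healthy_threshold, warning_threshold):
    """Hypothetical helper: 0 = Healthy, 1 = Warning, 2 = Distress."""
    if warning_threshold <= healthy_threshold:
        raise ValueError("warning_threshold must be > healthy_threshold")
    d = np.asarray(distances, dtype=float)
    # right=True makes each bin (lo, hi], so d == healthy_threshold stays Healthy
    # and d == warning_threshold stays Warning.
    return np.digitize(d, [healthy_threshold, warning_threshold], right=True)


states = classify_distance([0.5, 1.0, 1.5, 2.0, 3.0], healthy_threshold=1.0, warning_threshold=2.0)
```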
- anomsmith.compare_detectors(detectors: dict[str, BaseDetector], X: ndarray | DataFrame, y_true: ndarray, timestamps: ndarray | None = None) DataFrame[source]
Compare multiple fitted detectors side-by-side.
- anomsmith.compare_survival_models(models: dict[str, CoxSurvivalModel], X_test: ndarray | DataFrame, durations_test: ndarray | Series, events_test: ndarray | Series | None = None) DataFrame[source]
Compare multiple survival models.
Evaluates multiple survival models and returns comparison metrics.
- Parameters:
models – Dictionary mapping model names to fitted CoxSurvivalModel instances
X_test – Test feature matrix
durations_test – Test time-to-event values
events_test – Test event indicators, optional
- Returns:
DataFrame with comparison metrics (C-index, MAE, etc.) for each model
Examples
>>> models = {
...     "CoxPH": cox_model,
...     "LogisticHazard": lhaz_model,
...     "DeepSurv": deepsurv_model,
... }
>>> comparison = compare_survival_models(models, X_test, durations_test, events_test)
>>> print(comparison)
- anomsmith.compute_concordance_index(durations: ndarray | Series, risk_scores: ndarray | Series, events: ndarray | Series | None = None) float[source]
Compute concordance index (C-index) for survival model evaluation.
C-index measures how well a model's risk scores rank subjects by time-to-event. A score of 0.5 corresponds to random ordering; 1.0 corresponds to a perfect ranking.
Uses lifelines if available, otherwise computes manually.
- Parameters:
durations – Actual time-to-event values (n_samples,)
risk_scores – Predicted risk scores (n_samples,) - higher = higher risk
events – Event indicators (1 = event occurred, 0 = censored), optional
- Returns:
C-index between 0.0 and 1.0
Examples
>>> c_index = compute_concordance_index(true_durations, risk_scores, events)
>>> print(f"C-index: {c_index:.3f}")
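For the manual fallback, a common definition is Harrell's C-index. A pair is comparable when the subject with the shorter duration had an observed event. The index is the fraction of comparable pairs in which that subject also has the higher risk score, with ties in risk counted as one half. The sketch below assumes that definition, and the library's own fallback may treat tied durations differently. harrell_c_index is a hypothetical name, and the O(n^2) loop is meant for illustration only.

```python
import numpy as np


def harrell_c_index(durations, risk_scores, events=None):
    """Hypothetical helper: Harrell's concordance index (higher risk = earlier event)."""
    t = np.asarray(durations, dtype=float)
    r = np.asarray(risk_scores, dtype=float)
    e = np.ones_like(t, dtype=bool) if events is None else np.asarray(events).astype(bool)
    concordant = 0.0
    comparable = 0
    n = len(t)
    for i in range(n):
        if not e[i]:
            continue  # a censored subject cannot anchor a comparable pair
        for j in range(n):
            if t[i] < t[j]:  # tied durations are skipped in this sketch
                comparable += 1
                if r[i] > r[j]:
                    concordant += 1.0
                elif r[i] == r[j]:
                    concordant += 0.5
    if comparable == 0:
        raise ValueError("no comparable pairs")
    return concordant / comparable
```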
- anomsmith.compute_pca_health_thresholds(X_train: ndarray | DataFrame, detector: PCADetector, healthy_percentile: float = 75.0, warning_percentile: float = 95.0) tuple[float, float][source]
Compute health state thresholds from training data.
Determines distance thresholds for health state classification based on percentiles of Mahalanobis distances in the training (healthy) data.
- Parameters:
X_train – Training data (should be healthy operation data)
detector – Fitted PCADetector (must use score_method='mahalanobis')
healthy_percentile – Percentile for healthy threshold (default 75.0)
warning_percentile – Percentile for warning threshold (default 95.0)
- Returns:
Tuple of (healthy_threshold, warning_threshold)
Examples
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
...     X_train, detector, healthy_percentile=75, warning_percentile=95
... )
- anomsmith.compute_performance_metrics(true_labels: ndarray | Series, predicted_labels: ndarray | Series, scores: ndarray | Series | None = None) dict[str, float][source]
Compute comprehensive performance metrics for model monitoring.
Returns metrics suitable for CloudWatch, Prometheus, or similar monitoring systems.
- Parameters:
true_labels – Ground truth binary labels (0 = normal, 1 = anomaly)
predicted_labels – Predicted binary labels
scores – Optional anomaly scores (for threshold-independent metrics)
- Returns:
Dictionary with metrics:
precision: Precision score
recall: Recall score
f1: F1 score
true_positives: Number of true positives
false_positives: Number of false positives
false_negatives: Number of false negatives
true_negatives: Number of true negatives
anomaly_rate: Proportion of predicted anomalies
avg_run_length: Average length of anomaly runs (if scores provided)
- Return type:
dict[str, float]
Examples
>>> metrics = compute_performance_metrics(true_labels, pred_labels, scores)
>>> # Send to CloudWatch (cloudwatch is assumed to be a boto3 CloudWatch client)
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=[{"MetricName": "F1", "Value": metrics["f1"]}]
... )
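The avg_run_length metric summarizes how long consecutive stretches of flagged points last. The sketch below computes it from binary labels. It assumes a run is a maximal block of consecutive 1s and that a series with no runs yields 0.0, whereas the library may instead report NaN or omit the key. average_run_length is a hypothetical name.

```python
import numpy as np


def average_run_length(labels) -> float:
    """Hypothetical helper: mean length of consecutive runs of 1s."""
    flags = np.asarray(labels).astype(bool).astype(np.int8)
    # Pad with zeros so runs touching either end still produce a start and an end.
    padded = np.concatenate(([0], flags, [0]))
    diff = np.diff(padded)
    starts = np.flatnonzero(diff == 1)   # 0 -> 1 transitions
    ends = np.flatnonzero(diff == -1)    # 1 -> 0 transitions
    if len(starts) == 0:
        return 0.0
    return float(np.mean(ends - starts))
```

For example, [0, 1, 1, 0, 1, 0, 0, 1, 1, 1] has runs of length 2, 1, and 3, giving an average of 2.0.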
- anomsmith.create_rul_labels(df: DataFrame, rul_col: str = 'RUL', warning_threshold: int = 30, critical_threshold: int = 15) DataFrame[source]
Create health status labels based on RUL values.
Parameters
- df : DataFrame
DataFrame with RUL column.
- rul_col : str, default='RUL'
Column name for RUL values.
- warning_threshold : int, default=30
RUL threshold for warning state.
- critical_threshold : int, default=15
RUL threshold for critical state.
Returns
- df : DataFrame
DataFrame with added columns:
- health_status: 'healthy', 'warning', 'critical', 'failed'
- binary_label: 0 (healthy) or 1 (failure/warning/critical)
- multi_class_label: 0 (healthy), 1 (warning), 2 (critical), 3 (failed)
- anomsmith.detect_anomalies(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) DataFrame[source]
Detect anomalies in a time series.
- Parameters:
y – Time series to detect anomalies in
detector – BaseDetector or BaseScorer instance
threshold_rule – ThresholdRule to apply
- Returns:
pandas DataFrame with ‘score’ and ‘flag’ columns, indexed by y’s index
- anomsmith.detect_concept_drift(recent_scores: ndarray | Series, historical_scores: ndarray | Series, threshold: float = 2.0) dict[str, Any][source]
Detect concept drift in model scores.
Compares recent score distribution to historical distribution using statistical tests. Useful for triggering model retraining.
- Parameters:
recent_scores – Recent anomaly scores (last N samples)
historical_scores – Historical anomaly scores (training/baseline period)
threshold – Threshold for drift detection (default 2.0 std devs)
- Returns:
Dictionary with drift detection results:
drift_detected: Boolean indicating if drift detected
recent_mean: Mean of recent scores
historical_mean: Mean of historical scores
drift_magnitude: Difference in means normalized by historical std
ks_statistic: Kolmogorov-Smirnov test statistic (if scipy available)
p_value: P-value from KS test (if scipy available)
- Return type:
dict[str, Any]
Examples
>>> drift_info = detect_concept_drift(
...     recent_scores=model_scores[-1000:],
...     historical_scores=baseline_scores,
... )
>>> if drift_info["drift_detected"]:
...     trigger_model_retraining()  # user-supplied
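The mean-shift part of this test can be sketched with numpy alone. The sketch assumes drift_magnitude is the absolute difference in means divided by the historical standard deviation, and that drift is flagged when this value exceeds threshold. The KS statistic and p-value are omitted here; scipy.stats.ks_2samp(recent, historical) would supply them. drift_sketch is a hypothetical name.

```python
import numpy as np


def drift_sketch(recent_scores, historical_scores, threshold=2.0):
    """Hypothetical helper: flag drift when the mean shifts by > threshold historical stds."""
    recent = np.asarray(recent_scores, dtype=float)
    hist = np.asarray(historical_scores, dtype=float)
    recent_mean = float(recent.mean())
    historical_mean = float(hist.mean())
    # Guard against a constant baseline (zero std).
    magnitude = abs(recent_mean - historical_mean) / max(float(hist.std()), 1e-12)
    return {
        "drift_detected": bool(magnitude > threshold),
        "recent_mean": recent_mean,
        "historical_mean": historical_mean,
        "drift_magnitude": float(magnitude),
    }
```

A pure mean test misses changes in spread or shape, which is what the KS statistic adds in the documented function.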
- anomsmith.discretize_rul(rul: Series | ndarray | SeriesLike, healthy_threshold: float = 30.0, warning_threshold: float = 10.0) Series[source]
Discretize RUL values into health states.
Maps RUL values to health states:
- RUL > healthy_threshold: Healthy (0)
- warning_threshold < RUL <= healthy_threshold: Warning (1)
- RUL <= warning_threshold: Distress (2)
- Parameters:
rul – Remaining Useful Life values
healthy_threshold – RUL threshold for Healthy state (default 30)
warning_threshold – RUL threshold for Warning state (default 10)
- Returns:
pandas Series with health states aligned to input index
Examples
>>> import pandas as pd
>>> from anomsmith import discretize_rul
>>> rul = pd.Series([50, 25, 5, 0])
>>> states = discretize_rul(rul, healthy_threshold=30, warning_threshold=10)
>>> states.values
array([0, 1, 2, 2])
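The same boundaries can be written with numpy.select, where the first matching condition wins. This sketch reproduces the documented example and keeps the input index. discretize_rul_sketch is a hypothetical name.

```python
import numpy as np
import pandas as pd


def discretize_rul_sketch(rul, healthy_threshold=30.0, warning_threshold=10.0):
    """Hypothetical helper: 0 = Healthy, 1 = Warning, 2 = Distress."""
    s = rul if isinstance(rul, pd.Series) else pd.Series(rul)
    values = s.to_numpy(dtype=float)
    # Conditions are checked in order: RUL > healthy first, then RUL > warning.
    states = np.select([values > healthy_threshold, values > warning_threshold], [0, 1], default=2)
    return pd.Series(states, index=s.index, name="health_state")
```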
- anomsmith.evaluate_detector(detector: BaseDetector, X: ndarray | DataFrame, y_true: ndarray, scores: ndarray | None = None, timestamps: ndarray | None = None) dict[str, float | int][source]
Evaluate a fitted anomsmith detector on tabular test data.
- anomsmith.evaluate_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) dict[str, float][source]
Evaluate policy performance metrics.
- Parameters:
health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)
previous_states – Previous health states for transition detection (optional)
intervene_cost – Cost of intervention action (default 100)
review_cost – Cost of review action (default 30)
wait_cost – Cost of wait action (default 0)
base_risks – Base failure risks by state (healthy, warning, distress) (default (0.01, 0.1, 0.3))
intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)
review_risk_reduction – Risk reduction factor for review (default 0.75)
- Returns:
Dictionary with total_cost, total_risk, interventions, reviews, waits
Examples
>>> import pandas as pd
>>> from anomsmith import evaluate_policy
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> metrics = evaluate_policy(states)
>>> metrics['total_cost']
230.0
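The documented total of 230.0 is consistent with mapping Healthy to WAIT, Warning to REVIEW, and Distress to INTERVENE: 0 + 0 + 30 + 100 + 100 = 230. The sketch below assumes that mapping. It also assumes that a risk reduction factor multiplies the base risk of the state, and it ignores previous_states. It illustrates the cost accounting only and is not the library's policy. policy_cost_sketch is a hypothetical name.

```python
import numpy as np

# Assumed mapping, indexed by state: Healthy -> WAIT (0), Warning -> REVIEW (1), Distress -> INTERVENE (2)
ACTION_FOR_STATE = np.array([0, 1, 2])


def policy_cost_sketch(health_states, intervene_cost=100.0, review_cost=30.0, wait_cost=0.0,
                       base_risks=(0.01, 0.1, 0.3), intervene_risk_reduction=0.5,
                       review_risk_reduction=0.75):
    """Hypothetical helper: total cost and residual risk of a state-to-action policy."""
    states = np.asarray(health_states, dtype=int)
    actions = ACTION_FOR_STATE[states]
    action_costs = np.array([wait_cost, review_cost, intervene_cost])
    # Assumption: a "risk reduction factor" multiplies the base risk (WAIT leaves it unchanged).
    risk_multiplier = np.array([1.0, review_risk_reduction, intervene_risk_reduction])
    risks = np.asarray(base_risks)[states] * risk_multiplier[actions]
    return {
        "total_cost": float(action_costs[actions].sum()),
        "total_risk": float(risks.sum()),
        "interventions": int(np.sum(actions == 2)),
        "reviews": int(np.sum(actions == 1)),
        "waits": int(np.sum(actions == 0)),
    }


metrics = policy_cost_sketch([0, 0, 1, 2, 2])
```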
- anomsmith.evaluate_survival_model(surv_df: DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, risk_scores: ndarray | Series | None = None) dict[str, float][source]
Evaluate survival model performance.
Computes comprehensive metrics for survival model evaluation.
- Parameters:
surv_df – Survival function DataFrame (rows = time points, cols = samples)
durations – Actual time-to-event values (n_samples,)
events – Event indicators (1 = event occurred, 0 = censored), optional
risk_scores – Optional risk scores for C-index (if None, computed from survival)
- Returns:
Dictionary with evaluation metrics:
c_index: Concordance index
mean_absolute_error: Mean absolute error in predicted vs actual durations
median_survival_error: Error in median survival predictions
- Return type:
dict[str, float]
Examples
>>> surv_df = model.predict_survival_function(X_test)
>>> metrics = evaluate_survival_model(surv_df, durations_test, events_test)
>>> print(f"C-index: {metrics['c_index']:.3f}")
- anomsmith.fit_survival_model_for_maintenance(X: ndarray | DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, model_type: str = 'logistic_hazard', **model_kwargs) CoxSurvivalModel[source]
Fit a survival model for predictive maintenance.
Convenience function that fits a survival model with sensible defaults for predictive maintenance use cases.
- Parameters:
X – Feature matrix (n_samples, n_features) - sensor readings
durations – Time-to-failure values (n_samples,)
events – Event indicators (1 = failure, 0 = censored), optional
model_type – Model type - ‘cox’ (lifelines), ‘logistic_hazard’, or ‘deepsurv’
**model_kwargs – Additional model parameters
- Returns:
Fitted survival model
Examples
>>> model = fit_survival_model_for_maintenance(
...     X_train, durations_train, events_train,
...     model_type="logistic_hazard", n_bins=50
... )
- anomsmith.plot_comparison_metrics(comparison_df: DataFrame, metrics: list[str] | None = None, save_path: str | None = None)[source]
Create comparison chart for multiple detectors.
Parameters
- comparison_df : DataFrame
DataFrame from compare_detectors().
- metrics : list of str, optional
Metrics to plot. Default: ['precision', 'recall', 'f1'].
- save_path : str, optional
Path to save the figure.
- anomsmith.plot_pca_boundary(detector: PCADetector, X: ndarray | DataFrame, y_true: ndarray | None = None, n_components_plot: int = 2, save_path: str | None = None)[source]
Visualize the PCA boundary in a 2D projection (for an anomsmith PCADetector).
- anomsmith.plot_reconstruction_error(detector, X: ndarray | DataFrame, y_true: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]
Plot reconstruction error over time for LSTM or PCA detector.
Parameters
- detector : BaseDetector
Fitted detector (PCA or LSTM).
- X : array-like
Data to plot.
- y_true : ndarray, optional
True labels for marking actual anomalies.
- timestamps : ndarray, optional
Timestamps for x-axis.
- save_path : str, optional
Path to save the figure.
- anomsmith.plot_sensor_drift(sensor_data: ndarray | Series, predictions: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]
Visualize sensor drift with anomaly flags.
Parameters
- sensor_data : array-like
Sensor readings over time.
- predictions : ndarray, optional
Anomaly predictions (1 for anomaly, 0 for normal).
- timestamps : ndarray, optional
Timestamps for x-axis.
- save_path : str, optional
Path to save the figure.
- anomsmith.predict_health_states_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, healthy_threshold: float = 30.0, warning_threshold: float = 10.0, threshold: float = 0.5) HealthStateView[source]
Predict health states from survival model.
Converts survival model predictions to health states by:
1. Predicting RUL from the survival model
2. Discretizing RUL into health states
- Parameters:
model – Fitted survival model
X – Feature matrix (n_samples, n_features)
healthy_threshold – RUL threshold for Healthy state (default 30)
warning_threshold – RUL threshold for Warning state (default 10)
threshold – Survival probability threshold for median RUL (default 0.5)
- Returns:
HealthStateView with predicted health states
Examples
>>> health_states = predict_health_states_from_survival(
...     model, X_test, healthy_threshold=30, warning_threshold=10
... )
- anomsmith.predict_rul_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, threshold: float = 0.5, index: Index | None = None) Series[source]
Predict Remaining Useful Life (RUL) from survival model.
Uses median survival time (where survival probability = threshold) as predicted RUL.
- Parameters:
model – Fitted survival model
X – Feature matrix (n_samples, n_features)
threshold – Survival probability threshold for median (default 0.5)
index – Optional row index for the returned Series (defaults to X.index for DataFrame inputs, else a pandas.RangeIndex)
- Returns:
Series of predicted RUL values
Examples
>>> rul_predictions = predict_rul_from_survival(survival_model, X_test)
>>> health_states = predict_health_states_from_survival(
...     survival_model, X_test, healthy_threshold=30, warning_threshold=10
... )
- anomsmith.prepare_pm_features(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', feature_cols: list[str] | None = None, calculate_rul_flag: bool = True, add_labels: bool = True, add_rolling_stats: bool = True, include_degradation_rates: bool = False, rolling_window: int = 5, warning_threshold: int = 30, critical_threshold: int = 15, failure_cycle_col: str | None = None) DataFrame[source]
Prepare predictive maintenance features from raw sensor data.
This is a convenience function that combines:
- RUL calculation
- Health status labeling
- Rolling statistics
- Degradation rates
Parameters
- df : DataFrame
Input DataFrame with asset_id, cycle, and sensor/feature columns.
- asset_id_col : str, default='asset_id'
Column name for asset identifier.
- cycle_col : str, default='cycle'
Column name for cycle/time step.
- feature_cols : list of str, optional
Feature column names. If None, auto-detects (excludes asset_id, cycle, RUL, etc.).
- calculate_rul_flag : bool, default=True
Whether to calculate RUL.
- add_labels : bool, default=True
Whether to add health status labels.
- add_rolling_stats : bool, default=True
Whether to add rolling statistics.
- include_degradation_rates : bool, default=False
Whether to add degradation rate features.
- rolling_window : int, default=5
Window size for rolling statistics.
- warning_threshold : int, default=30
RUL threshold for warning state.
- critical_threshold : int, default=15
RUL threshold for critical state.
- failure_cycle_col : str, optional
Column name for failure cycle (if available).
Returns
- df : DataFrame
DataFrame with all engineered features.
- anomsmith.rank_assets_by_risk(asset_health: DataFrame, top_n: int | None = None) DataFrame[source]
Rank assets by combined risk score.
- Parameters:
asset_health – DataFrame from assess_asset_health()
top_n – Optional number of top assets to return (default None = all)
- Returns:
DataFrame ranked by combined_risk (highest first)
- anomsmith.score_anomalies(y: Series | ndarray | SeriesLike, scorer: BaseScorer) Series[source]
Score anomalies in a time series.
- Parameters:
y – Time series to score
scorer – BaseScorer instance
- Returns:
pandas Series of anomaly scores with same index as y
- anomsmith.sweep_thresholds(y: Series | ndarray | SeriesLike, scorer: BaseScorer, threshold_values: list[float] | ndarray, labels: Series | ndarray | SeriesLike | None = None) DataFrame[source]
Evaluate multiple threshold values and return metrics.
- Parameters:
y – Time series to score
scorer – BaseScorer instance
threshold_values – List of threshold values to evaluate
labels – Optional ground truth labels
- Returns:
pandas DataFrame with columns threshold, precision, recall, f1 (metrics are NaN if labels are not provided)
- Return type:
pandas.DataFrame
- anomsmith.track_mahalanobis_distance(X: ndarray | DataFrame, detector: PCADetector, index: Index | None = None) Series[source]
Track Mahalanobis distance over time as a single metric.
Computes Mahalanobis distance from the “normal” center in PCA space for each time point. This provides a single metric that can be tracked as a time series to monitor equipment health drift.
Delegates scoring to PCADetector.score() so the Mahalanobis math stays in the primitive layer (single implementation).
- Parameters:
X – Feature matrix (n_samples, n_features) with sensor readings
detector – Fitted PCADetector (fitted detector with PCA and mean/covariance computed)
index – Optional index for the resulting Series
- Returns:
pandas Series with Mahalanobis distance values, indexed by time
Examples
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Track distance over time to detect drift
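To make "Mahalanobis distance in PCA space" concrete: after projection onto the top principal components, the component scores t_k are uncorrelated. The covariance is therefore diagonal, and the distance reduces to sqrt(sum_k t_k^2 / lambda_k), where lambda_k is the training variance of component k. The class below is a numpy-only sketch of that idea. PCAMahalanobisSketch is hypothetical and is not PCADetector's code.

```python
import numpy as np


class PCAMahalanobisSketch:
    """Hypothetical illustration of Mahalanobis distance in PCA score space."""

    def __init__(self, n_components: int):
        self.n_components = n_components

    def fit(self, X_train):
        X = np.asarray(X_train, dtype=float)
        self.mean_ = X.mean(axis=0)
        # Rows of vt are principal axes; s**2 / (n - 1) are the component variances.
        _, s, vt = np.linalg.svd(X - self.mean_, full_matrices=False)
        self.components_ = vt[: self.n_components]
        self.variances_ = (s[: self.n_components] ** 2) / (X.shape[0] - 1)
        return self

    def distance(self, X):
        scores = (np.asarray(X, dtype=float) - self.mean_) @ self.components_.T
        # Diagonal covariance in PCA space: divide each squared score by its variance.
        return np.sqrt(np.sum(scores**2 / self.variances_, axis=1))


rng = np.random.default_rng(0)
X_train = rng.normal(size=(500, 5))
model = PCAMahalanobisSketch(n_components=3).fit(X_train)
d = model.distance(X_train)
```

On the training data itself, the mean squared distance is exactly n_components * (n - 1) / n. This is a handy sanity check for any implementation.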
Objects
Layer 1: Data and representations.
This layer uses timesmith’s SeriesLike and PanelLike types for time series data. ScoreView and LabelView are kept for anomaly-specific outputs. No domain libraries (sklearn, matplotlib, etc.) are imported here. Only numpy and pandas are allowed.
- class anomsmith.objects.Action(*values)[source]
Bases:
IntEnum
Action categories for decision policies.
- INTERVENE = 2
- REVIEW = 1
- WAIT = 0
- class anomsmith.objects.ActionView(index: Index, actions: ndarray)[source]
Bases:
object
Action labels aligned to time series index.
- index
Time series index
- Type:
pandas.core.indexes.base.Index
- actions
Action values (0=wait, 1=review, 2=intervene)
- Type:
numpy.ndarray
- index: Index
- class anomsmith.objects.HealthState(*values)[source]
Bases:
IntEnum
Health state categories for predictive maintenance.
States are ordered from healthy (0) to distressed (highest value).
- DISTRESS = 2
- HEALTHY = 0
- WARNING = 1
- class anomsmith.objects.HealthStateView(index: Index, states: ndarray)[source]
Bases:
object
Health state labels aligned to time series index.
- index
Time series index
- Type:
pandas.core.indexes.base.Index
- states
Health state values (0=Healthy, 1=Warning, 2=Distress)
- Type:
numpy.ndarray
- index: Index
- class anomsmith.objects.LabelView(index: Index, labels: ndarray)[source]
Bases:
object
Immutable view of binary anomaly labels aligned to an index.
- index
Time index (must match input series index)
- Type:
pandas.core.indexes.base.Index
- labels
Binary flags as 1D array (1 = anomaly, 0 = normal)
- Type:
numpy.ndarray
- index: Index
- class anomsmith.objects.PanelLike(*args, **kwargs)[source]
Bases:
Protocol
Protocol for panel-like data: DataFrame with entity key plus time index.
Can be a DataFrame with MultiIndex (entity, time) or a regular DataFrame with an entity column and time index.
- columns: Index
- index: DatetimeIndex | MultiIndex | Index
- class anomsmith.objects.PolicyResult(health_states: HealthStateView, actions: ActionView, costs: ndarray, risks: ndarray)[source]
Bases:
object
Result of applying a decision policy.
- health_states
Predicted health states
- actions
Recommended actions
- costs
Action costs
- Type:
numpy.ndarray
- risks
Failure risks after actions
- Type:
numpy.ndarray
- actions: ActionView
- health_states: HealthStateView
- class anomsmith.objects.ScoreView(index: Index, scores: ndarray)[source]
Bases:
object
Immutable view of anomaly scores aligned to an index.
- index
Time index (must match input series index)
- Type:
pandas.core.indexes.base.Index
- scores
Anomaly scores as 1D array (higher = more anomalous)
- Type:
numpy.ndarray
- index: Index
- class anomsmith.objects.SeriesLike(*args, **kwargs)[source]
Bases:
Protocol
Protocol for series-like data: pandas Series or single-column DataFrame.
Must have a datetime or integer index.
- index: DatetimeIndex | Index
- anomsmith.objects.SeriesView
alias of
SeriesLike
- class anomsmith.objects.WindowSpec(length: int, step: int = 1, alignment: Literal['left', 'right', 'center'] = 'right')[source]
Bases:
object
Specification for sliding or expanding windows.
- alignment
‘left’ (start at beginning), ‘right’ (end at current), or ‘center’ (centered on current point)
- Type:
Literal[‘left’, ‘right’, ‘center’]
Primitives
Layer 2: Primitives.
This layer defines algorithm interfaces and thin utilities. It must not know about tasks or evaluation. Only numpy and pandas are allowed (no sklearn, matplotlib, etc.).
- class anomsmith.primitives.BaseDetector(**params: Any)[source]
Bases:
BaseEstimator
Base class for anomaly detectors.
Detectors produce both scores and binary labels.
- abstractmethod predict(y: ndarray | Series | SeriesLike) LabelView[source]
Predict anomaly labels.
- Parameters:
y – Time series to detect anomalies in
- Returns:
LabelView with binary anomaly labels
- abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.primitives.BaseEstimator(**params: Any)[source]
Bases:
BaseObject
Base class for estimators with fit and fitted state.
- _fitted
Whether the estimator has been fitted
- abstractmethod fit(y: ndarray | Series | SeriesLike, X: ndarray | DataFrame | None = None) BaseEstimator[source]
Fit the estimator.
- Parameters:
y – Target values
X – Optional features
- Returns:
Self for method chaining
- class anomsmith.primitives.BaseObject(**params: Any)[source]
Bases:
ABC
Base class for all primitives with parameter management.
Provides get_params, set_params, clone, and repr methods.
- clone() BaseObject[source]
Create a deep copy of this object.
- Returns:
Deep copy of this object
- get_params(deep: bool = True) dict[str, Any][source]
Get parameters for this object.
- Parameters:
deep – If True, return deep copy of parameters
- Returns:
Dictionary of parameter names to values
- set_params(**params: Any) BaseObject[source]
Set parameters for this object.
- Parameters:
**params – Parameters to set
- Returns:
Self for method chaining
- class anomsmith.primitives.BaseScorer(**params: Any)[source]
Bases:
BaseEstimator
Base class for anomaly scorers.
Scorers assign anomaly scores to time series points. Higher scores indicate more anomalous points.
- abstractmethod score(y: ndarray | Series | SeriesLike) ScoreView[source]
Score anomalies in a time series.
- Parameters:
y – Time series to score
- Returns:
ScoreView with anomaly scores
- class anomsmith.primitives.ThresholdRule(method: Literal['absolute', 'quantile'], value: float, quantile: float | None = None)[source]
Bases:
object
Rule for thresholding anomaly scores.
- method
‘absolute’ (use value directly) or ‘quantile’ (use quantile)
- Type:
Literal[‘absolute’, ‘quantile’]
- anomsmith.primitives.apply_threshold(score_view: ScoreView, rule: ThresholdRule) LabelView[source]
Apply threshold rule to scores to produce binary labels.
- Parameters:
score_view – ScoreView with anomaly scores
rule – ThresholdRule to apply
- Returns:
LabelView with binary labels (1 = anomaly, 0 = normal)
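The sketch below illustrates the two threshold methods under stated assumptions. With 'absolute', scores are compared against value directly. With 'quantile', the cutoff is the empirical quantile of the scores being thresholded, taken from quantile if set and otherwise from value. Scores strictly above the cutoff are flagged. threshold_scores is a hypothetical helper; check ThresholdRule for the exact semantics.

```python
import numpy as np


def threshold_scores(scores, method, value, quantile=None):
    """Hypothetical helper: binary labels (1 = anomaly) from scores."""
    s = np.asarray(scores, dtype=float)
    if method == "absolute":
        cutoff = value
    elif method == "quantile":
        # Assumption: the quantile level comes from `quantile`, falling back to `value`.
        q = value if quantile is None else quantile
        if not 0.0 <= q <= 1.0:
            raise ValueError("quantile must be in [0, 1]")
        cutoff = float(np.quantile(s, q))
    else:
        raise ValueError(f"unknown method: {method!r}")
    # Assumption: scores strictly above the cutoff are anomalies.
    return (s > cutoff).astype(np.int8)
```

A quantile rule flags a roughly fixed fraction of points in every batch, while an absolute rule lets the anomaly rate vary with the data. The right choice depends on whether alert volume or score calibration matters more.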
- anomsmith.primitives.export_model_for_sagemaker(model: BaseEstimator, s3_path: str, metadata: dict[str, Any] | None = None, local_path: str | Path | None = None) dict[str, Any][source]
Export model in format ready for AWS SageMaker deployment.
Creates a model package that can be uploaded to S3 and deployed as a SageMaker endpoint. The model is saved locally first, then S3 upload instructions are returned.
- Parameters:
model – An anomsmith estimator to export
s3_path – S3 path where model will be uploaded (e.g., “s3://bucket/models/v1/”)
metadata – Optional metadata for deployment
local_path – Local path to save model (default: temp directory)
- Returns:
Dictionary with export information including:
local_path: Local path where model was saved
s3_path: S3 path for upload
upload_command: AWS CLI command to upload
inference_code_template: Template for SageMaker inference script
- Return type:
dict[str, Any]
Examples
>>> export_info = export_model_for_sagemaker(
...     model, "s3://my-bucket/models/anomaly-detector/v1.0"
... )
>>> print(export_info["upload_command"])
- anomsmith.primitives.load_model(path: str | Path) BaseEstimator[source]
Load an anomsmith model from disk.
Warning
Models are loaded using pickle. Only load from trusted sources. Unpickling data from untrusted origins can execute arbitrary code.
- Parameters:
path – Directory path where model was saved
- Returns:
Loaded model instance
- Raises:
FileNotFoundError – If model files not found
ValueError – If model cannot be loaded
Examples
>>> model = load_model("models/robust_zscore_v1")
>>> scores = model.score(y_test)
- anomsmith.primitives.robust_zscore(values: ndarray, epsilon: float = 1e-08) ndarray[source]
Compute robust z-scores using median and MAD.
Uses median as center and Median Absolute Deviation (MAD) as scale. Includes epsilon guard to prevent division by zero.
- Parameters:
values – Input values to scale
epsilon – Small value to prevent division by zero
- Returns:
Robust z-scores (same shape as input)
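The following is a minimal sketch of the formula described above. The text does not say whether the library multiplies MAD by the normal-consistency constant 1.4826, or whether it guards with max(MAD, epsilon) instead of adding epsilon. This sketch uses the plain MAD plus epsilon. robust_zscore_sketch is a hypothetical name.

```python
import numpy as np


def robust_zscore_sketch(values, epsilon=1e-8):
    """Hypothetical helper: (x - median) / (MAD + epsilon)."""
    x = np.asarray(values, dtype=float)
    median = np.median(x)
    mad = np.median(np.abs(x - median))  # Median Absolute Deviation
    # epsilon keeps the result finite when more than half the values are identical (MAD = 0).
    return (x - median) / (mad + epsilon)
```

For [1, 2, 3, 4, 100] the median is 3 and the MAD is 1. The outlier therefore scores about 97, while a mean/std z-score would have been diluted by the outlier itself.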
- anomsmith.primitives.save_model(model: BaseEstimator, path: str | Path, metadata: dict[str, Any] | None = None) None[source]
Save an anomsmith model to disk for deployment.
Saves the model’s state, parameters, and metadata in a format suitable for cloud deployment (e.g., AWS SageMaker, containerized endpoints).
- Parameters:
model – An anomsmith estimator (BaseScorer, BaseDetector, etc.)
path – Directory path where model will be saved
metadata – Optional metadata dict (model version, training date, etc.)
- Raises:
ValueError – If model is not fitted
OSError – If path cannot be created
Examples
>>> from anomsmith.primitives.scorers.robust_zscore import RobustZScoreScorer
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> save_model(scorer, "models/robust_zscore_v1", metadata={"version": "1.0"})
Workflows
Layer 4: Workflows.
Workflows provide the public entry points users call. Workflows can import matplotlib only if plots are added (not in first pass).
- class anomsmith.workflows.ModelPerformanceTracker(window_size: int = 1000, model_name: str | None = None)[source]
Bases:
object
Track model performance over time for monitoring and alerting.
Maintains a rolling window of performance metrics and can detect degradation or drift.
- window_size
Number of recent predictions to keep in window
- metrics_history
DataFrame with historical metrics
- detect_degradation(baseline_metrics: dict[str, float], threshold: float = 0.1) bool[source]
Detect if performance has degraded compared to baseline.
- Parameters:
baseline_metrics – Baseline metrics (e.g., from training)
threshold – Relative degradation threshold (default 0.1 = 10%)
- Returns:
True if degradation detected
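One plausible reading of a "relative degradation threshold" is to flag degradation when any higher-is-better metric drops by more than threshold relative to its baseline. The sketch below follows that reading. has_degraded is hypothetical and takes the current metrics explicitly, whereas the tracker reads them from its own rolling window.

```python
def has_degraded(current: dict[str, float], baseline: dict[str, float],
                 threshold: float = 0.1) -> bool:
    """Hypothetical helper: True if any metric fell more than `threshold` (relative) below baseline."""
    for name, base in baseline.items():
        if name not in current or base <= 0:
            continue  # nothing meaningful to compare against
        relative_drop = (base - current[name]) / base
        if relative_drop > threshold:
            return True
    return False
```

For example, an F1 drop from 0.8 to 0.75 is a 6.25% relative drop and passes at the default 10% threshold. A drop to 0.6 (25%) is flagged.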
- get_current_metrics() dict[str, float][source]
Get current performance metrics.
- Returns:
Dictionary with latest metrics
- update(scores: ndarray | Series, predicted_labels: ndarray | Series, true_labels: ndarray | Series | None = None, timestamp: datetime | None = None) dict[str, float][source]
Update tracker with new predictions.
- Parameters:
scores – Anomaly scores
predicted_labels – Predicted binary labels
true_labels – Optional ground truth labels
timestamp – Optional timestamp for this update
- Returns:
Current performance metrics
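The relative-threshold rule behind detect_degradation can be pictured with a minimal sketch. It assumes every metric is higher-is-better and treats degradation as any shared metric dropping by more than threshold, measured as a fraction of its baseline value. relative_degradation is a hypothetical helper, not the library's implementation.

```python
def relative_degradation(
    current: dict[str, float],
    baseline: dict[str, float],
    threshold: float = 0.1,
) -> bool:
    """Hypothetical sketch: True if any shared metric fell more than `threshold`
    (relative to its baseline value) below baseline.

    Assumes higher-is-better metrics such as precision, recall, and f1.
    """
    for name, base_value in baseline.items():
        if name not in current or base_value == 0:
            # Skip metrics that are missing or have no relative scale.
            continue
        relative_drop = (base_value - current[name]) / abs(base_value)
        if relative_drop > threshold:
            return True
    return False


# f1 fell from 0.80 to 0.70: a 12.5% relative drop, above the 10% default.
print(relative_degradation({"f1": 0.70}, {"f1": 0.80}))  # True
```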
- anomsmith.workflows.aggregate_metrics_for_cloudwatch(metrics_list: list[dict[str, float]], namespace: str = 'AnomalyDetection', model_name: str | None = None, timestamp: datetime | None = None) list[dict[str, Any]][source]
Format metrics for AWS CloudWatch PutMetricData API.
Aggregates multiple metric dictionaries into CloudWatch format.
- Parameters:
metrics_list – List of metric dictionaries from compute_performance_metrics
namespace – CloudWatch namespace (default “AnomalyDetection”)
model_name – Optional model name for dimension
timestamp – Optional timestamp (default: now)
- Returns:
List of CloudWatch metric data dictionaries
Examples
>>> import boto3
>>> cloudwatch = boto3.client("cloudwatch")
>>> metrics = [compute_performance_metrics(y1, pred1), ...]
>>> cw_metrics = aggregate_metrics_for_cloudwatch(metrics, model_name="IsolationForest")
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=cw_metrics
... )
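The CloudWatch PutMetricData API takes a list of MetricDatum dictionaries with keys such as MetricName, Value, Timestamp, Unit, and Dimensions. The sketch below shows one way to produce that shape from several metric dictionaries. Averaging each metric across the inputs and the "ModelName" dimension name are assumptions, so the real aggregate_metrics_for_cloudwatch output may differ; to_cloudwatch_metric_data is a hypothetical name.

```python
from __future__ import annotations

from datetime import datetime, timezone
from statistics import mean
from typing import Any


def to_cloudwatch_metric_data(
    metrics_list: list[dict[str, float]],
    model_name: str | None = None,
    timestamp: datetime | None = None,
) -> list[dict[str, Any]]:
    """Hypothetical sketch: average each metric across the input dictionaries
    and emit one CloudWatch MetricDatum per metric name."""
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)
    # "ModelName" is an illustrative dimension name, not taken from anomsmith.
    dimensions = [{"Name": "ModelName", "Value": model_name}] if model_name else []
    names = sorted({name for metrics in metrics_list for name in metrics})
    metric_data: list[dict[str, Any]] = []
    for name in names:
        values = [metrics[name] for metrics in metrics_list if name in metrics]
        datum: dict[str, Any] = {
            "MetricName": name,
            "Value": float(mean(values)),
            "Timestamp": timestamp,
            "Unit": "None",  # CloudWatch's unit value for dimensionless metrics
        }
        if dimensions:
            datum["Dimensions"] = dimensions
        metric_data.append(datum)
    return metric_data
```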
- anomsmith.workflows.aggregate_undirected_edges(communications: DataFrame, *, sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', drop_self_loops: bool = True) DataFrame[source]
Aggregate communication rows into undirected weighted edges.
Mirrors the aggregation in the org_network_analysis business logic: each unique unordered pair (min(a, b), max(a, b)) gets a weight equal to the number of rows (communication events) between those endpoints.
- Parameters:
communications – One row per event; must include the sender and receiver columns.
sender_col – Column name for the sender endpoint (default sender_id).
receiver_col – Column name for the receiver endpoint (default receiver_id).
drop_self_loops – If True, rows where sender equals receiver are skipped.
- Returns:
DataFrame with columns u, v, weight (integer counts), sorted by u, v.
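The aggregation rule can be shown in plain Python on a list of (sender, receiver) tuples instead of a DataFrame. aggregate_pairs is a hypothetical helper that illustrates the documented behavior; it assumes endpoint ids are mutually comparable (for example, all strings) so that min and max are defined.

```python
from collections import Counter
from collections.abc import Iterable


def aggregate_pairs(
    events: Iterable[tuple[str, str]],
    drop_self_loops: bool = True,
) -> list[tuple[str, str, int]]:
    """Hypothetical sketch: count events per unordered pair (min(a, b), max(a, b))."""
    counts: Counter = Counter()
    for sender, receiver in events:
        if drop_self_loops and sender == receiver:
            continue
        # Normalizing the pair makes ("b", "a") and ("a", "b") the same edge.
        counts[(min(sender, receiver), max(sender, receiver))] += 1
    # Sort by (u, v) to mirror the documented output ordering.
    return [(u, v, weight) for (u, v), weight in sorted(counts.items())]


print(aggregate_pairs([("b", "a"), ("a", "b"), ("a", "c"), ("c", "c")]))
```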
- anomsmith.workflows.apply_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) DataFrame[source]
Apply decision policy to health states.
- Parameters:
health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)
previous_states – Previous health states for transition detection (optional)
intervene_cost – Cost of intervention action (default 100)
review_cost – Cost of review action (default 30)
wait_cost – Cost of wait action (default 0)
base_risks – Base failure risk for each state, ordered (healthy, warning, distress); default (0.01, 0.1, 0.3)
intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)
review_risk_reduction – Risk reduction factor for review (default 0.75)
- Returns:
pandas DataFrame with health_states, actions, costs, and risks
Examples
>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> result = apply_policy(states)
>>> result['action'].values
array([0, 0, 1, 2, 2])
- anomsmith.workflows.assess_asset_health(sensor_data: DataFrame, asset_ids: Series | None = None, feature_cols: list[str] | None = None, failure_labels: Series | ndarray | None = None, use_classification: bool = True, use_anomaly_detection: bool = True, contamination: float = 0.05, n_estimators: int = 100, isolation_n_estimators: int = 200, random_state: int | None = None, *, risk_proba_warning_threshold: float = 0.5, risk_proba_distress_threshold: float = 0.8, classification_weight: float = 0.6, anomaly_weight: float = 0.4) DataFrame[source]
Assess asset health using classification and anomaly detection.
Combines failure risk classification with anomaly detection to provide a comprehensive asset health assessment. The results can be used to prioritize maintenance actions.
- Parameters:
sensor_data – DataFrame with sensor readings (columns are features, rows are assets)
asset_ids – Optional Series of asset IDs (defaults to sensor_data index)
feature_cols – Optional list of feature column names (defaults to all numeric columns)
failure_labels – Optional binary labels for training classifier (1 = failure, 0 = healthy)
use_classification – Whether to use failure risk classification (default True)
use_anomaly_detection – Whether to use anomaly detection (default True)
contamination – Expected proportion of anomalies (see DEFAULT_OUTLIER_CONTAMINATION)
n_estimators – Number of trees for Random Forest (see DEFAULT_RANDOM_FOREST_N_ESTIMATORS)
isolation_n_estimators – Number of trees for Isolation Forest when anomaly detection is on (see DEFAULT_ISOLATION_FOREST_N_ESTIMATORS)
random_state – Random state for reproducibility
risk_proba_warning_threshold – Minimum failure probability for the Warning health state when classification is used (default from anomsmith.constants)
risk_proba_distress_threshold – Minimum failure probability for the Distress state (must exceed the warning threshold; enforced by FailureRiskClassifier)
classification_weight – Weight on the normalized classification risk in combined_risk when both classification and anomaly detection run (must sum to 1 with anomaly_weight)
anomaly_weight – Weight on the normalized anomaly score in combined_risk
- Returns:
DataFrame with columns:
asset_id: Asset identifier
failure_risk: Probability of failure (if classification used)
health_state: Predicted health state (0=Healthy, 1=Warning, 2=Distress)
is_anomaly: Binary anomaly flag (if anomaly detection used)
anomaly_score: Anomaly score (if anomaly detection used)
combined_risk: Combined risk score (higher = more urgent)
- Return type:
DataFrame
Examples
>>> import pandas as pd
>>> import numpy as np
>>> sensor_data = pd.DataFrame({
...     'temperature': [60, 65, 70, 80],
...     'vibration': [0.2, 0.25, 0.3, 0.4],
...     'pressure': [25, 24, 23, 20]
... })
>>> result = assess_asset_health(sensor_data)
>>> result.head()
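When both models run, combined_risk is described as a weighted sum of the normalized classification risk and the normalized anomaly score. The sketch below assumes min-max normalization, which the reference does not specify; combined_risk_sketch and min_max_normalize are hypothetical names.

```python
from collections.abc import Sequence


def min_max_normalize(values: Sequence[float]) -> list[float]:
    """Scale values to [0, 1]; a constant input maps to all zeros."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def combined_risk_sketch(
    failure_risk: Sequence[float],
    anomaly_score: Sequence[float],
    classification_weight: float = 0.6,
    anomaly_weight: float = 0.4,
) -> list[float]:
    """Hypothetical sketch of a weighted combination of two normalized signals."""
    if abs(classification_weight + anomaly_weight - 1.0) > 1e-9:
        raise ValueError("classification_weight and anomaly_weight must sum to 1")
    risk_norm = min_max_normalize(failure_risk)
    anomaly_norm = min_max_normalize(anomaly_score)
    return [
        classification_weight * r + anomaly_weight * a
        for r, a in zip(risk_norm, anomaly_norm)
    ]
```

Normalizing both signals first keeps a large-valued anomaly score from dominating a probability in [0, 1], so the weights control the balance.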
- anomsmith.workflows.assess_health_with_pca(X: ndarray | DataFrame, detector: PCADetector, healthy_threshold: float, warning_threshold: float, index: Index | None = None) DataFrame[source]
Assess equipment health using PCA and Mahalanobis distance.
Complete workflow for PCA-based predictive maintenance:
1. Compute the Mahalanobis distance from the healthy center.
2. Classify health states based on distance thresholds.
3. Return the results as a DataFrame for easy tracking.
- Parameters:
X – Feature matrix (n_samples, n_features) with sensor readings
detector – Fitted PCADetector (must use score_method='mahalanobis')
healthy_threshold – Maximum distance classified as Healthy
warning_threshold – Maximum distance classified as Warning (must be > healthy_threshold)
index – Optional index for the results
- Returns:
DataFrame with columns ‘mahalanobis_distance’ and ‘health_state’
- Return type:
DataFrame
Examples
>>> import numpy as np
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> # Set thresholds based on training data
>>> train_scores = detector.score(X_train).scores
>>> healthy_threshold = np.percentile(train_scores, 75)
>>> warning_threshold = np.percentile(train_scores, 95)
>>> health_df = assess_health_with_pca(
...     X_monitor, detector, healthy_threshold, warning_threshold
... )
>>> # Track health over time
>>> critical_units = health_df[health_df['health_state'] == 2]
- anomsmith.workflows.backtest_detector(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule, labels: Series | ndarray | SeriesLike | None = None, n_splits: int = 5, min_train_size: int = 10) DataFrame[source]
Run backtest of detector across expanding windows.
- Parameters:
y – Time series to backtest on
detector – BaseDetector or BaseScorer instance
threshold_rule – ThresholdRule to apply
labels – Optional ground truth labels
n_splits – Number of expanding-window splits (default 5)
min_train_size – Minimum training set size (default 10)
- Returns:
DataFrame with columns fold, precision, recall, f1, avg_run_length
- Return type:
pandas DataFrame
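The reference does not spell out how the expanding windows are cut. One common scheme, similar in spirit to scikit-learn's TimeSeriesSplit, divides the samples after min_train_size into n_splits equal test blocks, each evaluated after training on everything before it. expanding_window_splits is a hypothetical illustration of that scheme, not necessarily what backtest_detector does.

```python
def expanding_window_splits(
    n_samples: int, n_splits: int = 5, min_train_size: int = 10
) -> list[tuple[range, range]]:
    """Hypothetical sketch: (train, test) index ranges for expanding-window folds."""
    test_size = (n_samples - min_train_size) // n_splits
    if test_size < 1:
        raise ValueError("series is too short for the requested splits")
    splits = []
    for fold in range(n_splits):
        # The training window always starts at 0 and grows by one test block per fold.
        train_end = min_train_size + fold * test_size
        splits.append((range(0, train_end), range(train_end, train_end + test_size)))
    return splits


for train, test in expanding_window_splits(60):
    print(len(train), test)
```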
- anomsmith.workflows.batch_predict(data_iterator: Iterator[ndarray | Series | DataFrame], detector: BaseDetector) Iterator[tuple[LabelView, ScoreView]][source]
Predict anomalies in batches for efficient processing.
- Parameters:
data_iterator – Iterator yielding batches of time series data
detector – Fitted BaseDetector instance
- Yields:
Tuple of (LabelView, ScoreView) for each batch
Examples
>>> detector = IsolationForestDetector(contamination=0.05)
>>> detector.fit(X_train)
>>> for labels, scores in batch_predict(data_stream(), detector):
...     process_predictions(labels, scores)
- anomsmith.workflows.batch_score(data_iterator: Iterator[ndarray | Series | DataFrame], scorer: BaseScorer) Iterator[ScoreView][source]
Score anomalies in batches for efficient processing of large datasets.
Designed for stream processing (e.g., AWS Kinesis, S3 batch jobs) where data arrives in chunks.
- Parameters:
data_iterator – Iterator yielding batches of time series data
scorer – Fitted BaseScorer instance
- Yields:
ScoreView for each batch
Examples
>>> import numpy as np
>>> import pandas as pd
>>> def data_stream():
...     for i in range(0, 10000, 1000):
...         index = pd.date_range(start="2024-01-01", periods=1000, freq="H") + pd.Timedelta(hours=i)
...         yield pd.Series(np.random.randn(1000), index=index)
>>> scorer = RobustZScoreScorer()
>>> scorer.fit(y_train)
>>> for batch_scores in batch_score(data_stream(), scorer):
...     process_scores(batch_scores)
- anomsmith.workflows.classify_health_from_distance(distances: Series | ndarray | SeriesLike, healthy_threshold: float, warning_threshold: float, index: Index | None = None) HealthStateView[source]
Classify health states from Mahalanobis distance thresholds.
Maps Mahalanobis distance values to health states:
- distance <= healthy_threshold: Healthy (0)
- healthy_threshold < distance <= warning_threshold: Warning (1)
- distance > warning_threshold: Critical/Distress (2)
The thresholds define zones of “normality” based on distance from the healthy center. A wide Healthy zone keeps ordinary operating variation from being flagged, which reduces false positives.
- Parameters:
distances – Mahalanobis distance values (n_samples,)
healthy_threshold – Distance threshold for Healthy state
warning_threshold – Distance threshold for Warning state (must be > healthy_threshold)
index – Optional index for the health states
- Returns:
HealthStateView with classified health states
Examples
>>> import numpy as np
>>> train_distances = track_mahalanobis_distance(X_train, detector)
>>> # Set thresholds based on training data (e.g., percentiles)
>>> healthy_threshold = np.percentile(train_distances, 75)
>>> warning_threshold = np.percentile(train_distances, 95)
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> health_states = classify_health_from_distance(
...     distances, healthy_threshold, warning_threshold
... )
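The threshold mapping reduces to two comparisons per point. This plain-Python sketch (classify_from_distance is a hypothetical name) makes the boundary handling explicit: a distance exactly equal to a threshold falls in the lower, healthier state.

```python
from collections.abc import Iterable


def classify_from_distance(
    distances: Iterable[float], healthy_threshold: float, warning_threshold: float
) -> list[int]:
    """Hypothetical sketch: 0 = Healthy, 1 = Warning, 2 = Critical/Distress."""
    if warning_threshold <= healthy_threshold:
        raise ValueError("warning_threshold must be > healthy_threshold")
    states = []
    for d in distances:
        if d <= healthy_threshold:
            states.append(0)
        elif d <= warning_threshold:
            states.append(1)
        else:
            states.append(2)
    return states


print(classify_from_distance([0.5, 1.0, 1.5, 3.0, 4.0], 1.0, 3.0))  # [0, 0, 1, 1, 2]
```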
- anomsmith.workflows.compare_survival_models(models: dict[str, CoxSurvivalModel], X_test: ndarray | DataFrame, durations_test: ndarray | Series, events_test: ndarray | Series | None = None) DataFrame[source]
Compare multiple survival models.
Evaluates multiple survival models and returns comparison metrics.
- Parameters:
models – Dictionary mapping model names to fitted CoxSurvivalModel instances
X_test – Test feature matrix
durations_test – Test time-to-event values
events_test – Test event indicators, optional
- Returns:
DataFrame with comparison metrics (C-index, MAE, etc.) for each model
Examples
>>> models = {
...     "CoxPH": cox_model,
...     "LogisticHazard": lhaz_model,
...     "DeepSurv": deepsurv_model
... }
>>> comparison = compare_survival_models(models, X_test, durations_test, events_test)
>>> print(comparison)
- anomsmith.workflows.compute_concordance_index(durations: ndarray | Series, risk_scores: ndarray | Series, events: ndarray | Series | None = None) float[source]
Compute concordance index (C-index) for survival model evaluation.
C-index measures how well a model ranks survival times. A score of 0.5 corresponds to random ordering; 1.0 means the predicted risks rank every comparable pair correctly.
Uses lifelines if available, otherwise computes manually.
- Parameters:
durations – Actual time-to-event values (n_samples,)
risk_scores – Predicted risk scores (n_samples,) - higher = higher risk
events – Event indicators (1 = event occurred, 0 = censored), optional
- Returns:
C-index between 0.0 and 1.0
Examples
>>> c_index = compute_concordance_index(true_durations, risk_scores, events)
>>> print(f"C-index: {c_index:.3f}")
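When lifelines is unavailable, the C-index is computed manually. A minimal sketch of Harrell's C-index under the usual definition: a pair is comparable when the subject with the shorter duration had an observed event, and it is concordant when that subject also has the higher risk score, with ties in risk counting as one half. harrell_c_index is a hypothetical helper; it skips pairs with tied durations, which may differ from the library's fallback.

```python
from __future__ import annotations

from collections.abc import Sequence


def harrell_c_index(
    durations: Sequence[float],
    risk_scores: Sequence[float],
    events: Sequence[int] | None = None,
) -> float:
    """Hypothetical sketch of Harrell's concordance index (higher risk = earlier event)."""
    n = len(durations)
    if events is None:
        events = [1] * n  # no censoring information: treat every event as observed
    concordant = 0.0
    comparable = 0
    for i in range(n):
        if not events[i]:
            continue  # a censored subject cannot be the earlier member of a pair
        for j in range(n):
            if durations[i] < durations[j]:
                comparable += 1
                if risk_scores[i] > risk_scores[j]:
                    concordant += 1.0
                elif risk_scores[i] == risk_scores[j]:
                    concordant += 0.5
    if comparable == 0:
        raise ValueError("no comparable pairs")
    return concordant / comparable
```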
- anomsmith.workflows.compute_pca_health_thresholds(X_train: ndarray | DataFrame, detector: PCADetector, healthy_percentile: float = 75.0, warning_percentile: float = 95.0) tuple[float, float][source]
Compute health state thresholds from training data.
Determines distance thresholds for health state classification based on percentiles of Mahalanobis distances in the training (healthy) data.
- Parameters:
X_train – Training data (should be healthy operation data)
detector – Fitted PCADetector (must use score_method='mahalanobis')
healthy_percentile – Percentile for healthy threshold (default 75.0)
warning_percentile – Percentile for warning threshold (default 95.0)
- Returns:
Tuple of (healthy_threshold, warning_threshold)
Examples
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
...     X_train, detector, healthy_percentile=75, warning_percentile=95
... )
- anomsmith.workflows.compute_performance_metrics(true_labels: ndarray | Series, predicted_labels: ndarray | Series, scores: ndarray | Series | None = None) dict[str, float][source]
Compute comprehensive performance metrics for model monitoring.
Returns metrics suitable for CloudWatch, Prometheus, or similar monitoring systems.
- Parameters:
true_labels – Ground truth binary labels (0 = normal, 1 = anomaly)
predicted_labels – Predicted binary labels
scores – Optional anomaly scores (for threshold-independent metrics)
- Returns:
Dictionary with metrics:
precision: Precision score
recall: Recall score
f1: F1 score
true_positives: Number of true positives
false_positives: Number of false positives
false_negatives: Number of false negatives
true_negatives: Number of true negatives
anomaly_rate: Proportion of predicted anomalies
avg_run_length: Average length of anomaly runs (if scores provided)
- Return type:
dict[str, float]
Examples
>>> import boto3
>>> cloudwatch = boto3.client("cloudwatch")
>>> metrics = compute_performance_metrics(true_labels, pred_labels, scores)
>>> # Send to CloudWatch
>>> cloudwatch.put_metric_data(
...     Namespace="AnomalyDetection",
...     MetricData=[{"MetricName": "F1", "Value": metrics["f1"]}]
... )
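The label-based entries in the returned dictionary follow from the confusion matrix. The sketch below (label_metrics is a hypothetical name) computes them in plain Python. Deriving avg_run_length from the predicted labels is an assumption, since the reference ties that entry to the scores argument, and the 0.0 result for zero-division cases is also an assumed convention.

```python
from collections.abc import Sequence


def label_metrics(true_labels: Sequence[int], predicted_labels: Sequence[int]) -> dict[str, float]:
    """Hypothetical sketch of confusion-matrix metrics plus average anomaly run length."""
    pairs = list(zip(true_labels, predicted_labels))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    # Zero-division cases return 0.0 here; the library's convention may differ.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0

    # Lengths of consecutive runs of predicted anomalies (1s).
    runs, current = [], 0
    for p in predicted_labels:
        if p == 1:
            current += 1
        elif current:
            runs.append(current)
            current = 0
    if current:
        runs.append(current)

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "true_positives": tp,
        "false_positives": fp,
        "false_negatives": fn,
        "true_negatives": tn,
        "anomaly_rate": (tp + fp) / len(pairs) if pairs else 0.0,
        "avg_run_length": sum(runs) / len(runs) if runs else 0.0,
    }
```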
- anomsmith.workflows.detect_anomalies(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) DataFrame[source]
Detect anomalies in a time series.
- Parameters:
y – Time series to detect anomalies in
detector – BaseDetector or BaseScorer instance
threshold_rule – ThresholdRule to apply
- Returns:
pandas DataFrame with ‘score’ and ‘flag’ columns, indexed by y’s index
- anomsmith.workflows.detect_concept_drift(recent_scores: ndarray | Series, historical_scores: ndarray | Series, threshold: float = 2.0) dict[str, Any][source]
Detect concept drift in model scores.
Compares recent score distribution to historical distribution using statistical tests. Useful for triggering model retraining.
- Parameters:
recent_scores – Recent anomaly scores (last N samples)
historical_scores – Historical anomaly scores (training/baseline period)
threshold – Threshold for drift detection (default 2.0 std devs)
- Returns:
Dictionary with drift detection results:
drift_detected: Boolean indicating if drift detected
recent_mean: Mean of recent scores
historical_mean: Mean of historical scores
drift_magnitude: Difference in means normalized by historical std
ks_statistic: Kolmogorov-Smirnov test statistic (if scipy available)
p_value: P-value from KS test (if scipy available)
- Return type:
dict[str, Any]
Examples
>>> drift_info = detect_concept_drift(
...     recent_scores=model_scores[-1000:],
...     historical_scores=baseline_scores
... )
>>> if drift_info["drift_detected"]:
...     trigger_model_retraining()
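The returned fields suggest a mean-shift check backed by a Kolmogorov-Smirnov comparison. A minimal sketch under stated assumptions: drift_magnitude is the absolute mean difference divided by the population standard deviation of the historical scores, and drift is flagged when it exceeds threshold. The KS statistic is computed directly rather than through scipy, and no p-value is produced; drift_summary and ks_statistic are hypothetical names.

```python
from bisect import bisect_right
from collections.abc import Sequence
from statistics import mean, pstdev
from typing import Any


def ks_statistic(a: Sequence[float], b: Sequence[float]) -> float:
    """Two-sample Kolmogorov-Smirnov statistic: max gap between the empirical CDFs."""
    a_sorted, b_sorted = sorted(a), sorted(b)
    d = 0.0
    for x in a_sorted + b_sorted:
        # The supremum of |F_a - F_b| is attained at one of the sample points.
        cdf_a = bisect_right(a_sorted, x) / len(a_sorted)
        cdf_b = bisect_right(b_sorted, x) / len(b_sorted)
        d = max(d, abs(cdf_a - cdf_b))
    return d


def drift_summary(
    recent: Sequence[float], historical: Sequence[float], threshold: float = 2.0
) -> dict[str, Any]:
    """Hypothetical sketch of a mean-shift drift check with a KS statistic."""
    recent_mean, historical_mean = mean(recent), mean(historical)
    historical_std = pstdev(historical)
    if historical_std > 0:
        magnitude = abs(recent_mean - historical_mean) / historical_std
    else:
        magnitude = 0.0 if recent_mean == historical_mean else float("inf")
    return {
        "drift_detected": magnitude > threshold,
        "recent_mean": recent_mean,
        "historical_mean": historical_mean,
        "drift_magnitude": magnitude,
        "ks_statistic": ks_statistic(recent, historical),
    }
```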
- anomsmith.workflows.detect_network_edge_anomalies(edge_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag structurally unusual dyads using isolation forest on edge features.
Expects a frame such as the output of edge_features_from_edges(); by default only numeric columns are used.
- Raises:
ValueError – If fewer than two edges are present.
- anomsmith.workflows.detect_network_node_anomalies(node_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag structurally unusual nodes using isolation forest on feature rows.
Fits IsolationForestDetector on the numeric feature matrix and thresholds the anomaly scores. Typical use: pass the output of node_features_from_edges(), optionally joining extra numeric columns before calling.
- Parameters:
node_features – Rows are nodes (index = node id); values are features.
threshold_rule – Rule applied to isolation scores (higher = more anomalous).
feature_cols – Columns to use; default is all numeric columns in the frame.
contamination – Passed to IsolationForest.
n_estimators – Number of trees in the forest.
random_state – Optional RNG seed.
- Returns:
DataFrame with the original feature columns plus score and flag (1 = anomaly). The index matches node_features.
- Raises:
ValueError – If fewer than two rows are present (isolation forest requires a batch to score relative to).
- anomsmith.workflows.detect_network_temporal_node_anomalies(touch_counts_by_bin: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag nodes whose time-bin activity vectors look unlike the rest.
Rows are nodes (index from node_touch_counts_by_bin()). Columns should be numeric bin counts (any column names); by default all numeric columns are used as features.
- Raises:
ValueError – If fewer than two nodes, no numeric columns, or any bin column contains non-finite values.
- anomsmith.workflows.discretize_rul(rul: Series | ndarray | SeriesLike, healthy_threshold: float = 30.0, warning_threshold: float = 10.0) Series[source]
Discretize RUL values into health states.
Maps RUL values to health states:
- RUL > healthy_threshold: Healthy (0)
- warning_threshold < RUL <= healthy_threshold: Warning (1)
- RUL <= warning_threshold: Distress (2)
- Parameters:
rul – Remaining Useful Life values
healthy_threshold – RUL threshold for Healthy state (default 30)
warning_threshold – RUL threshold for Warning state (default 10)
- Returns:
pandas Series with health states aligned to input index
Examples
>>> import pandas as pd
>>> import numpy as np
>>> rul = pd.Series([50, 25, 5, 0])
>>> states = discretize_rul(rul, healthy_threshold=30, warning_threshold=10)
>>> states.values
array([0, 1, 2, 2])
- anomsmith.workflows.edge_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Per-edge (dyad) features derived from aggregated undirected weights.
Rows follow the u, v, weight table from aggregate_undirected_edges(). Combines each edge weight with endpoint strengths from node_features_from_edges() to highlight unusually heavy links relative to endpoint activity.
Columns:
weight: aggregated event count on the dyad.
share_of_endpoint_volume: 2 * weight / (deg(u) + deg(v)), using the endpoint weighted_degree values (each edge’s weight is included in both degrees).
log1p_weight: log1p(weight) for scale-robust modeling.
- Parameters:
edges – Non-empty edge list (typically aggregated counts).
nodes – Full node roster (same semantics as node_features_from_edges()).
u_col – Name of the first endpoint column in edges (default u).
v_col – Name of the second endpoint column in edges (default v).
weight_col – Name of the edge weight column in edges (default weight).
- Returns:
DataFrame indexed by MultiIndex (u, v) with numeric feature columns.
- Raises:
ValueError – If edges is empty.
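To make the share_of_endpoint_volume formula concrete: for edges (a, b) with weight 2 and (a, c) with weight 1, the strengths are deg(a) = 3, deg(b) = 2, and deg(c) = 1, so the (a, b) dyad gets 2 * 2 / (3 + 2) = 0.8. The sketch below (edge_features_sketch is a hypothetical name, using lists and dicts instead of DataFrames) reproduces the three documented columns.

```python
import math
from collections import defaultdict


def edge_features_sketch(
    edges: list[tuple[str, str, float]],
) -> dict[tuple[str, str], dict[str, float]]:
    """Hypothetical sketch: per-dyad features from aggregated (u, v, weight) rows."""
    if not edges:
        raise ValueError("edges is empty")
    # Weighted degree (strength) of each endpoint; every edge counts toward both ends.
    strength = defaultdict(float)
    for u, v, w in edges:
        strength[u] += w
        strength[v] += w
    return {
        (u, v): {
            "weight": w,
            "share_of_endpoint_volume": 2 * w / (strength[u] + strength[v]),
            "log1p_weight": math.log1p(w),
        }
        for u, v, w in edges
    }


print(edge_features_sketch([("a", "b", 2), ("a", "c", 1)])[("a", "b")])
```

A share near 1.0 means the dyad accounts for almost all activity at both endpoints, which is what makes an isolated heavy pair stand out.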
- anomsmith.workflows.evaluate_policy(health_states: Series | ndarray | HealthStateView, previous_states: Series | ndarray | HealthStateView | None = None, intervene_cost: float = 100.0, review_cost: float = 30.0, wait_cost: float = 0.0, base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3), intervene_risk_reduction: float = 0.5, review_risk_reduction: float = 0.75) dict[str, float][source]
Evaluate policy performance metrics.
- Parameters:
health_states – Current health states (0=Healthy, 1=Warning, 2=Distress)
previous_states – Previous health states for transition detection (optional)
intervene_cost – Cost of intervention action (default 100)
review_cost – Cost of review action (default 30)
wait_cost – Cost of wait action (default 0)
base_risks – Base failure risk for each state, ordered (healthy, warning, distress); default (0.01, 0.1, 0.3)
intervene_risk_reduction – Risk reduction factor for intervention (default 0.5)
review_risk_reduction – Risk reduction factor for review (default 0.75)
- Returns:
Dictionary with total_cost, total_risk, interventions, reviews, waits
Examples
>>> import pandas as pd
>>> states = pd.Series([0, 0, 1, 2, 2])
>>> metrics = evaluate_policy(states)
>>> metrics['total_cost']
230.0
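The example total of 230.0 is consistent with Healthy mapping to wait (cost 0), Warning to review (30), and Distress to intervene (100): 0 + 0 + 30 + 100 + 100. The sketch below adopts that mapping as an assumption, ignores previous_states, and also assumes each risk-reduction factor multiplies the base risk of the state. policy_totals is a hypothetical name, not the library's implementation.

```python
from collections.abc import Sequence


def policy_totals(
    health_states: Sequence[int],
    intervene_cost: float = 100.0,
    review_cost: float = 30.0,
    wait_cost: float = 0.0,
    base_risks: tuple[float, float, float] = (0.01, 0.1, 0.3),
    intervene_risk_reduction: float = 0.5,
    review_risk_reduction: float = 0.75,
) -> dict[str, float]:
    """Hypothetical sketch: Healthy -> wait, Warning -> review, Distress -> intervene."""
    totals = {"total_cost": 0.0, "total_risk": 0.0, "interventions": 0, "reviews": 0, "waits": 0}
    for state in health_states:
        risk = base_risks[state]
        if state == 2:
            totals["total_cost"] += intervene_cost
            totals["total_risk"] += risk * intervene_risk_reduction
            totals["interventions"] += 1
        elif state == 1:
            totals["total_cost"] += review_cost
            totals["total_risk"] += risk * review_risk_reduction
            totals["reviews"] += 1
        else:
            totals["total_cost"] += wait_cost
            totals["total_risk"] += risk
            totals["waits"] += 1
    return totals


print(policy_totals([0, 0, 1, 2, 2])["total_cost"])  # 230.0
```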
- anomsmith.workflows.evaluate_survival_model(surv_df: DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, risk_scores: ndarray | Series | None = None) dict[str, float][source]
Evaluate survival model performance.
Computes comprehensive metrics for survival model evaluation.
- Parameters:
surv_df – Survival function DataFrame (rows = time points, cols = samples)
durations – Actual time-to-event values (n_samples,)
events – Event indicators (1 = event occurred, 0 = censored), optional
risk_scores – Optional risk scores for C-index (if None, computed from survival)
- Returns:
Dictionary with evaluation metrics:
c_index: Concordance index
mean_absolute_error: Mean absolute error in predicted vs actual durations
median_survival_error: Error in median survival predictions
- Return type:
dict[str, float]
Examples
>>> surv_df = model.predict_survival_function(X_test)
>>> metrics = evaluate_survival_model(surv_df, durations_test, events_test)
>>> print(f"C-index: {metrics['c_index']:.3f}")
- anomsmith.workflows.fit_survival_model_for_maintenance(X: ndarray | DataFrame, durations: ndarray | Series, events: ndarray | Series | None = None, model_type: str = 'logistic_hazard', **model_kwargs) CoxSurvivalModel[source]
Fit a survival model for predictive maintenance.
Convenience function that fits a survival model with sensible defaults for predictive maintenance use cases.
- Parameters:
X – Feature matrix (n_samples, n_features) - sensor readings
durations – Time-to-failure values (n_samples,)
events – Event indicators (1 = failure, 0 = censored), optional
model_type – Model type: ‘cox’ (lifelines), ‘logistic_hazard’ (default), or ‘deepsurv’
**model_kwargs – Additional model parameters
- Returns:
Fitted survival model
Examples
>>> model = fit_survival_model_for_maintenance(
...     X_train, durations_train, events_train,
...     model_type="logistic_hazard", n_bins=50
... )
- anomsmith.workflows.node_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Per-node structural features for anomaly scoring.
Uses the same edge table shape produced by aggregate_undirected_edges() (u, v, weight). Every id in nodes appears in the index; nodes with no incident edges get zero strength, a zero distinct-neighbor count, and uniform PageRank mass.
Feature columns:
weighted_degree: sum of incident edge weights (communication volume).
neighbor_count: number of distinct neighbors.
pagerank: undirected PageRank (numpy power iteration; no NetworkX).
- Parameters:
edges – Edge list with endpoints and non-negative weights.
nodes – Complete roster of node identifiers (e.g. all team member ids).
u_col – Name of the first endpoint column in edges (default u).
v_col – Name of the second endpoint column in edges (default v).
weight_col – Name of the edge weight column in edges (default weight).
- Returns:
DataFrame indexed by node id with numeric feature columns.
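The pagerank column is described as undirected PageRank computed by power iteration. A minimal pure-Python sketch, assuming a damping factor of 0.85 (the reference does not state one) and transitions proportional to edge weight. Mass sitting on nodes with no incident edges is spread uniformly over all nodes at each iteration, so every isolated member receives the same baseline share. undirected_pagerank is a hypothetical name.

```python
from collections.abc import Hashable, Sequence


def undirected_pagerank(
    edges: Sequence[tuple[Hashable, Hashable, float]],
    nodes: Sequence[Hashable],
    damping: float = 0.85,
    tol: float = 1e-10,
    max_iter: int = 200,
) -> dict[Hashable, float]:
    """Hypothetical sketch of weighted PageRank on an undirected graph."""
    index = {node: i for i, node in enumerate(nodes)}
    n = len(nodes)
    neighbors: list[dict[int, float]] = [{} for _ in range(n)]
    for u, v, w in edges:
        i, j = index[u], index[v]
        # Undirected: each edge is walkable in both directions.
        neighbors[i][j] = neighbors[i].get(j, 0.0) + w
        neighbors[j][i] = neighbors[j].get(i, 0.0) + w
    strength = [sum(nb.values()) for nb in neighbors]
    rank = [1.0 / n] * n
    for _ in range(max_iter):
        # Mass on nodes without edges is redistributed uniformly.
        dangling = sum(rank[i] for i in range(n) if strength[i] == 0)
        new_rank = [(1.0 - damping) / n + damping * dangling / n] * n
        for i in range(n):
            if strength[i] == 0:
                continue
            for j, w in neighbors[i].items():
                new_rank[j] += damping * rank[i] * w / strength[i]
        delta = sum(abs(a - b) for a, b in zip(new_rank, rank))
        rank = new_rank
        if delta < tol:
            break
    return dict(zip(nodes, rank))
```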
- anomsmith.workflows.node_graph_metrics_networkx(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Graph centrality metrics via NetworkX (optional dependency).
Install with the network extra: pip install 'anomsmith[network]'.
Builds an undirected graph: every id in nodes is a vertex, and rows of edges carry combined weights (parallel edges in the table should be pre-aggregated). Centrality matches common org-network dashboards: betweenness and closeness use topology only, while eigenvector centrality uses the edge weight.
Columns:
betweenness_centrality: NetworkX betweenness_centrality (unweighted hops).
closeness_centrality: NetworkX closeness_centrality.
eigenvector_centrality: weighted when convergence succeeds; otherwise zeros.
- Parameters:
edges – Aggregated u, v, weight table (may be empty).
nodes – Full roster; isolated members still appear with zeros.
u_col – Name of the first endpoint column in edges (default u).
v_col – Name of the second endpoint column in edges (default v).
weight_col – Name of the edge weight column in edges (default weight).
- Returns:
DataFrame aligned to nodes with the three metric columns.
- Raises:
ImportError – If NetworkX is not installed.
- anomsmith.workflows.node_touch_counts_by_bin(communications: DataFrame, nodes: Index | list[Any] | ndarray, *, timestamp_col: str = 'timestamp', sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', freq: str = '1D', drop_self_loops: bool = True) DataFrame[source]
Count how often each node sends or receives in each time bin.
Each communication row increments both the sender and the receiver for the floored period bucket (a pandas offset string, e.g. "1D", "6H").
- Parameters:
communications – Must include the timestamp and endpoint columns.
nodes – Full roster; bins include only these ids (other endpoints are dropped).
timestamp_col – Column of timestamps parseable by pd.to_datetime.
sender_col – Column with sender endpoint identifiers.
receiver_col – Column with receiver endpoint identifiers.
freq – Bin size passed to Series.dt.floor.
drop_self_loops – If True, rows where the sender equals the receiver are skipped.
- Returns:
DataFrame with index = node id, columns = bin start (datetime64), values = integer touch counts. Missing bins are zero; nodes listed in nodes that have no events still appear as rows of zeros.
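For the default freq="1D", binning amounts to flooring each timestamp to midnight and crediting both endpoints. The plain-Python sketch below (daily_touch_counts is a hypothetical name) assumes timezone-naive datetime values and returns nested dictionaries rather than a DataFrame; only bins touched by at least one roster member are created.

```python
from collections import defaultdict
from collections.abc import Hashable, Iterable, Sequence
from datetime import datetime, time


def daily_touch_counts(
    events: Iterable[tuple[datetime, Hashable, Hashable]],
    nodes: Sequence[Hashable],
    drop_self_loops: bool = True,
) -> dict[Hashable, dict[datetime, int]]:
    """Hypothetical sketch of 1D binning: {node: {day_start: touch_count}}."""
    roster = set(nodes)
    counts = {node: defaultdict(int) for node in nodes}
    bins = set()
    for ts, sender, receiver in events:
        if drop_self_loops and sender == receiver:
            continue
        day_start = datetime.combine(ts.date(), time.min)  # floor to midnight
        # One event touches both endpoints; ids outside the roster are ignored.
        for endpoint in (sender, receiver):
            if endpoint in roster:
                counts[endpoint][day_start] += 1
                bins.add(day_start)
    ordered_bins = sorted(bins)
    # Every roster node gets every bin, with zeros where it had no events.
    return {node: {b: counts[node].get(b, 0) for b in ordered_bins} for node in nodes}
```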
- anomsmith.workflows.predict_health_states_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, healthy_threshold: float = 30.0, warning_threshold: float = 10.0, threshold: float = 0.5) HealthStateView[source]
Predict health states from survival model.
Converts survival model predictions to health states by:
1. Predicting RUL from the survival model
2. Discretizing RUL into health states
- Parameters:
model – Fitted survival model
X – Feature matrix (n_samples, n_features)
healthy_threshold – RUL threshold for Healthy state (default 30)
warning_threshold – RUL threshold for Warning state (default 10)
threshold – Survival probability threshold for median RUL (default 0.5)
- Returns:
HealthStateView with predicted health states
Examples
>>> health_states = predict_health_states_from_survival(
...     model, X_test, healthy_threshold=30, warning_threshold=10
... )
- anomsmith.workflows.predict_rul_from_survival(model: CoxSurvivalModel, X: ndarray | DataFrame, threshold: float = 0.5, index: Index | None = None) Series[source]
Predict Remaining Useful Life (RUL) from survival model.
Uses the time at which the predicted survival probability falls to threshold as the predicted RUL; with the default threshold of 0.5, this is the median survival time.
- Parameters:
model – Fitted survival model
X – Feature matrix (n_samples, n_features)
threshold – Survival probability threshold for median (default 0.5)
index – Optional row index for the returned Series (defaults to X.index for DataFrame inputs, else a pandas.RangeIndex)
- Returns:
Series of predicted RUL values
Examples
>>> rul_predictions = predict_rul_from_survival(survival_model, X_test)
>>> health_states = predict_health_states_from_survival(
...     survival_model, X_test, healthy_threshold=30, warning_threshold=10
... )
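One straightforward reading of the threshold rule is to scan a predicted survival curve for the first time point where the survival probability drops to threshold or below. A minimal sketch, with rul_from_survival_curve as a hypothetical helper; returning infinity when the curve never reaches threshold is an assumption, and the library may instead clip to the last time point or interpolate.

```python
import math
from collections.abc import Sequence


def rul_from_survival_curve(
    times: Sequence[float], survival: Sequence[float], threshold: float = 0.5
) -> float:
    """Hypothetical sketch: first time at which S(t) <= threshold.

    `times` must be sorted ascending and aligned with `survival`.
    """
    for t, s in zip(times, survival):
        if s <= threshold:
            return t
    # The curve never reaches the threshold within the prediction horizon.
    return math.inf


print(rul_from_survival_curve([0, 10, 20, 30], [1.0, 0.8, 0.5, 0.2]))  # 20
```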
- anomsmith.workflows.process_s3_batch(s3_keys: list[str], model: BaseScorer | BaseDetector, bucket: str, s3_client=None) DataFrame[source]
Process a batch of S3 files with anomaly detection.
Designed for AWS Lambda or SageMaker batch jobs that process S3 data in batches.
- Parameters:
s3_keys – List of S3 object keys to process
model – Fitted model (BaseScorer or BaseDetector)
bucket – S3 bucket name (required)
s3_client – Optional boto3 S3 client (will create if not provided)
- Returns:
DataFrame with results for all processed files
- Raises:
ImportError – If boto3 not available
ValueError – If model not fitted or bucket not specified
Examples
>>> s3_keys = ["data/2024/01/01/file1.csv", "data/2024/01/01/file2.csv"]
>>> results = process_s3_batch(s3_keys, scorer, bucket="my-data-bucket")
- anomsmith.workflows.rank_assets_by_risk(asset_health: DataFrame, top_n: int | None = None) DataFrame[source]
Rank assets by combined risk score.
- Parameters:
asset_health – DataFrame from assess_asset_health()
top_n – Optional number of top assets to return (default None = all)
- Returns:
DataFrame ranked by combined_risk (highest first)
- anomsmith.workflows.report_detection(y: Series | ndarray | SeriesLike, detector: BaseDetector | BaseScorer, threshold_rule: ThresholdRule) dict[str, Any][source]
Generate detection report with summary stats.
- Parameters:
y – Time series that was analyzed
detector – BaseDetector or BaseScorer instance used
threshold_rule – ThresholdRule applied
- Returns:
Dictionary with summary stats and top anomaly timestamps
- anomsmith.workflows.score_anomalies(y: Series | ndarray | SeriesLike, scorer: BaseScorer) Series[source]
Score anomalies in a time series.
- Parameters:
y – Time series to score
scorer – BaseScorer instance
- Returns:
pandas Series of anomaly scores with same index as y
- anomsmith.workflows.sweep_thresholds(y: Series | ndarray | SeriesLike, scorer: BaseScorer, threshold_values: list[float] | ndarray, labels: Series | ndarray | SeriesLike | None = None) DataFrame[source]
Evaluate multiple threshold values and return metrics.
- Parameters:
y – Time series to score
scorer – BaseScorer instance
threshold_values – List of threshold values to evaluate
labels – Optional ground truth labels
- Returns:
DataFrame with columns threshold, precision, recall, f1 (metrics are NaN if labels are not provided)
- Return type:
pandas DataFrame
- anomsmith.workflows.track_mahalanobis_distance(X: ndarray | DataFrame, detector: PCADetector, index: Index | None = None) Series[source]
Track Mahalanobis distance over time as a single metric.
Computes Mahalanobis distance from the “normal” center in PCA space for each time point. This provides a single metric that can be tracked as a time series to monitor equipment health drift.
Delegates scoring to PCADetector.score() so the Mahalanobis math stays in the primitive layer (single implementation).
- Parameters:
X – Feature matrix (n_samples, n_features) with sensor readings
detector – Fitted PCADetector (PCA projection, mean, and covariance already computed)
index – Optional index for the resulting Series
- Returns:
pandas Series with Mahalanobis distance values, indexed by time
Examples
>>> detector = PCADetector(n_components=3, score_method='mahalanobis')
>>> detector.fit(X_train)  # Fit on healthy operation data
>>> distances = track_mahalanobis_distance(X_monitor, detector)
>>> # Track distance over time to detect drift
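Scores on principal components fitted to the training data are uncorrelated, so their covariance is diagonal and the Mahalanobis distance reduces to a variance-scaled Euclidean distance, d(z) = sqrt(sum_k (z_k - mu_k)^2 / var_k). The sketch below assumes the component scores are already available as plain lists and uses population variance; fit_diagonal and mahalanobis_diag are hypothetical names, and PCADetector may estimate the covariance differently.

```python
import math
from collections.abc import Sequence
from statistics import mean, pvariance


def fit_diagonal(train_scores: Sequence[Sequence[float]]) -> tuple[list[float], list[float]]:
    """Per-component mean and population variance of training PCA scores."""
    columns = list(zip(*train_scores))
    means = [mean(col) for col in columns]
    variances = [pvariance(col, mu) for col, mu in zip(columns, means)]
    return means, variances


def mahalanobis_diag(
    point: Sequence[float], means: Sequence[float], variances: Sequence[float]
) -> float:
    """Mahalanobis distance under a diagonal covariance (each variance must be > 0)."""
    return math.sqrt(sum((x - m) ** 2 / v for x, m, v in zip(point, means, variances)))


means, variances = fit_diagonal([[1, 0], [-1, 0], [0, 2], [0, -2]])
print(mahalanobis_diag([1, 2], means, variances))  # 2.0
```

Dividing by each component's variance is what keeps a large move along a low-variance component from being masked by the naturally wide spread of the leading components.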
Network workflows
Network-shaped anomaly workflows.
Designed to interoperate with organizational communication graphs such as
org_network_analysis (NetworkAnalyzer._to_edge_list): undirected edges
aggregated by sender/receiver pair with integer weights (event counts), and a
fixed member roster so isolated nodes still appear in outputs.
- anomsmith.workflows.network.aggregate_undirected_edges(communications: DataFrame, *, sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', drop_self_loops: bool = True) DataFrame[source]
Aggregate communication rows into undirected weighted edges.
Mirrors the aggregation in the org_network_analysis business logic: each unique unordered pair (min(a, b), max(a, b)) gets a weight equal to the number of rows (communication events) between those endpoints.
- Parameters:
communications – One row per event; must include sender/receiver columns.
sender_col – Column name for the sender endpoint (default sender_id).
receiver_col – Column name for the receiver endpoint (default receiver_id).
drop_self_loops – If True, rows where the sender equals the receiver are skipped.
- Returns:
DataFrame with columns u, v, weight (integer counts), sorted by u, v.
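The unordered-pair aggregation can be sketched in pandas as follows. The function name is hypothetical. Python's min/max are used to orient each pair so that mixed id types compare the same way as in the description above.

```python
import pandas as pd


def aggregate_undirected_edges_sketch(communications, sender_col="sender_id",
                                      receiver_col="receiver_id", drop_self_loops=True):
    """Hypothetical re-creation of the undirected edge aggregation."""
    df = communications[[sender_col, receiver_col]]
    if drop_self_loops:
        df = df[df[sender_col] != df[receiver_col]]
    a, b = df[sender_col].tolist(), df[receiver_col].tolist()
    # Canonical orientation: the smaller endpoint always goes in column u.
    pairs = pd.DataFrame({"u": [min(x, y) for x, y in zip(a, b)],
                          "v": [max(x, y) for x, y in zip(a, b)]})
    # Weight = number of events between the two endpoints.
    edges = pairs.groupby(["u", "v"]).size().reset_index(name="weight")
    return edges.sort_values(["u", "v"]).reset_index(drop=True)
```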
- anomsmith.workflows.network.detect_network_edge_anomalies(edge_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag structurally unusual dyads using isolation forest on edge features.
Expects a frame such as the output of edge_features_from_edges() (by default only numeric columns are used).
- Raises:
ValueError – If fewer than two edges are present.
- anomsmith.workflows.network.detect_network_node_anomalies(node_features: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag structurally unusual nodes using isolation forest on feature rows.
Fits IsolationForestDetector on the numeric feature matrix and thresholds the resulting anomaly scores. Typical use: pass the output of node_features_from_edges(), optionally joined with extra numeric columns before calling.
- Parameters:
node_features – Rows are nodes (index = node id); values are features.
threshold_rule – Rule applied to isolation scores (higher = more anomalous).
feature_cols – Columns to use; default is all numeric columns in the frame.
contamination – Passed to IsolationForest.
n_estimators – Number of trees in the forest.
random_state – Optional RNG seed.
- Returns:
DataFrame with the original feature columns plus score and flag (1 = anomaly). The index matches node_features.
- Raises:
ValueError – If fewer than two rows are present (isolation forest scores each row relative to the rest of the batch).
- anomsmith.workflows.network.detect_network_temporal_node_anomalies(touch_counts_by_bin: DataFrame, threshold_rule: ThresholdRule, *, feature_cols: list[str] | None = None, contamination: float = 0.05, n_estimators: int = 200, random_state: int | None = None) DataFrame[source]
Flag nodes whose time-bin activity vectors look unlike the rest.
Rows are nodes (index from node_touch_counts_by_bin()). Columns should be numeric bin counts (any column names); by default all numeric columns are used as features.
- Raises:
ValueError – If fewer than two nodes, no numeric columns, or any bin column contains non-finite values.
- anomsmith.workflows.network.edge_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Per-edge (dyad) features derived from aggregated undirected weights.
Rows follow the u, v, weight table from aggregate_undirected_edges(). Each edge weight is combined with the endpoint strengths from node_features_from_edges() to highlight links that are unusually heavy relative to endpoint activity.
Columns:
- weight: aggregated event count on the dyad.
- share_of_endpoint_volume: 2 * weight / (deg(u) + deg(v)), using the endpoints' weighted_degree values. Each edge's weight is included in both degrees, so for non-negative weights the share lies between 0 and 1.
- log1p_weight: log1p(weight), for scale-robust modeling.
- Parameters:
edges – Non-empty edge list (typically aggregated counts).
nodes – Full node roster (same semantics as node_features_from_edges()).
u_col – Name of the first-endpoint column in edges.
v_col – Name of the second-endpoint column in edges.
weight_col – Name of the edge-weight column in edges.
- Returns:
DataFrame indexed by MultiIndex (u, v) with numeric feature columns.
- Raises:
ValueError – If edges is empty.
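The dyad features can be reproduced with a short pandas sketch. The function name is hypothetical. It omits the nodes roster because isolated nodes contribute no edges and therefore cannot change any edge's features.

```python
import numpy as np
import pandas as pd


def edge_features_sketch(edges, u_col="u", v_col="v", weight_col="weight"):
    """Hypothetical dyad features from an aggregated u, v, weight table."""
    if edges.empty:
        raise ValueError("edges must be non-empty")
    w = edges[weight_col].astype(float).to_numpy()
    # Weighted degree (strength): each edge adds its weight to both endpoints.
    strength = pd.concat([
        pd.Series(w, index=edges[u_col].to_numpy()),
        pd.Series(w, index=edges[v_col].to_numpy()),
    ]).groupby(level=0).sum()
    deg_u = edges[u_col].map(strength).to_numpy()
    deg_v = edges[v_col].map(strength).to_numpy()
    index = pd.MultiIndex.from_arrays(
        [edges[u_col].to_numpy(), edges[v_col].to_numpy()], names=["u", "v"])
    return pd.DataFrame({
        "weight": w,
        "share_of_endpoint_volume": 2 * w / (deg_u + deg_v),
        "log1p_weight": np.log1p(w),
    }, index=index)
```

A share close to 1 means the dyad accounts for nearly all of both endpoints' traffic.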
- anomsmith.workflows.network.node_features_from_edges(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Per-node structural features for anomaly scoring.
Uses the same edge-table shape produced by aggregate_undirected_edges() (u, v, weight). Every id in nodes appears in the index; nodes with no incident edges get zero strength, a zero distinct-neighbor count, and uniform PageRank mass.
Feature columns:
- weighted_degree: sum of incident edge weights (communication volume).
- neighbor_count: number of distinct neighbors.
- pagerank: undirected PageRank (NumPy power iteration; no NetworkX).
- Parameters:
edges – Edge list with endpoints and non-negative weights.
nodes – Complete roster of node identifiers (e.g. all team member ids).
u_col – Name of the first-endpoint column in edges.
v_col – Name of the second-endpoint column in edges.
weight_col – Name of the edge-weight column in edges.
- Returns:
DataFrame indexed by node id with numeric feature columns.
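A weighted PageRank by power iteration on an undirected graph can be sketched in NumPy as below. This is an illustration, not the library's code. The damping factor of 0.85, the uniform redistribution of mass from nodes without edges, and the convergence tolerance are all assumptions. How anomsmith assigns mass to isolated nodes may differ.

```python
import numpy as np


def undirected_pagerank(n_nodes, edges, damping=0.85, tol=1e-12, max_iter=1000):
    """Weighted PageRank on an undirected graph; edges are (i, j, weight) position triples."""
    W = np.zeros((n_nodes, n_nodes))
    for i, j, w in edges:
        W[i, j] += w
        W[j, i] += w  # undirected: weight flows both ways
    strength = W.sum(axis=1)
    dangling = strength == 0
    # Row-normalise to transition probabilities; rows of isolated nodes stay zero.
    P = np.divide(W, strength[:, None], out=np.zeros_like(W), where=~dangling[:, None])
    rank = np.full(n_nodes, 1.0 / n_nodes)
    for _ in range(max_iter):
        # Mass held by dangling nodes is spread uniformly over all nodes.
        new = damping * (rank @ P + rank[dangling].sum() / n_nodes) + (1 - damping) / n_nodes
        if np.abs(new - rank).sum() < tol:
            return new
        rank = new
    return rank
```

Each iteration preserves total mass, so the result sums to 1. A node with no edges receives only teleport mass, (1 - d) / (n - d) at the fixed point.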
- anomsmith.workflows.network.node_graph_metrics_networkx(edges: DataFrame, nodes: Index | list[Any] | ndarray, *, u_col: str = 'u', v_col: str = 'v', weight_col: str = 'weight') DataFrame[source]
Graph centrality metrics via NetworkX (optional dependency).
Install with the network extra: pip install 'anomsmith[network]'.
Builds an undirected graph: every id in nodes is a vertex, and edges from edges carry combined weights (parallel edges in the table should be pre-aggregated). Centrality follows common org-network dashboards: betweenness and closeness use topology only, while eigenvector centrality uses the edge weight.
Columns:
- betweenness_centrality: NetworkX betweenness_centrality (unweighted hops).
- closeness_centrality: NetworkX closeness_centrality.
- eigenvector_centrality: weighted when the computation converges; otherwise zeros.
- Parameters:
edges – Aggregated u, v, weight table (may be empty).
nodes – Full roster; isolated members still appear with zeros.
u_col – Name of the first-endpoint column in edges.
v_col – Name of the second-endpoint column in edges.
weight_col – Name of the edge-weight column in edges.
- Returns:
DataFrame aligned to nodes with the three metric columns.
- Raises:
ImportError – If NetworkX is not installed.
- anomsmith.workflows.network.node_touch_counts_by_bin(communications: DataFrame, nodes: Index | list[Any] | ndarray, *, timestamp_col: str = 'timestamp', sender_col: str = 'sender_id', receiver_col: str = 'receiver_id', freq: str = '1D', drop_self_loops: bool = True) DataFrame[source]
Count how often each node sends or receives in each time bin.
Each communication row increments both the sender and the receiver in its floored period bucket (a pandas offset string, e.g. "1D", "6H").
- Parameters:
communications – Must include timestamp and endpoint columns.
nodes – Full roster; bins include only these ids (other endpoints dropped).
timestamp_col – Column of timestamps parseable by pd.to_datetime.
sender_col – Column of sender endpoint identifiers.
receiver_col – Column of receiver endpoint identifiers.
freq – Bin size passed to Series.dt.floor.
drop_self_loops – If True, rows where the sender equals the receiver are skipped.
- Returns:
DataFrame with index = node id, columns = bin start (datetime64), and values = integer touch counts. Missing bins are zero; nodes listed in nodes that have no events still appear as rows of zeros.
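The per-bin touch counting can be sketched with pandas as follows. The function name is hypothetical. In this sketch, only bins that contain at least one event become columns. Whether the library also fills in empty bins between observed ones is not stated, so treat that as an open assumption.

```python
import pandas as pd


def touch_counts_sketch(communications, nodes, timestamp_col="timestamp",
                        sender_col="sender_id", receiver_col="receiver_id",
                        freq="1D", drop_self_loops=True):
    """Hypothetical node x time-bin touch-count matrix."""
    df = communications
    if drop_self_loops:
        df = df[df[sender_col] != df[receiver_col]]
    bins = pd.to_datetime(df[timestamp_col]).dt.floor(freq)
    # Each event touches two endpoints: stack senders and receivers.
    touches = pd.DataFrame({
        "node": pd.concat([df[sender_col], df[receiver_col]], ignore_index=True),
        "bin": pd.concat([bins, bins], ignore_index=True),
    })
    roster = list(nodes)
    touches = touches[touches["node"].isin(roster)]  # drop off-roster endpoints
    counts = touches.groupby(["node", "bin"]).size().unstack("bin", fill_value=0)
    # Reindex to the full roster so nodes without events appear as zero rows.
    return counts.reindex(roster, fill_value=0).astype(int)
```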
Platform (predictive maintenance)
Predictive maintenance platform layer (features, RUL helpers, alerts, ingestion).
This package consolidates the former standalone Anomaly Detection Toolkit workflow
code into the single anomsmith distribution. All detector primitives (PCA,
isolation forest, scorers, etc.) live under anomsmith.primitives; platform
holds orchestration, dataset utilities, evaluation helpers, and optional matplotlib
visualizations that sit on top of those primitives.
- class anomsmith.platform.Alert(timestamp: datetime, level: AlertLevel, message: str, feature: str, value: float, threshold: float, asset_id: str | None = None, metadata: dict[str, ~typing.Any]=<factory>)[source]
Bases: object
Represents a predictive maintenance alert.
- level: AlertLevel
- class anomsmith.platform.AlertLevel(*values)[source]
Bases: Enum
Alert severity levels.
- CRITICAL = 'critical'
- FAILURE = 'failure'
- INFO = 'info'
- WARNING = 'warning'
- class anomsmith.platform.AlertSystem(thresholds: dict[str, dict[str, float]] | None = None, escalation_rules: dict[str, dict[str, Any]] | None = None)[source]
Bases: object
Alert system for predictive maintenance with escalation rules.
- check_thresholds(features: ndarray | DataFrame | Series, feature_names: list[str] | None = None, timestamp: datetime | None = None, asset_id: str | None = None) list[Alert][source]
Check features against thresholds and generate alerts.
Parameters
- features : array-like
Feature values to check. Can be a single value, an array, or a DataFrame.
- feature_names : list of str, optional
Names of features. Required if features is an array.
- timestamp : datetime, optional
Timestamp for the alerts. Defaults to the current time.
- asset_id : str, optional
Asset identifier.
Returns
- alerts : list of Alert
List of generated alerts.
- get_recent_alerts(n: int = 10, level: AlertLevel | None = None, asset_id: str | None = None) list[Alert][source]
Get recent alerts.
Parameters
- n : int, default=10
Number of recent alerts to return.
- level : AlertLevel, optional
Filter by alert level.
- asset_id : str, optional
Filter by asset ID.
Returns
- alerts : list of Alert
Recent alerts matching the criteria.
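The following is a sketch of threshold checking against a nested thresholds mapping. The shape {feature: {level: limit}} is only an assumption read off the dict[str, dict[str, float]] annotation. SimpleAlert and check_thresholds_sketch are hypothetical stand-ins, not the Alert class or the AlertSystem method. The sketch also assumes that a reading which reaches several limits reports only the most severe one.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class SimpleAlert:  # hypothetical stand-in for anomsmith.platform.Alert
    timestamp: datetime
    level: str
    feature: str
    value: float
    threshold: float


# Assumed severity order, using the AlertLevel string values.
SEVERITY = ["info", "warning", "critical", "failure"]


def check_thresholds_sketch(features, thresholds, timestamp=None):
    """features: {name: value}; thresholds: {name: {level: limit}} (assumed shape)."""
    timestamp = timestamp or datetime.now()
    alerts = []
    for name, value in features.items():
        limits = thresholds.get(name, {})
        exceeded = [lvl for lvl in SEVERITY if lvl in limits and value >= limits[lvl]]
        if exceeded:
            level = exceeded[-1]  # keep only the most severe level reached
            alerts.append(SimpleAlert(timestamp, level, name, value, limits[level]))
    return alerts
```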
- class anomsmith.platform.DashboardVisualizer(figsize: tuple[int, int] = (15, 10))[source]
Bases: object
Dashboard visualization utilities for predictive maintenance monitoring.
- create_dashboard(results_history: dict[str, list[dict[str, Any]]], sensor_data: dict[str, DataFrame] | None = None, save_path: str | None = None)[source]
Create comprehensive dashboard visualization.
Parameters
- results_history : dict
Dictionary mapping asset_id to a list of processing results.
- sensor_data : dict, optional
Dictionary mapping asset_id to a DataFrame with sensor readings.
- save_path : str, optional
Path to save the dashboard figure.
Returns
- fig : matplotlib.figure.Figure
Dashboard figure.
- create_summary_dashboard(results_history: dict[str, list[dict[str, Any]]], save_path: str | None = None)[source]
Create summary dashboard with key metrics.
Parameters
- results_history : dict
Dictionary mapping asset_id to a list of processing results.
- save_path : str, optional
Path to save the dashboard figure.
Returns
- fig : matplotlib.figure.Figure
Summary dashboard figure.
- class anomsmith.platform.FailureClassifier(n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]
Bases: object
Classify normal vs. failure states.
- fit(X: ndarray | DataFrame, y: ndarray | Series)[source]
Fit the failure classifier.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
- y : array-like of shape (n_samples,)
Binary labels: 0 for normal, 1 for failure.
- model_: RandomForestClassifier | None
- predict(X: ndarray | DataFrame) ndarray[source]
Predict failure states.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- predictions : ndarray of shape (n_samples,)
Binary predictions: 0 for normal, 1 for failure.
- predict_proba(X: ndarray | DataFrame) ndarray[source]
Predict failure probabilities.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- probabilities : ndarray of shape (n_samples, 2)
Probabilities of [normal, failure] for each sample.
- scaler_: StandardScaler | None
- class anomsmith.platform.FeatureExtractor(rolling_windows: list[int] | None = None, frequency_features: bool = True, change_detection: bool = True)[source]
Bases: object
Extract predictive maintenance features from time series data.
- extract(data: ndarray | Series | DataFrame, columns: list[str] | None = None) DataFrame[source]
Extract features from time series data.
Parameters
- data : array-like
Time series data. Can be a 1D array, Series, or DataFrame.
- columns : list of str, optional
Column names if data is a DataFrame. If None, uses 'value' for 1D data.
Returns
- features : DataFrame
Extracted features with named columns.
- class anomsmith.platform.PredictiveMaintenanceSystem(feature_extractor: FeatureExtractor | None = None, rul_estimator: RULEstimator | None = None, failure_classifier: FailureClassifier | None = None, alert_system: AlertSystem | None = None, anomaly_detector: BaseDetector | None = None)[source]
Bases: object
Complete predictive maintenance system integrating all components.
- process(data: ndarray | Series | DataFrame, timestamp: datetime | None = None, asset_id: str | None = None, return_features: bool = False) dict[str, Any][source]
Process new data and generate predictions/alerts.
Parameters
- data : array-like
Time series data to process.
- timestamp : datetime, optional
Timestamp for the data.
- asset_id : str, optional
Asset identifier.
- return_features : bool, default=False
Whether to return extracted features.
Returns
- results : dict
Dictionary containing:
  - 'features': extracted features (only if return_features=True)
  - 'rul': predicted RUL
  - 'failure_probability': probability of failure
  - 'failure_prediction': binary failure prediction
  - 'anomaly_score': anomaly score from anomsmith.primitives.base.BaseDetector.score()
  - 'anomaly_prediction': 0 (normal) or 1 (anomaly), from the LabelView labels
  - 'alerts': list of alerts
- class anomsmith.platform.RULEstimator(method: str = 'regression', n_estimators: int = 100, max_depth: int | None = None, random_state: int | None = None)[source]
Bases: object
Estimate Remaining Useful Life (RUL) for assets.
- fit(X: ndarray | DataFrame, y: ndarray | Series, degradation_threshold: float | None = None)[source]
Fit the RUL estimator.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix (e.g., from FeatureExtractor).
- y : array-like of shape (n_samples,)
RUL values (time until failure) or degradation values.
- degradation_threshold : float, optional
Threshold for the degradation-based method. If provided, degradation values are converted to RUL.
- model_: RandomForestRegressor | None
- predict(X: ndarray | DataFrame) ndarray[source]
Predict RUL for new data.
Parameters
- X : array-like of shape (n_samples, n_features)
Feature matrix.
Returns
- rul : ndarray of shape (n_samples,)
Predicted RUL values.
- scaler_: StandardScaler | None
- class anomsmith.platform.RealTimeIngestion(pm_system: PredictiveMaintenanceSystem, window_size: int = 100, update_frequency: int | None = None)[source]
Bases: object
Real-time data ingestion system for predictive maintenance.
- get_latest_results(asset_id: str, n: int = 1) list[dict[str, Any]][source]
Get latest processing results for an asset.
Parameters
- asset_id : str
Asset identifier.
- n : int, default=1
Number of latest results to return.
Returns
- results : list of dict
Latest results.
- ingest(data: float | ndarray | Series, asset_id: str, timestamp: datetime | None = None, sensor_name: str | None = None) dict[str, Any][source]
Ingest new data point and process if window is full.
Parameters
- data : float, array-like, or Series
New sensor reading(s).
- asset_id : str
Asset identifier.
- timestamp : datetime, optional
Timestamp for the data. Defaults to the current time.
- sensor_name : str, optional
Name of the sensor/feature. Required if data is a scalar.
Returns
- results : dict or None
Processing results if the window was processed; otherwise None.
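The "buffer, then process once the window is full" pattern can be sketched with a bounded deque per asset. WindowedIngestor is a hypothetical class, not RealTimeIngestion. The semantics given to update_frequency here are an assumption: after the first full window, the buffer is processed every update_frequency points, and every point when the value is None.

```python
from collections import defaultdict, deque


class WindowedIngestor:
    """Hypothetical per-asset sliding-window buffer (not RealTimeIngestion)."""

    def __init__(self, process_fn, window_size=100, update_frequency=None):
        self.process_fn = process_fn
        self.window_size = window_size
        # Assumption: None means "process on every new point once the window is full".
        self.update_frequency = update_frequency or 1
        self.buffers = defaultdict(lambda: deque(maxlen=window_size))
        self.counts = defaultdict(int)

    def ingest(self, value, asset_id):
        buf = self.buffers[asset_id]
        buf.append(value)  # deque(maxlen=...) drops the oldest reading automatically
        self.counts[asset_id] += 1
        if len(buf) < self.window_size:
            return None  # window not yet full
        if (self.counts[asset_id] - self.window_size) % self.update_frequency:
            return None  # full, but not a scheduled update point
        return self.process_fn(list(buf))
```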
- anomsmith.platform.add_degradation_rates(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', periods: list[int] | None = None) DataFrame[source]
Add degradation rate features (rate of change) for feature columns.
Parameters
- df : DataFrame
DataFrame with asset and feature columns.
- feature_cols : list of str
Feature column names to compute degradation rates for.
- asset_id_col : str, default='asset_id'
Column name for the asset identifier.
- cycle_col : str, default='cycle'
Column name for the cycle/time step.
- periods : list of int, optional
Periods for the rate-of-change calculation. Default: [1, 3, 5].
Returns
- df : DataFrame
DataFrame with added degradation-rate columns.
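The documentation does not define "rate of change" precisely. The sketch below assumes it is the change in a feature over the last p rows of the same asset, divided by the cycles elapsed in those rows. The helper name and the {col}_rate_{p} column naming are hypothetical.

```python
import pandas as pd


def add_degradation_rates_sketch(df, feature_cols, asset_id_col="asset_id",
                                 cycle_col="cycle", periods=None):
    """Hypothetical per-asset rate of change for each feature and period."""
    periods = periods or [1, 3, 5]
    base = df.sort_values([asset_id_col, cycle_col])
    out = base.copy()
    grouped = base.groupby(asset_id_col)
    for col in feature_cols:
        for p in periods:
            # Change over the last p rows divided by the cycles elapsed, so gaps
            # in the cycle counter do not inflate the rate (an assumption).
            out[f"{col}_rate_{p}"] = grouped[col].diff(p) / grouped[cycle_col].diff(p)
    return out
```

The first p rows of each asset have no history and therefore come out as NaN.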
- anomsmith.platform.add_rolling_statistics(df: DataFrame, feature_cols: list[str], asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', window: int = 5, stats: list[str] | None = None) DataFrame[source]
Add rolling window statistics for feature columns, grouped by asset.
Parameters
- df : DataFrame
DataFrame with asset and feature columns.
- feature_cols : list of str
Feature column names to compute rolling statistics for.
- asset_id_col : str, default='asset_id'
Column name for the asset identifier.
- cycle_col : str, default='cycle'
Column name for the cycle/time step (used for sorting).
- window : int, default=5
Rolling window size.
- stats : list of str, optional
Statistics to compute. Default: ['mean', 'std', 'min', 'max'].
Returns
- df : DataFrame
DataFrame with added rolling-statistic columns.
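Per-asset rolling statistics map naturally onto pandas' grouped rolling windows. The sketch below is one way to do it. The helper name, the {col}_rolling_{stat} column names, and min_periods=1 are assumptions, not the library's choices.

```python
import pandas as pd


def add_rolling_statistics_sketch(df, feature_cols, asset_id_col="asset_id",
                                  cycle_col="cycle", window=5, stats=None):
    """Hypothetical per-asset rolling statistics; column names are illustrative."""
    stats = stats or ["mean", "std", "min", "max"]
    base = df.sort_values([asset_id_col, cycle_col])
    out = base.copy()
    grouped = base.groupby(asset_id_col)
    for col in feature_cols:
        # min_periods=1 (an assumption) yields values from each asset's first cycle.
        roll = grouped[col].rolling(window, min_periods=1)
        for stat in stats:
            values = getattr(roll, stat)()
            # Drop the asset level that groupby adds so rows realign with `out`.
            out[f"{col}_rolling_{stat}"] = values.reset_index(level=0, drop=True)
    return out
```

Grouping before rolling keeps a window from spanning two assets.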
- anomsmith.platform.calculate_confusion_matrix_metrics(predictions: ndarray, y_true: ndarray) dict[str, int][source]
Confusion-matrix counts, treating 1 as an anomaly in both predictions and y_true.
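For reference, the four counts can be computed with boolean masks as shown below. The key names tp, fp, tn, fn and the function name are assumptions, since the documentation does not list the returned keys.

```python
import numpy as np


def confusion_counts(predictions, y_true):
    """Hypothetical confusion-matrix counts with 1 = anomaly."""
    pred = np.asarray(predictions).astype(int)
    true = np.asarray(y_true).astype(int)
    return {
        "tp": int(np.sum((pred == 1) & (true == 1))),  # anomalies caught
        "fp": int(np.sum((pred == 1) & (true == 0))),  # false alarms
        "tn": int(np.sum((pred == 0) & (true == 0))),  # normal, correctly passed
        "fn": int(np.sum((pred == 0) & (true == 1))),  # anomalies missed
    }
```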
- anomsmith.platform.calculate_lead_time(predictions: ndarray, true_labels: ndarray, timestamps: ndarray | None = None) dict[str, float | int][source]
Lead time between anomaly detections and failure events.
- Parameters:
predictions – Detector labels (1 = anomaly, 0 = normal).
true_labels – Ground truth (1 = anomaly, 0 = normal).
timestamps – Optional timestamps aligned to predictions.
- Returns:
Dictionary with mean/median/min/max lead time and early/late detection counts.
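The exact lead-time definition is not documented, so the sketch below fixes one plausible reading and measures it in sample steps. Each failure event is a contiguous run of 1s in true_labels. Its detection is the first predicted 1 after the previous event ended and before this event ends. The lead is the event start minus the detection index, so positive values are early warnings and negative values are late detections. All names here are hypothetical.

```python
import numpy as np


def lead_times_sketch(predictions, true_labels):
    """Hypothetical per-event lead time in sample steps (positive = early)."""
    pred = np.asarray(predictions).astype(int)
    true = np.asarray(true_labels).astype(int)
    # Event boundaries from the edges of the zero-padded label sequence.
    edges = np.diff(np.concatenate([[0], true, [0]]))
    starts = np.flatnonzero(edges == 1)
    ends = np.flatnonzero(edges == -1)  # exclusive end index of each event
    leads, prev_end = [], 0
    for start, end in zip(starts, ends):
        hits = np.flatnonzero(pred[prev_end:end])
        if hits.size:
            leads.append(int(start - (prev_end + hits[0])))
        prev_end = end
    leads = np.array(leads)
    return {
        "n_events": int(len(starts)),
        "n_detected": int(leads.size),
        "early": int(np.sum(leads > 0)),
        "late": int(np.sum(leads < 0)),
        "mean_lead": float(leads.mean()) if leads.size else float("nan"),
    }
```

With timestamps, the same index arithmetic would be applied to timestamp differences instead of step counts.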
- anomsmith.platform.calculate_rul(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', failure_cycle_col: str | None = None) Series[source]
Calculate Remaining Useful Life (RUL) for each record.
RUL is calculated per asset as max_cycle - current_cycle.
Parameters
- df : DataFrame
DataFrame with asset_id and cycle columns.
- asset_id_col : str, default='asset_id'
Column name for the asset/equipment identifier.
- cycle_col : str, default='cycle'
Column name for the cycle/time step.
- failure_cycle_col : str, optional
Column name for the failure cycle. If provided, it is used instead of the maximum cycle.
Returns
- rul : Series
Remaining Useful Life for each record.
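The formula above translates into a per-asset groupby transform. This sketch mirrors the description; the function name and the returned Series name "RUL" are assumptions.

```python
import pandas as pd


def calculate_rul_sketch(df, asset_id_col="asset_id", cycle_col="cycle",
                         failure_cycle_col=None):
    """Hypothetical RUL helper: end-of-life cycle minus the current cycle."""
    if failure_cycle_col is not None:
        end_cycle = df[failure_cycle_col]  # explicit failure cycle per row
    else:
        # Last observed cycle of each asset, broadcast back to every row.
        end_cycle = df.groupby(asset_id_col)[cycle_col].transform("max")
    return (end_cycle - df[cycle_col]).rename("RUL")
```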
- anomsmith.platform.compare_detectors(detectors: dict[str, BaseDetector], X: ndarray | DataFrame, y_true: ndarray, timestamps: ndarray | None = None) DataFrame[source]
Compare multiple fitted detectors side-by-side.
- anomsmith.platform.create_rul_labels(df: DataFrame, rul_col: str = 'RUL', warning_threshold: int = 30, critical_threshold: int = 15) DataFrame[source]
Create health status labels based on RUL values.
Parameters
- df : DataFrame
DataFrame with an RUL column.
- rul_col : str, default='RUL'
Column name for RUL values.
- warning_threshold : int, default=30
RUL threshold for the warning state.
- critical_threshold : int, default=15
RUL threshold for the critical state.
Returns
- df : DataFrame
DataFrame with added columns:
  - health_status: 'healthy', 'warning', 'critical', or 'failed'
  - binary_label: 0 (healthy) or 1 (warning, critical, or failed)
  - multi_class_label: 0 (healthy), 1 (warning), 2 (critical), 3 (failed)
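The labeling can be sketched with np.select, which picks the first matching condition. The boundary rules are assumptions, since the documentation does not say whether the thresholds are inclusive or when a record counts as failed. Here RUL <= 0 is treated as failed and both thresholds are inclusive upper bounds. The helper name is hypothetical.

```python
import numpy as np
import pandas as pd


def create_rul_labels_sketch(df, rul_col="RUL", warning_threshold=30, critical_threshold=15):
    """Hypothetical health labels from RUL; boundary handling is assumed."""
    out = df.copy()
    rul = out[rul_col]
    # Ordered from most to least severe; np.select uses the first match.
    conditions = [rul <= 0, rul <= critical_threshold, rul <= warning_threshold]
    out["health_status"] = np.select(conditions, ["failed", "critical", "warning"],
                                     default="healthy")
    out["multi_class_label"] = np.select(conditions, [3, 2, 1], default=0)
    out["binary_label"] = (out["multi_class_label"] > 0).astype(int)
    return out
```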
- anomsmith.platform.evaluate_detector(detector: BaseDetector, X: ndarray | DataFrame, y_true: ndarray, scores: ndarray | None = None, timestamps: ndarray | None = None) dict[str, float | int][source]
Evaluate a fitted anomsmith detector on tabular test data.
- anomsmith.platform.plot_comparison_metrics(comparison_df: DataFrame, metrics: list[str] | None = None, save_path: str | None = None)[source]
Create comparison chart for multiple detectors.
Parameters
- comparison_df : DataFrame
DataFrame from compare_detectors().
- metrics : list of str, optional
Metrics to plot. Default: ['precision', 'recall', 'f1'].
- save_path : str, optional
Path to save the figure.
- anomsmith.platform.plot_pca_boundary(detector: PCADetector, X: ndarray | DataFrame, y_true: ndarray | None = None, n_components_plot: int = 2, save_path: str | None = None)[source]
Visualize the PCA boundary in a 2D projection (anomsmith PCADetector).
- anomsmith.platform.plot_reconstruction_error(detector, X: ndarray | DataFrame, y_true: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]
Plot reconstruction error over time for an LSTM or PCA detector.
Parameters
- detector : BaseDetector
Fitted detector (PCA or LSTM).
- X : array-like
Data to plot.
- y_true : ndarray, optional
True labels for marking actual anomalies.
- timestamps : ndarray, optional
Timestamps for the x-axis.
- save_path : str, optional
Path to save the figure.
- anomsmith.platform.plot_sensor_drift(sensor_data: ndarray | Series, predictions: ndarray | None = None, timestamps: ndarray | None = None, save_path: str | None = None)[source]
Visualize sensor drift with anomaly flags.
Parameters
- sensor_data : array-like
Sensor readings over time.
- predictions : ndarray, optional
Anomaly predictions (1 for anomaly, 0 for normal).
- timestamps : ndarray, optional
Timestamps for the x-axis.
- save_path : str, optional
Path to save the figure.
- anomsmith.platform.prepare_pm_features(df: DataFrame, asset_id_col: str = 'asset_id', cycle_col: str = 'cycle', feature_cols: list[str] | None = None, calculate_rul_flag: bool = True, add_labels: bool = True, add_rolling_stats: bool = True, include_degradation_rates: bool = False, rolling_window: int = 5, warning_threshold: int = 30, critical_threshold: int = 15, failure_cycle_col: str | None = None) DataFrame[source]
Prepare predictive maintenance features from raw sensor data.
This convenience function combines RUL calculation, health-status labeling, rolling statistics, and (optionally) degradation rates.
Parameters
- df : DataFrame
Input DataFrame with asset_id, cycle, and sensor/feature columns.
- asset_id_col : str, default='asset_id'
Column name for the asset identifier.
- cycle_col : str, default='cycle'
Column name for the cycle/time step.
- feature_cols : list of str, optional
Feature column names. If None, they are auto-detected (excluding asset_id, cycle, RUL, etc.).
- calculate_rul_flag : bool, default=True
Whether to calculate RUL.
- add_labels : bool, default=True
Whether to add health status labels.
- add_rolling_stats : bool, default=True
Whether to add rolling statistics.
- include_degradation_rates : bool, default=False
Whether to add degradation-rate features.
- rolling_window : int, default=5
Window size for rolling statistics.
- warning_threshold : int, default=30
RUL threshold for the warning state.
- critical_threshold : int, default=15
RUL threshold for the critical state.
- failure_cycle_col : str, optional
Column name for the failure cycle (if available).
Returns
- df : DataFrame
DataFrame with all engineered features.