"""PCA-based predictive maintenance workflows.
Uses Principal Component Analysis and Mahalanobis distance to track
equipment health and classify health states (healthy, warning, critical).
"""
import logging
from typing import TYPE_CHECKING, Union
import numpy as np
import pandas as pd
from anomsmith.constants import (
DEFAULT_PCA_HEALTHY_DISTANCE_PERCENTILE,
DEFAULT_PCA_WARNING_DISTANCE_PERCENTILE,
)
from anomsmith.objects.health_state import HealthState, HealthStateView
from anomsmith.primitives.detectors.pca import PCADetector
if TYPE_CHECKING:
try:
from timesmith.typing import SeriesLike
except ImportError:
SeriesLike = None
logger = logging.getLogger(__name__)
[docs]
def track_mahalanobis_distance(
    X: np.ndarray | pd.DataFrame,
    detector: PCADetector,
    index: pd.Index | None = None,
) -> pd.Series:
    """Track Mahalanobis distance over time as a single metric.

    Scores every row of ``X`` with the fitted detector and returns the scores
    as one named Series, so the value can be followed as a time series to
    monitor equipment health drift.

    Scoring is delegated to :meth:`PCADetector.score` so the Mahalanobis math
    stays in the primitive layer (single implementation).

    Args:
        X: Feature matrix (n_samples, n_features) with sensor readings.
        detector: Fitted PCADetector configured with
            ``score_method='mahalanobis'``.
        index: Labels for the resulting Series when ``X`` is not a DataFrame.
            Ignored for DataFrame input, whose own index is always used.
            When omitted for array input, a ``RangeIndex`` is generated.

    Returns:
        pandas Series named ``"mahalanobis_distance"`` holding one score per
        input row.

    Raises:
        ValueError: If the detector is not fitted, or if its ``score_method``
            is not ``'mahalanobis'``.

    Examples:
        >>> detector = PCADetector(n_components=3, score_method='mahalanobis')
        >>> detector.fit(X_train)  # Fit on healthy operation data
        >>> distances = track_mahalanobis_distance(X_monitor, detector)
        >>> # Track distance over time to detect drift
    """
    if not detector._fitted:
        raise ValueError(
            "PCADetector must be fitted before tracking Mahalanobis distance."
        )
    if detector.score_method != "mahalanobis":
        raise ValueError(
            "track_mahalanobis_distance requires PCADetector(score_method='mahalanobis'). "
            f"Got score_method={detector.score_method!r}."
        )

    if isinstance(X, pd.DataFrame):
        # A DataFrame carries its own row labels; they win over ``index``.
        out_index = X.index
        X_arr = X.values
    else:
        X_arr = np.asarray(X)
        out_index = (
            index if index is not None else pd.RangeIndex(start=0, stop=len(X_arr))
        )

    score_view = detector.score(X_arr)
    # Coerce to a plain array so the scores are attached to ``out_index`` by
    # position. NOTE(review): if ``scores`` were ever a labelled pandas
    # object, ``pd.Series(..., index=out_index)`` would re-align it by label
    # instead -- confirm the container type returned by PCADetector.score.
    distances = np.asarray(score_view.scores)

    # Summary statistics are only worth computing when DEBUG output is
    # actually emitted, and np.max/np.min raise on a zero-size array.
    if logger.isEnabledFor(logging.DEBUG) and distances.size:
        logger.debug(
            "Computed Mahalanobis distances: mean=%.3f, max=%.3f, min=%.3f",
            np.mean(distances),
            np.max(distances),
            np.min(distances),
        )

    return pd.Series(distances, index=out_index, name="mahalanobis_distance")
[docs]
def classify_health_from_distance(
    distances: Union[pd.Series, np.ndarray, "SeriesLike"],
    healthy_threshold: float,
    warning_threshold: float,
    index: pd.Index | None = None,
) -> HealthStateView:
    """Classify health states from Mahalanobis distance thresholds.

    Maps each distance value to a health state:

    - distance <= healthy_threshold: ``HealthState.HEALTHY``
    - healthy_threshold < distance <= warning_threshold: ``HealthState.WARNING``
    - distance > warning_threshold: ``HealthState.DISTRESS``

    Note:
        A NaN distance compares false against both thresholds and is
        therefore left in the ``HealthState.HEALTHY`` state. Clean or filter
        missing values beforehand if that is not the desired outcome.

    Args:
        distances: Mahalanobis distance values (n_samples,).
        healthy_threshold: Upper distance bound of the Healthy state.
        warning_threshold: Upper distance bound of the Warning state
            (must be > healthy_threshold).
        index: Labels for the health states when ``distances`` is not a
            pandas Series. Ignored for Series input, whose own index is
            always used. When omitted, a ``RangeIndex`` is generated.

    Returns:
        HealthStateView with one classified state per distance value.

    Raises:
        ValueError: If ``warning_threshold`` is not strictly greater than
            ``healthy_threshold`` (this includes NaN thresholds).

    Examples:
        >>> distances = track_mahalanobis_distance(X_monitor, detector)
        >>> # Set thresholds based on training data (e.g., percentiles)
        >>> healthy_threshold = np.percentile(distances, 75)
        >>> warning_threshold = np.percentile(distances, 95)
        >>> health_states = classify_health_from_distance(
        ...     distances, healthy_threshold, warning_threshold
        ... )
    """
    # Written as a negated ">" rather than "<=" so that NaN thresholds are
    # rejected as well: every comparison against NaN is False, which would
    # otherwise pass validation and label all samples Healthy.
    if not warning_threshold > healthy_threshold:
        raise ValueError(
            f"warning_threshold ({warning_threshold}) must be greater than "
            f"healthy_threshold ({healthy_threshold})"
        )

    if isinstance(distances, pd.Series):
        # A Series carries its own labels; they win over ``index``.
        index = distances.index
        dist_values = distances.values
    else:
        dist_values = np.asarray(distances)
        if index is None:
            index = pd.RangeIndex(start=0, stop=len(dist_values))

    # Start from the enum's Healthy value instead of a hard-coded zero so all
    # three states are sourced from HealthState.
    states: np.ndarray = np.full(
        len(dist_values), HealthState.HEALTHY.value, dtype=int
    )
    # warning_threshold > healthy_threshold is guaranteed above, so anything
    # past the warning threshold is also past the healthy one; the second
    # assignment upgrades those samples from Warning to Distress.
    states[dist_values > healthy_threshold] = HealthState.WARNING.value
    states[dist_values > warning_threshold] = HealthState.DISTRESS.value

    # Only count the states when DEBUG output is actually emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "Classified health states from distances: "
            "Healthy=%s, Warning=%s, Distress=%s",
            (states == HealthState.HEALTHY.value).sum(),
            (states == HealthState.WARNING.value).sum(),
            (states == HealthState.DISTRESS.value).sum(),
        )

    return HealthStateView(index=index, states=states)
[docs]
def assess_health_with_pca(
    X: np.ndarray | pd.DataFrame,
    detector: PCADetector,
    healthy_threshold: float,
    warning_threshold: float,
    index: pd.Index | None = None,
) -> pd.DataFrame:
    """Assess equipment health using PCA and Mahalanobis distance.

    End-to-end convenience wrapper that chains the two steps of the
    workflow and packages their output in one table:

    1. ``track_mahalanobis_distance`` scores every row of ``X``.
    2. ``classify_health_from_distance`` maps each score to a health state.

    Args:
        X: Feature matrix (n_samples, n_features) with sensor readings.
        detector: Fitted PCADetector (must use score_method='mahalanobis').
        healthy_threshold: Distance threshold for the Healthy state.
        warning_threshold: Distance threshold for the Warning state.
        index: Optional index for the results; forwarded unchanged to
            ``track_mahalanobis_distance``.

    Returns:
        DataFrame with columns 'mahalanobis_distance' and 'health_state',
        indexed like the distance Series produced in step 1.

    Examples:
        >>> detector = PCADetector(n_components=3, score_method='mahalanobis')
        >>> detector.fit(X_train)  # Fit on healthy operation data
        >>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
        ...     X_train, detector
        ... )
        >>> health_df = assess_health_with_pca(
        ...     X_monitor, detector, healthy_threshold, warning_threshold
        ... )
        >>> critical_units = health_df[
        ...     health_df['health_state'] == HealthState.DISTRESS.value
        ... ]
    """
    # Step 1: one distance per sample.
    distance_series = track_mahalanobis_distance(X, detector, index=index)
    row_labels = distance_series.index

    # Step 2: one health state per distance.
    state_view = classify_health_from_distance(
        distance_series,
        healthy_threshold,
        warning_threshold,
        index=row_labels,
    )

    # Step 3: put both results side by side under the shared index.
    columns = {
        "mahalanobis_distance": distance_series.values,
        "health_state": state_view.states,
    }
    result_df = pd.DataFrame(columns, index=row_labels)

    # Per-state tallies for the summary log line.
    state_column = result_df["health_state"]
    n_healthy = (state_column == HealthState.HEALTHY.value).sum()
    n_warning = (state_column == HealthState.WARNING.value).sum()
    n_distress = (state_column == HealthState.DISTRESS.value).sum()
    logger.info(
        f"Assessed health for {len(result_df)} samples: "
        f"Healthy={n_healthy}, "
        f"Warning={n_warning}, "
        f"Critical={n_distress}"
    )

    return result_df
[docs]
def compute_pca_health_thresholds(
    X_train: np.ndarray | pd.DataFrame,
    detector: PCADetector,
    healthy_percentile: float = DEFAULT_PCA_HEALTHY_DISTANCE_PERCENTILE,
    warning_percentile: float = DEFAULT_PCA_WARNING_DISTANCE_PERCENTILE,
) -> tuple[float, float]:
    """Compute health state thresholds from training data.

    Scores the training data with ``track_mahalanobis_distance`` and takes
    two percentiles of the resulting distances as the Healthy and Warning
    distance thresholds.

    Args:
        X_train: Training data (should be healthy operation data).
        detector: Fitted PCADetector (must use score_method='mahalanobis').
        healthy_percentile: Percentile of the training distances used as the
            healthy threshold. Defaults to
            ``DEFAULT_PCA_HEALTHY_DISTANCE_PERCENTILE``.
        warning_percentile: Percentile of the training distances used as the
            warning threshold. Defaults to
            ``DEFAULT_PCA_WARNING_DISTANCE_PERCENTILE``.

    Returns:
        Tuple of (healthy_threshold, warning_threshold) as Python floats.

    Raises:
        ValueError: If ``healthy_percentile`` is not less than
            ``warning_percentile``.

    Examples:
        >>> detector = PCADetector(n_components=3, score_method='mahalanobis')
        >>> detector.fit(X_train)  # Fit on healthy operation data
        >>> healthy_threshold, warning_threshold = compute_pca_health_thresholds(
        ...     X_train, detector, healthy_percentile=75, warning_percentile=95
        ... )
    """
    if healthy_percentile >= warning_percentile:
        raise ValueError(
            f"healthy_percentile ({healthy_percentile}) must be less than "
            f"warning_percentile ({warning_percentile})"
        )

    # Compute distances on training data.
    distances = track_mahalanobis_distance(X_train, detector)

    # Both percentiles in a single call instead of two separate passes.
    healthy_threshold, warning_threshold = (
        float(value)
        for value in np.percentile(
            distances.to_numpy(), [healthy_percentile, warning_percentile]
        )
    )

    # Distinct percentiles can still yield equal thresholds (e.g. constant or
    # very small training sets), and NaN distances yield NaN thresholds.
    # classify_health_from_distance requires warning > healthy, so surface the
    # problem here instead of returning unusable values silently.
    if not warning_threshold > healthy_threshold:
        logger.warning(
            "Computed health thresholds are not strictly increasing "
            "(healthy=%r, warning=%r); classify_health_from_distance requires "
            "warning_threshold > healthy_threshold.",
            healthy_threshold,
            warning_threshold,
        )

    logger.info(
        "Computed health thresholds: healthy=%.3f (percentile %s), "
        "warning=%.3f (percentile %s)",
        healthy_threshold,
        healthy_percentile,
        warning_threshold,
        warning_percentile,
    )

    return healthy_threshold, warning_threshold