Source code for anomsmith.workflows.asset_health

"""Asset health workflows for grid assets and industrial equipment.

Combines classification and anomaly detection to assess asset health and
prioritize maintenance actions.
"""

import logging
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from anomsmith.constants import (
    DEFAULT_ASSET_HEALTH_ANOMALY_WEIGHT,
    DEFAULT_ASSET_HEALTH_CLASSIFICATION_WEIGHT,
    DEFAULT_FAILURE_PROBA_DISTRESS_THRESHOLD,
    DEFAULT_FAILURE_PROBA_WARNING_THRESHOLD,
    DEFAULT_ISOLATION_FOREST_N_ESTIMATORS,
    DEFAULT_OUTLIER_CONTAMINATION,
    DEFAULT_RANDOM_FOREST_N_ESTIMATORS,
    FUSION_WEIGHT_SUM_ABSOLUTE_TOLERANCE,
)
from anomsmith.primitives.classifiers.failure_risk import FailureRiskClassifier
from anomsmith.primitives.detectors.ml import IsolationForestDetector

if TYPE_CHECKING:
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        SeriesLike = None

logger = logging.getLogger(__name__)


def _validate_fusion_weights(
    classification_weight: float, anomaly_weight: float
) -> None:
    if classification_weight < 0 or anomaly_weight < 0:
        raise ValueError(
            "classification_weight and anomaly_weight must be non-negative, "
            f"got {classification_weight=}, {anomaly_weight=}"
        )
    s = classification_weight + anomaly_weight
    if abs(s - 1.0) > FUSION_WEIGHT_SUM_ABSOLUTE_TOLERANCE:
        raise ValueError(
            f"classification_weight + anomaly_weight must sum to 1.0, got sum={s}"
        )


[docs] def assess_asset_health( sensor_data: pd.DataFrame, asset_ids: pd.Series | None = None, feature_cols: list[str] | None = None, failure_labels: pd.Series | np.ndarray | None = None, use_classification: bool = True, use_anomaly_detection: bool = True, contamination: float = DEFAULT_OUTLIER_CONTAMINATION, n_estimators: int = DEFAULT_RANDOM_FOREST_N_ESTIMATORS, isolation_n_estimators: int = DEFAULT_ISOLATION_FOREST_N_ESTIMATORS, random_state: int | None = None, *, risk_proba_warning_threshold: float = DEFAULT_FAILURE_PROBA_WARNING_THRESHOLD, risk_proba_distress_threshold: float = DEFAULT_FAILURE_PROBA_DISTRESS_THRESHOLD, classification_weight: float = DEFAULT_ASSET_HEALTH_CLASSIFICATION_WEIGHT, anomaly_weight: float = DEFAULT_ASSET_HEALTH_ANOMALY_WEIGHT, ) -> pd.DataFrame: """Assess asset health using classification and anomaly detection. Combines failure risk classification with anomaly detection to provide comprehensive asset health assessment. Results can be used to prioritize maintenance actions. Args: sensor_data: DataFrame with sensor readings (columns are features, rows are assets) asset_ids: Optional Series of asset IDs (defaults to sensor_data index) feature_cols: Optional list of feature column names (defaults to all numeric columns) failure_labels: Optional binary labels for training classifier (1 = failure, 0 = healthy) use_classification: Whether to use failure risk classification (default True) use_anomaly_detection: Whether to use anomaly detection (default True) contamination: Expected proportion of anomalies (see ``DEFAULT_OUTLIER_CONTAMINATION``) n_estimators: Number of trees for Random Forest (see ``DEFAULT_RANDOM_FOREST_N_ESTIMATORS``) isolation_n_estimators: Number of trees for Isolation Forest when anomaly detection is on (see ``DEFAULT_ISOLATION_FOREST_N_ESTIMATORS``). random_state: Random state for reproducibility risk_proba_warning_threshold: Min failure probability for **warning** health state when using classification (default from ``anomsmith.constants``). risk_proba_distress_threshold: Min failure probability for **distress** state (must exceed warning threshold; enforced by ``FailureRiskClassifier``). classification_weight: Weight on normalized classification risk in ``combined_risk`` when both classification and anomaly detection run (must sum to 1 with ``anomaly_weight``). anomaly_weight: Weight on normalized anomaly score in ``combined_risk``. Returns: DataFrame with columns: - asset_id: Asset identifier - failure_risk: Probability of failure (if classification used) - health_state: Predicted health state (0=Healthy, 1=Warning, 2=Distress) - is_anomaly: Binary anomaly flag (if anomaly detection used) - anomaly_score: Anomaly score (if anomaly detection used) - combined_risk: Combined risk score (higher = more urgent) Examples: >>> import pandas as pd >>> import numpy as np >>> sensor_data = pd.DataFrame({ ... 'temperature': [60, 65, 70, 80], ... 'vibration': [0.2, 0.25, 0.3, 0.4], ... 'pressure': [25, 24, 23, 20] ... }) >>> result = assess_asset_health(sensor_data) >>> result.head() """ _validate_fusion_weights(classification_weight, anomaly_weight) if risk_proba_warning_threshold >= risk_proba_distress_threshold: raise ValueError( "risk_proba_warning_threshold must be < risk_proba_distress_threshold, " f"got {risk_proba_warning_threshold=}, {risk_proba_distress_threshold=}" ) if feature_cols is None: # Use all numeric columns feature_cols = sensor_data.select_dtypes(include=[np.number]).columns.tolist() if asset_ids is None: asset_ids = sensor_data.index results = {"asset_id": asset_ids} X = sensor_data[feature_cols] # Classification-based failure risk prediction failure_risks = np.zeros(len(X)) health_states: np.ndarray = np.zeros(len(X), dtype=int) if use_classification and failure_labels is not None: classifier = FailureRiskClassifier( n_estimators=n_estimators, random_state=random_state ) classifier.fit(X, failure_labels) probas = classifier.predict_proba(X) if probas.shape[1] > 1: failure_risks = probas[:, 1] # Probability of failure class else: failure_risks = probas[:, 0] health_state_view = classifier.predict_health_states( X, index=pd.RangeIndex(len(X)), risk_threshold=risk_proba_warning_threshold, distress_threshold=risk_proba_distress_threshold, ) health_states = health_state_view.states elif use_classification: logger.warning( "Classification requested but no failure_labels provided. " "Skipping classification-based assessment." ) use_classification = False results["failure_risk"] = failure_risks if use_classification else np.nan results["health_state"] = ( health_states if use_classification else np.zeros(len(X), dtype=int) ) # Anomaly detection anomaly_flags: np.ndarray = np.zeros(len(X), dtype=int) anomaly_scores = np.zeros(len(X)) if use_anomaly_detection: # Use Isolation Forest for multivariate anomaly detection detector = IsolationForestDetector( contamination=contamination, n_estimators=isolation_n_estimators, random_state=random_state, ) # Fit on sensor data (treating each row as a sample) # For Isolation Forest, we need to fit on all data detector.fit(X.values) # Score each asset score_view = detector.score(X.values) anomaly_scores = score_view.scores # Predict anomalies label_view = detector.predict(X.values) anomaly_flags = label_view.labels results["is_anomaly"] = anomaly_flags results["anomaly_score"] = anomaly_scores if use_anomaly_detection else np.nan # Combined risk score (normalize and combine both signals) combined_risks = np.zeros(len(X)) if use_classification and use_anomaly_detection: # Normalize failure risk to [0, 1] if failure_risks.max() > failure_risks.min(): norm_failure_risk = (failure_risks - failure_risks.min()) / ( failure_risks.max() - failure_risks.min() ) else: norm_failure_risk = failure_risks # Normalize anomaly scores to [0, 1] if anomaly_scores.max() > anomaly_scores.min(): norm_anomaly_score = (anomaly_scores - anomaly_scores.min()) / ( anomaly_scores.max() - anomaly_scores.min() ) else: norm_anomaly_score = anomaly_scores combined_risks = ( classification_weight * norm_failure_risk + anomaly_weight * norm_anomaly_score ) elif use_classification: # Normalize failure risk if failure_risks.max() > failure_risks.min(): combined_risks = (failure_risks - failure_risks.min()) / ( failure_risks.max() - failure_risks.min() ) else: combined_risks = failure_risks elif use_anomaly_detection: # Normalize anomaly scores if anomaly_scores.max() > anomaly_scores.min(): combined_risks = (anomaly_scores - anomaly_scores.min()) / ( anomaly_scores.max() - anomaly_scores.min() ) else: combined_risks = anomaly_scores results["combined_risk"] = combined_risks # Create DataFrame with results result_df = pd.DataFrame(results) # Sort by combined risk (highest first) for prioritization result_df = result_df.sort_values("combined_risk", ascending=False).reset_index( drop=True ) return result_df
[docs] def rank_assets_by_risk( asset_health: pd.DataFrame, top_n: int | None = None, ) -> pd.DataFrame: """Rank assets by combined risk score. Args: asset_health: DataFrame from assess_asset_health() top_n: Optional number of top assets to return (default None = all) Returns: DataFrame ranked by combined_risk (highest first) """ ranked = asset_health.sort_values("combined_risk", ascending=False) if top_n is not None: ranked = ranked.head(top_n) return ranked