Source code for anomsmith.workflows.pm

"""Predictive maintenance workflows.

Workflows for health state prediction, discretization, and policy evaluation.
"""

import logging
from typing import TYPE_CHECKING, Union

import numpy as np
import pandas as pd

from anomsmith.constants import (
    DEFAULT_POLICY_BASE_RISKS,
    DEFAULT_POLICY_INTERVENE_COST,
    DEFAULT_POLICY_INTERVENE_RISK_REDUCTION,
    DEFAULT_POLICY_REVIEW_COST,
    DEFAULT_POLICY_REVIEW_RISK_REDUCTION,
    DEFAULT_POLICY_WAIT_COST,
    DEFAULT_RUL_HEALTHY_THRESHOLD,
    DEFAULT_RUL_WARNING_THRESHOLD,
)
from anomsmith.objects.health_state import HealthStateView
from anomsmith.primitives.health_state.discretize import discretize_rul_to_health_states
from anomsmith.primitives.policy.simple import SimpleHealthPolicy

if TYPE_CHECKING:
    try:
        from timesmith.typing import SeriesLike
    except ImportError:
        SeriesLike = None

logger = logging.getLogger(__name__)


def _coerce_health_state_view(
    health_states: pd.Series | np.ndarray | HealthStateView,
) -> HealthStateView:
    if isinstance(health_states, HealthStateView):
        return health_states
    if isinstance(health_states, pd.Series):
        return HealthStateView(index=health_states.index, states=health_states.values)
    index = pd.RangeIndex(start=0, stop=len(health_states))
    return HealthStateView(index=index, states=np.asarray(health_states))


[docs] def discretize_rul( rul: Union[pd.Series, np.ndarray, "SeriesLike"], healthy_threshold: float = DEFAULT_RUL_HEALTHY_THRESHOLD, warning_threshold: float = DEFAULT_RUL_WARNING_THRESHOLD, ) -> pd.Series: """Discretize RUL values into health states. Maps RUL values to health states: - RUL > healthy_threshold: Healthy (0) - warning_threshold < RUL <= healthy_threshold: Warning (1) - RUL <= warning_threshold: Distress (2) Args: rul: Remaining Useful Life values healthy_threshold: RUL threshold for Healthy state (default 30) warning_threshold: RUL threshold for Warning state (default 10) Returns: pandas Series with health states aligned to input index Examples: >>> import pandas as pd >>> import numpy as np >>> rul = pd.Series([50, 25, 5, 0]) >>> states = discretize_rul(rul, healthy_threshold=30, warning_threshold=10) >>> states.values array([0, 1, 2, 2]) """ health_state_view = discretize_rul_to_health_states( rul, healthy_threshold=healthy_threshold, warning_threshold=warning_threshold ) return health_state_view.to_series()
[docs] def apply_policy( health_states: pd.Series | np.ndarray | HealthStateView, previous_states: pd.Series | np.ndarray | HealthStateView | None = None, intervene_cost: float = DEFAULT_POLICY_INTERVENE_COST, review_cost: float = DEFAULT_POLICY_REVIEW_COST, wait_cost: float = DEFAULT_POLICY_WAIT_COST, base_risks: tuple[float, float, float] = DEFAULT_POLICY_BASE_RISKS, intervene_risk_reduction: float = DEFAULT_POLICY_INTERVENE_RISK_REDUCTION, review_risk_reduction: float = DEFAULT_POLICY_REVIEW_RISK_REDUCTION, ) -> pd.DataFrame: """Apply decision policy to health states. Args: health_states: Current health states (0=Healthy, 1=Warning, 2=Distress) previous_states: Previous health states for transition detection (optional) intervene_cost: Cost of intervention action (default 100) review_cost: Cost of review action (default 30) wait_cost: Cost of wait action (default 0) base_risks: Base failure risks by state [healthy, warning, distress] (default [0.01, 0.1, 0.3]) intervene_risk_reduction: Risk reduction factor for intervention (default 0.5) review_risk_reduction: Risk reduction factor for review (default 0.75) Returns: pandas DataFrame with health_states, actions, costs, and risks Examples: >>> import pandas as pd >>> states = pd.Series([0, 0, 1, 2, 2]) >>> result = apply_policy(states) >>> result['action'].values array([0, 0, 1, 2, 2]) """ health_state_view = _coerce_health_state_view(health_states) previous_state_view: HealthStateView | None = None if previous_states is not None: previous_state_view = _coerce_health_state_view(previous_states) # Apply policy policy = SimpleHealthPolicy( intervene_cost=intervene_cost, review_cost=review_cost, wait_cost=wait_cost, base_risks=base_risks, intervene_risk_reduction=intervene_risk_reduction, review_risk_reduction=review_risk_reduction, ) result = policy.apply(health_state_view, previous_state_view) return result.to_dataframe()
[docs] def evaluate_policy( health_states: pd.Series | np.ndarray | HealthStateView, previous_states: pd.Series | np.ndarray | HealthStateView | None = None, intervene_cost: float = DEFAULT_POLICY_INTERVENE_COST, review_cost: float = DEFAULT_POLICY_REVIEW_COST, wait_cost: float = DEFAULT_POLICY_WAIT_COST, base_risks: tuple[float, float, float] = DEFAULT_POLICY_BASE_RISKS, intervene_risk_reduction: float = DEFAULT_POLICY_INTERVENE_RISK_REDUCTION, review_risk_reduction: float = DEFAULT_POLICY_REVIEW_RISK_REDUCTION, ) -> dict[str, float]: """Evaluate policy performance metrics. Args: health_states: Current health states (0=Healthy, 1=Warning, 2=Distress) previous_states: Previous health states for transition detection (optional) intervene_cost: Cost of intervention action (default 100) review_cost: Cost of review action (default 30) wait_cost: Cost of wait action (default 0) base_risks: Base failure risks by state [healthy, warning, distress] (default [0.01, 0.1, 0.3]) intervene_risk_reduction: Risk reduction factor for intervention (default 0.5) review_risk_reduction: Risk reduction factor for review (default 0.75) Returns: Dictionary with total_cost, total_risk, interventions, reviews, waits Examples: >>> import pandas as pd >>> states = pd.Series([0, 0, 1, 2, 2]) >>> metrics = evaluate_policy(states) >>> metrics['total_cost'] 230.0 """ health_state_view = _coerce_health_state_view(health_states) previous_state_view: HealthStateView | None = None if previous_states is not None: previous_state_view = _coerce_health_state_view(previous_states) # Evaluate policy policy = SimpleHealthPolicy( intervene_cost=intervene_cost, review_cost=review_cost, wait_cost=wait_cost, base_risks=base_risks, intervene_risk_reduction=intervene_risk_reduction, review_risk_reduction=review_risk_reduction, ) return policy.evaluate(health_state_view, previous_state_view)