
Predictive Model

Overview

FeatureCalcPredictiveModel applies a pre-trained machine learning model (serialized as a pickle file and stored in performance_db) to predict a feature's value from a set of input features. The typical use case is a Normal Behavior Model (NBM) that predicts a sensor reading (e.g., a bearing temperature) under normal conditions, enabling anomaly detection by comparing the prediction against actual measurements.

The model must have been trained using the PredictiveModel abstract class from echo-calcmodels. The feature inputs are determined automatically from the model's model_arguments.reference_features attribute — no manual configuration of inputs is needed.
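As a minimal sketch of the anomaly-detection use case, the comparison between actual readings and NBM predictions can look like the following; the 3.0 °C threshold, the helper name, and the sample values are illustrative assumptions, not part of the calculator.

```python
# Hypothetical NBM residual check. The threshold and sample values are
# illustrative assumptions, not part of FeatureCalcPredictiveModel.
def flag_anomalies(actual, predicted, threshold=3.0):
    """Return indices where |actual - predicted| exceeds the threshold."""
    return [
        i
        for i, (a, p) in enumerate(zip(actual, predicted))
        if a is not None and p is not None and abs(a - p) > threshold
    ]


actual = [65.1, 66.0, 74.8, 65.5]     # measured bearing temperature (deg C)
predicted = [65.0, 65.8, 66.1, 65.4]  # NBM prediction under normal conditions

print(flag_anomalies(actual, predicted))  # -> [2]: the 74.8 reading stands out
```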


Calculation Logic

Initialization

At instantiation:

  1. Reads feature_options_json to find the calc model type and target feature name.
  2. Queries performance_db for a calc model matching both conditions simultaneously:
    • model_type matches calc_model_type exactly
    • model name contains target_feature as a substring
  3. Loads and deserializes the matched model from the database.
  4. Reads model_arguments.reference_features from the loaded model to know which SCADA features to fetch.
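The lookup in steps 1–2 can be sketched as a regex filter over candidate (name, type) pairs; the candidate list and helper name below are hypothetical, but the `^...$` and `.*...*` patterns mirror the ones the calculator builds.

```python
import re

# Hypothetical candidate models as (model_name, model_type) pairs.
candidates = [
    ("power_curve_G97-2.07", "power_curve"),
    ("temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG", "normal_behavior_model"),
]


def find_model(candidates, calc_model_type, target_feature):
    """Return the first model whose type matches exactly and whose name
    contains target_feature. The strings are interpolated unescaped, so a
    '.' in the feature name acts as a regex wildcard (as in the real query)."""
    type_re = re.compile(f"^{calc_model_type}$")
    name_re = re.compile(f".*{target_feature}.*")
    for name, model_type in candidates:
        if type_re.match(model_type) and name_re.match(name):
            return name
    raise LookupError("no matching calc model")


print(find_model(candidates, "normal_behavior_model", "NbmGearBearTemp_10min.AVG"))
```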

Per-Period Computation

Inside _compute():

  1. Adjust period for lagged inputs: If the model uses lagged timestamps (model_arguments.lagged_timestamps = N), the fetch period is extended back by N × 10 minutes to provide the model with the historical context it needs.

  2. Fetch features: Fetches all reference_features for the object and reindexes to a uniform 10-minute grid.

  3. Prepare model input: Converts the Polars DataFrame to the pandas MultiIndex format expected by the PredictiveModel.predict() interface. Rows with any null input feature are dropped (the model cannot predict from incomplete data).

  4. Predict: Calls self._model.predict(df) on the non-null rows.

  5. Multiple outputs: If multiple_features = true in feature_options_json, all columns in model_arguments.target_features are returned as separate feature columns. Otherwise only the first target feature is used.

  6. Trim result: The result is filtered back to the original requested period (removing the lagged extension used for fetching).

  7. Drop all-null rows: Timestamps where prediction could not be made (all inputs were null) are dropped from the output.
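Steps 1 and 6 above can be sketched with plain datetimes; the helper names, the example period, and lagged_timestamps = 3 are illustrative.

```python
from datetime import datetime, timedelta


def extend_for_lags(start, end, lagged_timestamps):
    """Extend the fetch window back by N x 10 minutes (step 1)."""
    return start - timedelta(minutes=10 * lagged_timestamps), end


def trim_to_period(timestamps, start, end):
    """Drop the lagged extension, keeping only the requested period (step 6)."""
    return [ts for ts in timestamps if start <= ts <= end]


start = datetime(2024, 1, 1, 12, 0)
end = datetime(2024, 1, 1, 13, 0)
fetch_start, _ = extend_for_lags(start, end, lagged_timestamps=3)
print(fetch_start)  # -> 2024-01-01 11:30:00, i.e. 3 x 10 min earlier

grid = [fetch_start + timedelta(minutes=10 * i) for i in range(10)]
print(len(trim_to_period(grid, start, end)))  # -> 7 of the 10 grid points remain
```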


Database Requirements

Feature Attribute

  • server_calc_type: predictive_model
  • feature_options_json: JSON object — see below

feature_options_json Schema

  • calc_model_type (string, required): Exact type of the calculation model (e.g., "normal_behavior_model"). Used in an anchored regex (^...$).
  • target_feature (string, required): Substring of the model name in performance_db. The calculator finds the first model whose name contains this string.
  • multiple_features (boolean, optional): If true, all target_features from the model are returned as separate columns. Defaults to false.

Example:

JSON
{
    "calc_model_type": "normal_behavior_model",
    "target_feature": "NbmGearBearTemp_10min.AVG"
}
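A minimal validation sketch for this schema (the helper name and error messages are assumptions; the real class performs its own checks in _validate_feature_options):

```python
def validate_feature_options(options: dict) -> None:
    """Check the feature_options_json schema described above."""
    # calc_model_type and target_feature are required strings
    for key in ("calc_model_type", "target_feature"):
        if not isinstance(options.get(key), str):
            raise ValueError(f"'{key}' is required and must be a string")
    # multiple_features is optional but must be a boolean when present
    if "multiple_features" in options and not isinstance(
        options["multiple_features"], bool
    ):
        raise ValueError("'multiple_features' must be a boolean")


validate_feature_options(
    {
        "calc_model_type": "normal_behavior_model",
        "target_feature": "NbmGearBearTemp_10min.AVG",
    }
)  # passes silently
```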

Note

The convention for model names in the database is <prefix>!<target_feature> (e.g., temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG). The target_feature string in the JSON is matched as a substring of this name, so the ! separator keeps the match unambiguous.

Calculation Model

  • Model type: must match calc_model_type exactly.
  • Model name: must contain target_feature as a substring.
  • Model class: must be a subclass of PredictiveModel from echo-calcmodels.
  • Reference features: defined in model_arguments.reference_features; these SCADA features must exist in performance_db for the object.

Class Definition

FeatureCalcPredictiveModel(object_name, feature)

Class used to calculate features that depend on a PredictiveModel.

For this class to work, the feature must have the attribute feature_options_json with the following keys:

  • calc_model_type: Type of the calculation model that will be used to calculate the feature.
  • target_feature: Name of the feature that the model was trained to predict. The calculation model is assumed to contain this string in its name, so it can be used as a regex to match the model name.
  • multiple_features: Optional key that controls, when the model outputs more than one feature, whether all outputs are returned or only the first.
    • If absent or False, only the first feature is used (the first entry in the model's target_features attribute).
    • If present, it must be a boolean. If True, all output features are returned, named after the entries in the model's target_features attribute. Keep in mind that these names must therefore match the feature names in performance_db.

Keep in mind that calc_model_type and target_feature are used together to filter the calculation models in the database, looking for exactly ONE that matches both: calc_model_type must be the exact type of the model, and target_feature must be a substring of the model name, indicating that the model was trained to predict that feature.

One example is the feature NbmGearBearTemp_10min.AVG of G97 turbines, which has the following feature_options_json: {"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}. In this case, the model name in the database is temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG, which contains the target_feature substring and has normal_behavior_model as its calc_model_type.

Usually the name of the calculation models in the database should follow the pattern '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.

The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must exist in performance_db.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_predictive_model.py
Python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class used to calculate features that depend on a PredictiveModel.

    For this class to work, the feature must have the attribute `feature_options_json` with the following keys:

    - `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
    - `target_feature`: Name of the feature that the model was trained to predict. The calculation model is assumed to contain this string in its name, so it can be used as a regex to match the model name.
    - `multiple_features`: Optional key that controls, when the model outputs more than one feature, whether all outputs are returned or only the first.
        - If absent or `False`, only the first feature is used (the first entry in the model's `target_features` attribute).
        - If present, it must be a boolean. If `True`, all output features are returned, named after the entries in the model's `target_features` attribute. Keep in mind that these names must therefore match the feature names in performance_db.

    Keep in mind that `calc_model_type` and `target_feature` are used together to filter the calculation models in the database, looking for exactly ONE that matches both: `calc_model_type` must be the exact type of the model, and `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.

    One example is the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following feature_options_json: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG`, which contains the `target_feature` substring and has `normal_behavior_model` as its `calc_model_type`.

    Usually the name of the calculation models in the database should follow the pattern of '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.

    The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)

    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))

    self._fetch_requirements()

    self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]

    self._validate_feature_options()

    # getting calculation model considering that the model type is "normal_behavior_model" and the name contains the feature name
    self._add_requirement(
        RequiredCalcModels(
            calc_models={
                self.object: [
                    {
                        "model_name": f".*{self._feature_attributes['feature_options_json']['target_feature']}.*",
                        "model_type": f"^{self._feature_attributes['feature_options_json']['calc_model_type']}$",
                    },
                ],
            },
        ),
    )
    self._fetch_requirements()

    # getting the model name
    self._model_name = next(iter(self._requirement_data("RequiredCalcModels")[self.object].keys()))

    # loading calculation model from file
    try:
        self._model: PredictiveModel = self._requirement_data("RequiredCalcModels")[self.object][self._model_name]["model"]
        if not isinstance(self._model, PredictiveModel):
            raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")
        self._model._deserialize_model()  # noqa: SLF001

    except Exception as e:
        raise RuntimeError(f"'{self.object}' failed to load PredictiveModel.") from e


    # defining required features
    self._add_requirement(RequiredFeatures(features={self.object: self._model.model_arguments.reference_features}))

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. Is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • DataFrame | None

    Polars DataFrame with a "timestamp" column and one or more feature value columns. None until calculate is called.

calculate(period, save_into=None, cached_data=None, **kwargs)

Run the calculation for the given period and optionally save the result.

Calls _compute to get the result, stores it in result, then calls save. Subclasses should implement _compute instead of overriding this method.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    Polars DataFrame with features already fetched/calculated. Passed to _compute to enable chained calculations without re-querying performance_db. By default None.

  • **kwargs

    Forwarded to :meth:save.

Returns:

  • DataFrame

    Polars DataFrame with a "timestamp" column and one or more feature value columns.

Source code in echo_energycalc/feature_calc_core.py
Python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.

        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Controls where the result is saved. The options are:
    • "all": The feature will be saved in performance_db and bazefield.
    • "performance_db": The feature will be saved only in performance_db.
    • None: The feature will not be saved.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
Python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )

    if save_into is None:
        return

    upload_to_bazefield = save_into == "all"

    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")

    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})

    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )