Predictive Model¶
Overview¶
FeatureCalcPredictiveModel applies a pre-trained machine learning model (serialized as a pickle file and stored in performance_db) to predict a feature's value from a set of input features. The typical use case is a Normal Behavior Model (NBM) that predicts a sensor reading (e.g., a bearing temperature) under normal conditions, enabling anomaly detection by comparing the prediction against actual measurements.
The model must have been trained using the PredictiveModel abstract class from echo-calcmodels. The feature inputs are determined automatically from the model's model_arguments.reference_features attribute — no manual configuration of inputs is needed.
Calculation Logic¶
Initialization¶
At instantiation:
- Reads `feature_options_json` to find the calc model type and target feature name.
- Queries performance_db for a calc model matching both conditions simultaneously:
    - `model_type` matches `calc_model_type` exactly
    - model name contains `target_feature` as a substring
- Loads and deserializes the matched model from the database.
- Reads `model_arguments.reference_features` from the loaded model to know which SCADA features to fetch.
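The matching rules above can be sketched as follows. This is a minimal illustration, not the actual implementation (the real query runs against performance_db); note that the name pattern is not escaped, so dots in the feature name act as regex wildcards.

```python
import re


def model_matches(
    model_name: str,
    model_type: str,
    calc_model_type: str,
    target_feature: str,
) -> bool:
    """Sketch of the two matching conditions used to pick a calc model."""
    # type must match exactly: the calculator anchors it as ^...$
    type_ok = re.match(f"^{calc_model_type}$", model_type) is not None
    # name must contain target_feature as a substring: pattern .*<target>.*
    name_ok = re.match(f".*{target_feature}.*", model_name) is not None
    return type_ok and name_ok
```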
Per-Period Computation¶
Inside `_compute()`:

1. **Adjust period for lagged inputs**: If the model uses lagged timestamps (`model_arguments.lagged_timestamps = N`), the fetch period is extended back by `N × 10 minutes` to provide the model with the historical context it needs.
2. **Fetch features**: Fetches all `reference_features` for the object and reindexes to a uniform 10-minute grid.
3. **Prepare model input**: Converts the Polars DataFrame to the pandas MultiIndex format expected by the `PredictiveModel.predict()` interface. Rows with any null input feature are dropped (the model cannot predict from incomplete data).
4. **Predict**: Calls `self._model.predict(df)` on the non-null rows.
5. **Multiple outputs**: If `multiple_features = true` in `feature_options_json`, all columns in `model_arguments.target_features` are returned as separate feature columns. Otherwise only the first target feature is used.
6. **Trim result**: The result is filtered back to the original requested period (removing the lagged extension used for fetching).
7. **Drop all-null rows**: Timestamps where no prediction could be made (all inputs were null) are dropped from the output.
Database Requirements¶
Feature Attribute¶
| Attribute | Value |
|---|---|
| `server_calc_type` | `predictive_model` |
| `feature_options_json` | JSON object (see below) |
feature_options_json Schema¶
| Key | Type | Required | Description |
|---|---|---|---|
| `calc_model_type` | string | Yes | Exact type of the calculation model (e.g., `"normal_behavior_model"`). Used as a regex anchored with `^...$`. |
| `target_feature` | string | Yes | Substring of the model name in performance_db. The calculator finds the first model whose name contains this string. |
| `multiple_features` | boolean | No | If `true`, all `target_features` from the model are returned as separate columns. Defaults to `false`. |
Example:

```json
{
    "calc_model_type": "normal_behavior_model",
    "target_feature": "NbmGearBearTemp_10min.AVG"
}
```
Note
The convention for model names in the database is <prefix>!<target_feature> (e.g., temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG). The target_feature string in the JSON is matched as a substring of this name, so the ! prefix makes it unambiguous.
Calculation Model¶
| Requirement | Description |
|---|---|
| Model type | Must match calc_model_type exactly |
| Model name | Must contain target_feature as a substring |
| Model class | Must be a subclass of PredictiveModel from echo-calcmodels |
| Reference features | Defined in model_arguments.reference_features — these SCADA features must exist in performance_db for the object |
Class Definition¶
FeatureCalcPredictiveModel(object_name, feature)¶
Class used to calculate features that depend on a PredictiveModel.
For this class to work, the feature must have the attribute feature_options_json with the following keys:
- `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
- `target_feature`: Name of the feature that the model was trained to predict. The calculation model is assumed to have this string in its name, so it can be used as a regex to match the model name.
- `multiple_features`: Optional key that indicates, when the model outputs more than one feature, whether all should be returned or just the first one.
    - If not present or `False`, only the first feature is used (the first in the model's `target_features` attribute).
    - If present, it must be a boolean value. If `True`, all features are returned, named according to the model's `target_features` attribute, so those names must match the feature names in performance_db.
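A minimal sketch of the selection logic described above (the feature names are invented):

```python
# assumed contents of model_arguments.target_features for this sketch
target_features = ["NbmGearBearTemp_10min.AVG", "NbmGenBearTemp_10min.AVG"]

# multiple_features absent or False -> keep only the first target feature;
# True -> keep all of them under their target_features names
multiple_features = False
selected = target_features if multiple_features else target_features[:1]
```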
Keep in mind that `calc_model_type` and `target_feature` are used to filter the calculation models in the database, looking for exactly ONE that matches both. To be clear: `calc_model_type` must be the exact type of the model, and `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.
One example is the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following `feature_options_json`: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG`, which contains the `target_feature` substring and has `normal_behavior_model` as its `calc_model_type`.
Usually the name of the calculation models in the database should follow the pattern `<desired prefix>!<target feature>`, where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.
The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.
Parameters:

- `object_name` (`str`) – Name of the object for which the feature is calculated. It must exist in performance_db.
- `feature` (`str`) – Feature of the object that is calculated. It must exist in performance_db.
Source code in echo_energycalc/feature_calc_predictive_model.py
```python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class used to calculate features that depend on a PredictiveModel.

    For this class to work, the feature must have the attribute `feature_options_json` with the following keys:

    - `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
    - `target_feature`: Name of the feature that the model was trained to predict. We assume the calculation model will have this string in its name, so it can be used as a regex to match the model name.
    - `multiple_features`: Optional key that indicates, when the model outputs more than one feature, whether all should be returned or just the first one.
        - If not present or False, only the first feature is used (the first in the model's `target_features` attribute).
        - If present, it must be a boolean value. If `True`, all features are returned, named according to the model's `target_features` attribute, so those names must match the feature names in performance_db.

    Keep in mind that `calc_model_type` and `target_feature` will be used to filter the calculation models in the database, looking for exactly ONE that matches both: the `calc_model_type` must be the exact type of the model, and the `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.

    One example is the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following feature_options_json: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!NbmGearBearTemp_10min.AVG`, which matches the `target_feature` substring and has the `calc_model_type` as `normal_behavior_model`.

    Usually the name of the calculation models in the database should follow the pattern '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.

    The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
    self._fetch_requirements()
    self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]
    self._validate_feature_options()
    # fetch the calc model whose type matches calc_model_type exactly
    # and whose name contains target_feature as a substring
    self._add_requirement(
        RequiredCalcModels(
            calc_models={
                self.object: [
                    {
                        "model_name": f".*{self._feature_attributes['feature_options_json']['target_feature']}.*",
                        "model_type": f"^{self._feature_attributes['feature_options_json']['calc_model_type']}$",
                    },
                ],
            },
        ),
    )
    self._fetch_requirements()
    # getting the model name
    self._model_name = next(iter(self._requirement_data("RequiredCalcModels")[self.object].keys()))
    # loading calculation model from file and checking it is a PredictiveModel subclass
    try:
        self._model: PredictiveModel = self._requirement_data("RequiredCalcModels")[self.object][self._model_name]["model"]
        if not isinstance(self._model, PredictiveModel):
            raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")
        self._model._deserialize_model()  # noqa: SLF001
    except Exception as e:
        raise RuntimeError(f"'{self.object}' failed to load PredictiveModel.") from e
    # defining required features
    self._add_requirement(RequiredFeatures(features={self.object: self._model.model_arguments.reference_features}))
```
feature property¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

- `str` – Name of the feature that is calculated.
name property¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator. This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

- `str` – Name of the feature calculator.
object property¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

- `str` – Object name for which the feature is calculated.
requirements property¶
List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

- `dict[str, list[CalculationRequirement]]` – Dict of requirements. The keys are the names of the requirement classes and the values are lists of requirements of that class. For example: `{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}`
result property¶
Result of the calculation. This is None until the method "calculate" is called.

Returns:

- `DataFrame | None` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns. None until `calculate` is called.
calculate(period, save_into=None, cached_data=None, **kwargs)¶
Run the calculation for the given period and optionally save the result.
Calls `_compute` to get the result, stores it in `result`, then calls `save`. Subclasses should implement `_compute` instead of overriding this method.
Parameters:

- `period` (`DateTimeRange`) – Period for which the feature will be calculated.
- `save_into` (`Literal["all", "performance_db"] | None`, default: `None`) –
    - `"all"`: save in performance_db and bazefield.
    - `"performance_db"`: save only in performance_db.
    - `None`: do not save.
- `cached_data` (`DataFrame | None`, default: `None`) – Polars DataFrame with features already fetched/calculated. Passed to `_compute` to enable chained calculations without re-querying performance_db.
- `**kwargs` – Forwarded to `save`.

Returns:

- `DataFrame` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns.
Source code in echo_energycalc/feature_calc_core.py
```python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.
        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result
```
save(save_into=None, **kwargs)¶
Method to save the calculated feature values in performance_db.
Parameters:

- `save_into` (`Literal["all", "performance_db"] | None`, default: `None`) – Where to save the result:
    - `"all"`: the feature will be saved in performance_db and bazefield.
    - `"performance_db"`: the feature will be saved only in performance_db.
    - `None`: the feature will not be saved.
- `**kwargs` (`dict`, default: `{}`) – Not used at the moment; present only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
```python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Where to save the result. The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": The feature will be saved only in performance_db.
        - None: The feature will not be saved.
        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )
    if save_into is None:
        return
    upload_to_bazefield = save_into == "all"
    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")
    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})
    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )
```