Predictive Model¶
Overview¶
The FeatureCalcPredictiveModel class is a subclass of FeatureCalculator that calculates the value of a feature using a predictive model. This is useful when we have pre-trained models, such as machine learning models, that can predict the value of a feature based on the values of other features. Currently this class expects that the model has been trained and saved using the abstract class PredictiveModel and that the model is stored in the database as a pickle file.
Calculation Logic¶
The calculation is straightforward; the steps are as follows (a minimal sketch follows this list):
- Load the model from the database.
- Find the features that are needed to predict the value of the feature.
- Get the values of these features.
- Predict the value of the feature using the model and the values of the features.
- Return the predicted value.
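The snippet below is a minimal, runnable sketch of that flow. The real class unpickles a PredictiveModel from the database; here a stand-in model and in-memory data are used purely for illustration, and the names (`ToyModel`, `wind_speed`, `ambient_temp`) are hypothetical.

```python
import pandas as pd


class ToyModel:
    """Stand-in for a trained PredictiveModel (hypothetical)."""

    # features the model was trained on (the real class reads
    # model.model_arguments.reference_features)
    reference_features = ["wind_speed", "ambient_temp"]

    def predict(self, inputs: pd.DataFrame) -> pd.Series:
        # pretend prediction: a simple linear combination of the inputs
        return 0.8 * inputs["wind_speed"] + 0.1 * inputs["ambient_temp"]


# 1. "Load" the model (the real implementation unpickles it from the database).
model = ToyModel()
# 2. Find the features that are needed to predict the target feature.
needed = model.reference_features
# 3. Get the values of these features (here: a toy in-memory DataFrame).
data = pd.DataFrame(
    {"wind_speed": [8.0, 9.5], "ambient_temp": [21.0, 22.5]},
    index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"]),
)
# 4. Predict the value of the feature using the model and the feature values.
predicted = model.predict(data[needed])
# 5. Return / use the predicted values.
print(predicted)
```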
Database Requirements¶
- Feature attribute `server_calc_type` must be set to `predictive_model`.
- Feature attribute `feature_options_json` with the following keys:
  - `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
  - `target_feature`: Name of the feature that the model was trained to predict.

  The following example is used for feature `predicted_power`: `{ "target_feature": "gen_bear_de_temp", "calc_model_type": "normal_behavior_model" }`. Keep in mind that these keys are only used to find the desired calculation model in the database (see the sketch after this list). See views `v_calculation_models` and `v_calculation_models_files_def` for more details.
- The features defined in the model must be present in the database with the same names as when the model was trained. If the names of the features changed, we will need to manually download the pickle file from the database, rename the features in the model, and upload it again.
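As a reference, here is a small sketch of how the two keys are turned into the filters used to look the model up. The regex patterns mirror the ones built in the class constructor shown further down; the candidate model rows are hypothetical.

```python
import re

feature_options_json = {
    "target_feature": "gen_bear_de_temp",
    "calc_model_type": "normal_behavior_model",
}

# The model type must match exactly; the target feature only needs to appear
# somewhere in the model name (same patterns the constructor builds).
model_name_pattern = f".*{feature_options_json['target_feature']}.*"
model_type_pattern = f"^{feature_options_json['calc_model_type']}$"

# Hypothetical (model_name, model_type) rows as they might appear in v_calculation_models.
candidates = [
    ("power_curve!active_power", "power_curve_model"),
    ("normal_behavior_model!gen_bear_de_temp", "normal_behavior_model"),
]
matches = [
    name
    for name, model_type in candidates
    if re.search(model_name_pattern, name) and re.search(model_type_pattern, model_type)
]
print(matches)  # ['normal_behavior_model!gen_bear_de_temp']
```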
Class Definition¶
FeatureCalcPredictiveModel(object_name, feature)
¶
Class used to calculate features that depend on a PredictiveModel.
For this class to work, the feature must have the attribute `feature_options_json` with the following keys:
- `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
- `target_feature`: Name of the feature that the model was trained to predict. We assume the calculation model will have this string in its name, so it can be used as a regex to match the model name.
- `multiple_features`: Optional key that indicates, when the model outputs more than one feature, whether all should be returned or just the first one.
  - If not present or False, only the first feature returned will be used (the first in the model's `target_features` attribute).
  - If present, it must be a boolean value. If `True`, all features returned by the model (under the names in its `target_features` attribute) will be returned. Keep in mind that in this case the names used will come from the model's `target_features` attribute, so they must match the names of the features in performance_db.
Keep in mind that `calc_model_type` and `target_feature` will be used to filter the calculation models in the database, looking for exactly ONE that matches both. To be clear, `calc_model_type` must be the exact type of the model, and `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.
One example is the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following `feature_options_json`: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!GenPhaseBTemp_10min.AVG`, which matches the `target_feature` substring and has `normal_behavior_model` as its `calc_model_type`.
Usually the name of the calculation models in the database should follow the pattern `<desired prefix>!<target feature>`, where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.
The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.
Parameters:
- `object_name` (str) – Name of the object for which the feature is calculated. It must exist in performance_db.
- `feature` (str) – Feature of the object that is calculated. It must exist in performance_db.
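A hypothetical instantiation, assuming the object and feature exist in performance_db and the feature is configured as described in the Database Requirements section (both names below are examples, not real entries):

```python
from echo_energycalc.feature_calc_predictive_model import FeatureCalcPredictiveModel

# Example names only: the object and feature must exist in performance_db, the
# feature must have server_calc_type = "predictive_model" and a valid
# feature_options_json pointing at a stored calculation model.
calc = FeatureCalcPredictiveModel(
    object_name="G97-2.07",
    feature="NbmGearBearTemp_10min.AVG",
)
```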
Source code in echo_energycalc/feature_calc_predictive_model.py
def __init__(
self,
object_name: str,
feature: str,
) -> None:
"""
Class used to calculate features that depend on a PredictiveModel.
For this class to work, the feature must have the attribute `feature_options_json` with the following keys:
- `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
- `target_feature`: Name of the feature that the model was trained to predict. We assume the calculation model will have this string in its name, so it can be used as a regex to match the model name.
- `multiple_features`: Optional key that indicates when the model outputs more than one feature if all should be returned or just the first one.
    - If not present or False, only the first feature returned will be used (the first in the model's `target_features` attribute).
    - If it is present, it must be a boolean value. If `True`, all features returned, considering their names present in the model's `target_features` attribute, will be returned. Keep in mind that in this case the names used will come from the model's `target_features` attribute, so they must match the names of the features in performance_db.
Keep in mind that `calc_model_type` and `target_feature` will be used to filter the calculation models in the database looking for just ONE that matches both. To be more clear, the `calc_model_type` must be the exact type of the model, and the `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.
One example is for the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following feature_options_json: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!GenPhaseBTemp_10min.AVG`, which matches the `target_feature` substring and has the `calc_model_type` as `normal_behavior_model`.
Usually the name of the calculation models in the database should follow the pattern of '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.
The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.
Parameters
----------
object_name : str
Name of the object for which the feature is calculated. It must exist in performance_db.
feature : str
Feature of the object that is calculated. It must exist in performance_db.
"""
# initialize parent class
super().__init__(object_name, feature)
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
self._get_required_data()
self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]
self._validate_feature_options()
# getting calculation model considering that the model type is "normal_behavior_model" and the name contains the feature name
self._add_requirement(
RequiredCalcModels(
calc_models={
self.object: [
{
"model_name": f".*{self._feature_attributes['feature_options_json']['target_feature']}.*",
"model_type": f"^{self._feature_attributes['feature_options_json']['calc_model_type']}$",
},
],
},
),
)
self._get_required_data()
# getting the model name
self._model_name = next(iter(self._get_requirement_data("RequiredCalcModels")[self.object].keys()))
# loading calculation model from file
try:
self._model: PredictiveModel = self._get_requirement_data("RequiredCalcModels")[self.object][self._model_name]["model"]
if not isinstance(self._model, PredictiveModel):
raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")
self._model._deserialize_model() # noqa: SLF001
except Exception as e:
raise RuntimeError(f"'{self.object}' failed to load PredictiveModel.") from e
# checking if model object is an instance of a subclass of PredictiveModel
if not isinstance(self._model, PredictiveModel):
raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")
# defining required features
self._add_requirement(RequiredFeatures(features={self.object: self._model.model_arguments.reference_features}))
feature
property
¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Name of the feature that is calculated.
name
property
¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:
- str – Name of the feature calculator.
object
property
¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Object name for which the feature is calculated.
requirements
property
¶
List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.
Returns:
- dict[str, list[CalculationRequirement]] – Dict of requirements.
The keys are the names of the classes of the requirements and the values are lists of requirements of that class.
For example:
{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
result
property
¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:
- Series | DataFrame | None – Result of the calculation if the method "calculate" was called. None otherwise.
calculate(period, save_into=None, cached_data=None, **kwargs)
¶
Method that will calculate the feature.
Parameters:
- `period` (DateTimeRange) – Period for which the feature will be calculated.
- `save_into` (Literal['all', 'performance_db'] | None, default: None) – Argument that will be passed to the method "save". The options are:
  - "all": The feature will be saved in performance_db and bazefield.
  - "performance_db": The feature will be saved only in performance_db.
  - None: The feature will not be saved.
  By default None.
- `cached_data` (DataFrame | None, default: None) – DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient. By default None.
- `**kwargs` (dict, default: {}) – Additional arguments that will be passed to the "save" method.
Returns:
- DataFrame – Pandas DataFrame with the calculated features.
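A hypothetical call, continuing the instantiation example above. The exact DateTimeRange constructor used below is an assumption; pass whatever period object the rest of the codebase uses.

```python
# Compute the feature for the window below without persisting anything
# (save_into=None) and inspect the returned DataFrame.
result = calc.calculate(
    period=DateTimeRange("2024-01-01 00:00:00", "2024-01-02 00:00:00"),  # assumed constructor signature
    save_into=None,
)
print(result.head())
```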
Source code in echo_energycalc/feature_calc_predictive_model.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = None,
cached_data: pd.DataFrame | None = None,
**kwargs,
) -> pd.DataFrame:
"""
Method that will calculate the feature.
Parameters
----------
period : DateTimeRange
Period for which the feature will be calculated.
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
cached_data : DataFrame | None, optional
DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
By default None
**kwargs : dict, optional
Additional arguments that will be passed to the "save" method.
Returns
-------
DataFrame
Pandas DataFrame with the calculated features.
"""
t0 = perf_counter()
# adjusting period to account for lagged timestamps
adjusted_period = period.copy()
# in case model arguments have lagged timestamps, we need to adjust the period to account for that
if hasattr(self._model.model_arguments, "lagged_timestamps") and self._model.model_arguments.lagged_timestamps is not None:
adjusted_period.start = adjusted_period.start - self._model.model_arguments.lagged_timestamps * timedelta(minutes=10)
# creating a DataFrame to store the results
features = (
[self.feature]
if not self._feature_attributes["feature_options_json"].get("multiple_features", False)
else self._model.model_arguments.target_features
)
columns = pd.MultiIndex.from_product(
[[self.object], features],
names=["object", "feature"],
)
result = self._create_empty_result(period=adjusted_period, result_type="DataFrame", columns=columns)
# getting feature values
self._get_required_data(period=adjusted_period, reindex="10min", cached_data=cached_data)
# getting DataFrame with feature values
df = self._get_requirement_data("RequiredFeatures").loc[:, pd.IndexSlice[self.object, :]]
t1 = perf_counter()
# adjusting format of the DataFrame to be compatible with the model
# renaming index
df.index.name = "timestamp"
# converting the received multindex that has the object names as the columns first level to the index first level
df = df.T.unstack(level=0).T
# switching the levels of the index
df = df.swaplevel().sort_index()
# adjusting dtype of index
df.index = df.index.set_levels(df.index.levels[0].astype("string[pyarrow]"), level=0)
df.index = df.index.set_levels(df.index.levels[1].astype("datetime64[s]"), level=1)
# dropping NaNs
df = df.dropna(how="any")
df = df.sort_index(level=["timestamp", "object"])
# converting the data to numpy float32 for compatibility with tensorflow
df = df.astype("float32")
t2 = perf_counter()
# only predict if there is data
if not df.empty:
# predicting values
model_output = self._model.predict(df)
# dropping one level from the index
model_output = model_output.droplevel("object")
# adding output to results
wanted_idx = result.index.intersection(model_output.index)
features_to_get = result.columns.get_level_values("feature").tolist()
if len(features_to_get) == 1:
# ignoring features_to_get and using the first feature in the model's target_features
features_to_get = [self._model.model_arguments.target_features[0]]
result.loc[wanted_idx, result.columns] = model_output.loc[wanted_idx, features_to_get].values
t3 = perf_counter()
# trimming result to the original period
result = result[(result.index >= period.start) & (result.index <= period.end)].copy()
# dropping all NaN rows
result = result.dropna(how="all")
# adding calculated feature to class result attribute
self._result = result.copy()
# saving results
self.save(save_into=save_into, **kwargs)
logger.debug(
f"{self.object} - {self.feature} - {period}: Requirements during calc {t1 - t0:.2f}s - Data adjustments {t2 - t1:.2f}s - Model prediction {t3 - t2:.2f}s - Final adjustments {perf_counter() - t3:.2f}s",
)
return result
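A quick illustration of the period adjustment done at the top of calculate: with 10-minute data, a model trained with lagged timestamps needs the query window to start earlier by that many steps. The value 6 below is just an example; the real value comes from the model's arguments.

```python
from datetime import datetime, timedelta

lagged_timestamps = 6  # example value; the real one comes from model.model_arguments
original_start = datetime(2024, 1, 1, 12, 0)
adjusted_start = original_start - lagged_timestamps * timedelta(minutes=10)
print(adjusted_start)  # 2024-01-01 11:00:00
```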
save(save_into=None, **kwargs)
¶
Method to save the calculated feature values in performance_db.
Parameters:
- `save_into` (Literal['all', 'performance_db'] | None, default: None) – Argument that controls where the results are saved. The options are:
  - "all": The feature will be saved in performance_db and bazefield.
  - "performance_db": The feature will be saved only in performance_db.
  - None: The feature will not be saved.
  By default None.
- `**kwargs` (dict, default: {}) – Not being used at the moment. Here only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
self,
save_into: Literal["all", "performance_db"] | None = None,
**kwargs, # noqa: ARG002
) -> None:
"""
Method to save the calculated feature values in performance_db.
Parameters
----------
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
**kwargs : dict, optional
Not being used at the moment. Here only for compatibility.
"""
# checking arguments
if not isinstance(save_into, str | type(None)):
raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
# checking if calculation was done
if self.result is None:
raise ValueError(
"The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
)
if save_into is None:
return
if isinstance(save_into, str):
if save_into not in ["performance_db", "all"]:
raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
upload_to_bazefield = save_into == "all"
elif save_into is None:
upload_to_bazefield = False
else:
raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")
# converting result series to DataFrame if needed
if isinstance(self.result, Series):
result_df = self.result.to_frame()
elif isinstance(self.result, DataFrame):
result_df = self.result.droplevel(0, axis=1)
else:
raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")
# adjusting DataFrame to be inserted in the database
# making the columns a Multindex with levels object_name and feature_name
result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])
self._perfdb.features.values.series.insert(
df=result_df,
on_conflict="update",
bazefield_upload=upload_to_bazefield,
)