Predictive Model

Overview

The FeatureCalcPredictiveModel class is a subclass of FeatureCalculator that calculates the value of a feature using a predictive model. This is useful when a pre-trained model, such as a machine learning model, can predict the value of a feature from the values of other features. Currently this class expects the model to have been trained and saved using the abstract class PredictiveModel, and to be stored in the database as a pickle file.

Calculation Logic

The calculation proceeds as follows:

  1. Load the model from the database.
  2. Find the features that are needed to predict the value of the feature.
  3. Get the values of these features.
  4. Predict the value of the feature using the model and the values of the features.
  5. Return the predicted value.
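The five steps above can be sketched with a toy example. This is only an illustration under stated assumptions: `StubModel`, `calculate_feature`, the in-memory `models` dict and the feature names are all hypothetical stand-ins, not part of echo_energycalc, and the real class loads a PredictiveModel from the database instead.

```python
import pandas as pd


class StubModel:
    """Hypothetical stand-in for a trained PredictiveModel."""

    # input features the model was trained on (used in step 2)
    reference_features = ["wind_speed", "ambient_temp"]

    def predict(self, inputs: pd.DataFrame) -> pd.Series:
        # toy linear rule, present only so the sketch returns numbers
        return 2.0 * inputs["wind_speed"] + 0.5 * inputs["ambient_temp"]


def calculate_feature(models: dict, model_key: str, data: pd.DataFrame) -> pd.Series:
    model = models[model_key]                # 1. load the model
    needed = model.reference_features        # 2. find the input features
    inputs = data[needed].dropna(how="any")  # 3. get their values
    predicted = model.predict(inputs)        # 4. predict the target feature
    return predicted                         # 5. return the predicted value


data = pd.DataFrame(
    {"wind_speed": [5.0, 7.0], "ambient_temp": [20.0, 22.0], "unused": [1.0, 2.0]},
    index=pd.date_range("2024-01-01", periods=2, freq="10min"),
)
predicted = calculate_feature({"nbm": StubModel()}, "nbm", data)
```

Note that the `unused` column is never passed to the model: only the features the model declares are selected.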

Database Requirements

  • Feature attribute server_calc_type must be set to predictive_model.
  • Feature attribute feature_options_json with the following keys:

    • calc_model_type: Type of the calculation model that will be used to calculate the feature.
    • target_feature: Name of the feature that the model was trained to predict.

    The following example is used for feature predicted_power:

    {
        "target_feature": "gen_bear_de_temp",
        "calc_model_type": "normal_behavior_model"
    }
    

    Keep in mind that these are only used to find the desired calculation model in the database. See views v_calculation_models and v_calculation_models_files_def for more details.

  • The features defined in the model must be present in the database with the same names used when the model was trained. If the feature names have changed, the pickle file must be downloaded manually from the database, the feature names updated in the model, and the file uploaded again.

Class Definition

FeatureCalcPredictiveModel(object_name, feature)

Class used to calculate features that depend on a PredictiveModel.

For this class to work, the feature must have the attribute feature_options_json with the following keys:

  • calc_model_type: Type of the calculation model that will be used to calculate the feature.
  • target_feature: Name of the feature that the model was trained to predict. We assume the calculation model will have this string in its name, so it can be used as a regex to match the model name.
  • multiple_features: Optional key that indicates, when the model outputs more than one feature, whether all of them should be returned or just the first one.
    • If not present or False, only the first feature returned will be used (the first in the model's target_features attribute).
    • If it is present, it must be a boolean value. If True, all features returned will be kept, named according to the model's target_features attribute. Keep in mind that in this case the names come from the model's target_features attribute, so they must match the names of the features in performance_db.
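The key rules listed above could be checked along these lines. The class's own `_validate_feature_options` method is not shown on this page, so `validate_feature_options` below is a hypothetical helper that only mirrors the documented rules; it is not the library's implementation.

```python
import json


def validate_feature_options(raw: str) -> dict:
    """Hypothetical validator for a feature_options_json value."""
    options = json.loads(raw)

    # both keys are required and are expected to be strings
    for key in ("calc_model_type", "target_feature"):
        if not isinstance(options.get(key), str):
            raise ValueError(f"feature_options_json must contain a string '{key}'")

    # multiple_features is optional; when present it must be a boolean
    multiple = options.get("multiple_features", False)
    if not isinstance(multiple, bool):
        raise TypeError("multiple_features must be a boolean when present")

    return {**options, "multiple_features": multiple}


options = validate_feature_options(
    '{"calc_model_type": "normal_behavior_model", "target_feature": "gen_bear_de_temp"}'
)
```

Here `multiple_features` is filled in as False when absent, matching the documented default of using only the first target feature.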

Keep in mind that calc_model_type and target_feature will be used to filter the calculation models in the database looking for just ONE that matches both. To be more clear, the calc_model_type must be the exact type of the model, and the target_feature must be a substring of the model name, indicating that the model was trained to predict that feature.

One example is for the feature NbmGearBearTemp_10min.AVG of G97 turbines, which has the following feature_options_json: {"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}. In this case, the model name in the database is temperature_nbm_G97-2.07!GenPhaseBTemp_10min.AVG, which matches the target_feature substring and has the calc_model_type as normal_behavior_model.

Usually the name of the calculation models in the database should follow the pattern of '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.
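The matching rule can be illustrated with Python's `re` module. This is a sketch, assuming the database applies the patterns with semantics similar to Python regular expressions; `find_model` and the model names in `models` are hypothetical. The two patterns have the same shape as the ones the constructor builds: `.*<target_feature>.*` for the name and `^<calc_model_type>$` for the type.

```python
import re


def find_model(models: dict, target_feature: str, calc_model_type: str) -> str:
    """Return the single model name matching both patterns (hypothetical helper)."""
    name_pattern = re.compile(f".*{target_feature}.*")
    type_pattern = re.compile(f"^{calc_model_type}$")
    matches = [
        name
        for name, model_type in models.items()
        if name_pattern.match(name) and type_pattern.match(model_type)
    ]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one model, found {len(matches)}")
    return matches[0]


# hypothetical catalogue: model name -> model type
models = {
    "nbm!gen_bear_de_temp": "normal_behavior_model",
    "nbm!gear_oil_temp": "normal_behavior_model",
    "other!gen_bear_de_temp": "power_curve_model",
}
selected = find_model(models, "gen_bear_de_temp", "normal_behavior_model")

# an unescaped "." in a feature name is a regex wildcard
loose = re.match(".*Temp_10min.AVG.*", "nbm!Temp_10minXAVG") is not None
strict = re.match(f".*{re.escape('Temp_10min.AVG')}.*", "nbm!Temp_10minXAVG") is not None
```

Because target_feature is interpolated into the pattern as-is, a feature name containing `.` matches slightly more than its literal text. In practice this rarely matters, but it is worth keeping in mind when two model names differ by a single character.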

The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must exist in performance_db.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_predictive_model.py
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class used to calculate features that depend on a PredictiveModel.

    For this class to work, the feature must have the attribute `feature_options_json` with the following keys:

    - `calc_model_type`: Type of the calculation model that will be used to calculate the feature.
    - `target_feature`: Name of the feature that the model was trained to predict. We assume the calculation model will have this string in its name, so it can be used as a regex to match the model name.
    - `multiple_features`: Optional key that indicates, when the model outputs more than one feature, whether all of them should be returned or just the first one.
        - If not present or False, only the first feature returned will be used (the first in the model's `target_features` attribute).
        - If it is present, it must be a boolean value. If `True`, all features returned will be kept, named according to the model's `target_features` attribute. Keep in mind that in this case the names come from the model's `target_features` attribute, so they must match the names of the features in performance_db.

    Keep in mind that `calc_model_type` and `target_feature` will be used to filter the calculation models in the database looking for just ONE that matches both. To be more clear, the `calc_model_type` must be the exact type of the model, and the `target_feature` must be a substring of the model name, indicating that the model was trained to predict that feature.

    One example is for the feature `NbmGearBearTemp_10min.AVG` of G97 turbines, which has the following feature_options_json: `{"calc_model_type": "normal_behavior_model", "target_feature": "NbmGearBearTemp_10min.AVG"}`. In this case, the model name in the database is `temperature_nbm_G97-2.07!GenPhaseBTemp_10min.AVG`, which matches the `target_feature` substring and has the `calc_model_type` as `normal_behavior_model`.

    Usually the name of the calculation models in the database should follow the pattern of '<desired prefix>!<target feature>', where the desired prefix is recommended to be the type of the model or the calculation model class used to calculate the feature.

    The class will handle getting all the necessary features for the model to work based on what was defined when the model was trained.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)

    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))

    self._get_required_data()

    self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]

    self._validate_feature_options()

    # getting the calculation model whose type matches calc_model_type exactly and whose name contains target_feature
    self._add_requirement(
        RequiredCalcModels(
            calc_models={
                self.object: [
                    {
                        "model_name": f".*{self._feature_attributes['feature_options_json']['target_feature']}.*",
                        "model_type": f"^{self._feature_attributes['feature_options_json']['calc_model_type']}$",
                    },
                ],
            },
        ),
    )
    self._get_required_data()

    # getting the model name
    self._model_name = next(iter(self._get_requirement_data("RequiredCalcModels")[self.object].keys()))

    # loading calculation model from file
    try:
        self._model: PredictiveModel = self._get_requirement_data("RequiredCalcModels")[self.object][self._model_name]["model"]
        if not isinstance(self._model, PredictiveModel):
            raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")
        self._model._deserialize_model()  # noqa: SLF001

    except Exception as e:
        raise RuntimeError(f"'{self.object}' failed to load PredictiveModel.") from e

    # checking if model object is an instance of a subclass of PredictiveModel
    if not isinstance(self._model, PredictiveModel):
        raise TypeError(f"'{self.object}' is not an instance of a subclass of PredictiveModel.")

    # defining required features
    self._add_requirement(RequiredFeatures(features={self.object: self._model.model_arguments.reference_features}))

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. It is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

Requirements of the feature calculator. They are defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • Series | DataFrame | None:

    Result of the calculation if the method "calculate" was called. None otherwise.

calculate(period, save_into=None, cached_data=None, **kwargs)

Method that will calculate the feature.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:

      • "all": The feature will be saved in performance_db and bazefield.
      • "performance_db": The feature will be saved only in performance_db.
      • None: The feature will not be saved.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    DataFrame with features already queried/calculated. This avoids querying all the data again from performance_db, making chained calculations a lot more efficient. By default None.

  • **kwargs

    (dict, default: {} ) –

    Additional arguments that will be passed to the "save" method.

Returns:

  • DataFrame

    Pandas DataFrame with the calculated features.
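When the model arguments define lagged_timestamps, calculate extends the start of the requested period backwards by that many 10-minute steps before fetching data, then trims the result back to the original period. A minimal sketch of that arithmetic, where `adjusted_start` is a hypothetical helper and not part of the class:

```python
from datetime import datetime, timedelta
from typing import Optional


def adjusted_start(
    start: datetime,
    lagged_timestamps: Optional[int],
    step: timedelta = timedelta(minutes=10),
) -> datetime:
    """Move the period start back so lagged inputs are available (hypothetical helper)."""
    if lagged_timestamps is None:
        return start
    return start - lagged_timestamps * step


# six lagged timestamps at 10 minutes each means one extra hour of input data
start = adjusted_start(datetime(2024, 1, 1, 0, 0), 6)
unchanged = adjusted_start(datetime(2024, 1, 1, 0, 0), None)
```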

Source code in echo_energycalc/feature_calc_predictive_model.py
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pd.DataFrame | None = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Method that will calculate the feature.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    cached_data : DataFrame | None, optional
        DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
        By default None
    **kwargs : dict, optional
        Additional arguments that will be passed to the "save" method.

    Returns
    -------
    DataFrame
        Pandas DataFrame with the calculated features.
    """
    t0 = perf_counter()

    # adjusting period to account for lagged timestamps
    adjusted_period = period.copy()

    # in case model arguments have lagged timestamps, we need to adjust the period to account for that
    if hasattr(self._model.model_arguments, "lagged_timestamps") and self._model.model_arguments.lagged_timestamps is not None:
        adjusted_period.start = adjusted_period.start - self._model.model_arguments.lagged_timestamps * timedelta(minutes=10)

    # creating a DataFrame to store the results
    features = (
        [self.feature]
        if not self._feature_attributes["feature_options_json"].get("multiple_features", False)
        else self._model.model_arguments.target_features
    )
    columns = pd.MultiIndex.from_product(
        [[self.object], features],
        names=["object", "feature"],
    )
    result = self._create_empty_result(period=adjusted_period, result_type="DataFrame", columns=columns)

    # getting feature values
    self._get_required_data(period=adjusted_period, reindex="10min", cached_data=cached_data)

    # getting DataFrame with feature values
    df = self._get_requirement_data("RequiredFeatures").loc[:, pd.IndexSlice[self.object, :]]

    t1 = perf_counter()

    # adjusting format of the DataFrame to be compatible with the model

    # renaming index
    df.index.name = "timestamp"

    # converting the received multindex that has the object names as the columns first level to the index first level
    df = df.T.unstack(level=0).T

    # switching the levels of the index
    df = df.swaplevel().sort_index()

    # adjusting dtype of index
    df.index = df.index.set_levels(df.index.levels[0].astype("string[pyarrow]"), level=0)
    df.index = df.index.set_levels(df.index.levels[1].astype("datetime64[s]"), level=1)

    # dropping NaNs
    df = df.dropna(how="any")
    df = df.sort_index(level=["timestamp", "object"])

    # converting the data to numpy float32 for compatibility with tensorflow
    df = df.astype("float32")

    t2 = perf_counter()

    # only predict if there is data
    if not df.empty:
        # predicting values
        model_output = self._model.predict(df)
        # dropping one level from the index
        model_output = model_output.droplevel("object")

        # adding output to results
        wanted_idx = result.index.intersection(model_output.index)
        features_to_get = result.columns.get_level_values("feature").tolist()
        if len(features_to_get) == 1:
            # ignoring features_to_get and using the first feature in the model's target_features
            features_to_get = [self._model.model_arguments.target_features[0]]

        result.loc[wanted_idx, result.columns] = model_output.loc[wanted_idx, features_to_get].values

    t3 = perf_counter()

    # trimming result to the original period
    result = result[(result.index >= period.start) & (result.index <= period.end)].copy()

    # dropping all NaN rows
    result = result.dropna(how="all")

    # adding calculated feature to class result attribute
    self._result = result.copy()

    # saving results
    self.save(save_into=save_into, **kwargs)

    logger.debug(
        f"{self.object} - {self.feature} - {period}: Requirements during calc {t1 - t0:.2f}s - Data adjustments {t2 - t1:.2f}s - Model prediction {t3 - t2:.2f}s - Final adjustments {perf_counter() - t3:.2f}s",
    )

    return result
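The reshaping step in calculate above can be reproduced on a toy frame to see what the model receives. This sketch assumes input columns shaped as an (object, feature) MultiIndex; the object and feature names are made up, and the index dtype conversions from the original code are omitted to keep the example free of the pyarrow dependency.

```python
import numpy as np
import pandas as pd

# wide input: timestamps as rows, (object, feature) as columns
index = pd.date_range("2024-01-01", periods=3, freq="10min", name="timestamp")
columns = pd.MultiIndex.from_product(
    [["WTG01"], ["feat_a", "feat_b"]],
    names=["object", "feature"],
)
wide = pd.DataFrame(
    [[1.0, 10.0], [2.0, np.nan], [3.0, 30.0]],
    index=index,
    columns=columns,
)

# move the object level from the columns into the index
tidy = wide.T.unstack(level=0).T

# put object first, timestamp second
tidy = tidy.swaplevel().sort_index()

# rows with any missing input are dropped before prediction
tidy = tidy.dropna(how="any").astype("float32")
```

The result has one row per (object, timestamp) pair and one column per feature. The middle timestamp is dropped because `feat_b` is missing there, so no prediction is produced for it.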

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Where the calculated values will be saved. The options are:

      • "all": The feature will be saved in performance_db and bazefield.
      • "performance_db": The feature will be saved only in performance_db.
      • None: The feature will not be saved.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.
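Before inserting, save relabels the result columns from the (object, feature) levels used during calculation to the (object_name, feature_name) levels expected by the insert call. The sketch below reproduces only that relabelling on a toy DataFrame; the object and feature names are hypothetical and no database call is made.

```python
import pandas as pd

# toy result as produced by calculate: columns are (object, feature)
result = pd.DataFrame(
    [[1.0], [2.0]],
    index=pd.date_range("2024-01-01", periods=2, freq="10min"),
    columns=pd.MultiIndex.from_product(
        [["WTG01"], ["predicted_temp"]],
        names=["object", "feature"],
    ),
)

# drop the object level, leaving only feature names as columns
flat = result.droplevel(0, axis=1)

# rebuild the columns with the level names used for insertion
flat.columns = pd.MultiIndex.from_product(
    [["WTG01"], flat.columns],
    names=["object_name", "feature_name"],
)
```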

Source code in echo_energycalc/feature_calc_core.py
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
        )

    if save_into is None:
        return

    if isinstance(save_into, str):
        if save_into not in ["performance_db", "all"]:
            raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
        upload_to_bazefield = save_into == "all"
    elif save_into is None:
        upload_to_bazefield = False
    else:
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")

    # converting result series to DataFrame if needed
    if isinstance(self.result, Series):
        result_df = self.result.to_frame()
    elif isinstance(self.result, DataFrame):
        result_df = self.result.droplevel(0, axis=1)
    else:
        raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")

    # adjusting DataFrame to be inserted in the database
    # making the columns a Multindex with levels object_name and feature_name
    result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])

    self._perfdb.features.values.series.insert(
        df=result_df,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )