Required Features

Overview

The RequiredFeatures class is a subclass of CalculationRequirement that is used to get feature values for specific objects and periods. It is one of the most important requirements, as most feature calculations require other features to already be present in the database before the calculation can proceed.

Usage

This requirement is instantiated with a dictionary mapping each object to the list of features that must be present for it. For example:

requirement = RequiredFeatures(features={"SDM1-VRN1-01": ["wind_speed", "active_power"]})

After the check and get_data methods are called, the data attribute of the requirement holds a DataFrame with the required features for the desired period. The timestamps form the index, and the columns are a MultiIndex with the object name as the first level and the feature name as the second level.
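The layout described above can be sketched with plain pandas. The object name, feature names, timestamps, and values below are made-up illustrations; only the shape follows the description:

```python
import pandas as pd

# illustrative sketch of the layout of the `data` attribute:
# timestamps as index, (object, feature) MultiIndex as columns
idx = pd.date_range("2024-01-01", periods=3, freq="10min")
cols = pd.MultiIndex.from_tuples(
    [("SDM1-VRN1-01", "wind_speed"), ("SDM1-VRN1-01", "active_power")],
    names=["object", "feature"],
)
data = pd.DataFrame(
    [[7.1, 1500.0], [7.4, 1600.0], [6.9, 1450.0]],
    index=idx,
    columns=cols,
)

# one (object, feature) pair selects a single Series
wind = data[("SDM1-VRN1-01", "wind_speed")]
# selecting by object alone drops the first level, leaving feature columns
obj_data = data["SDM1-VRN1-01"]
```

Selecting by the first level alone is a convenient way to work with all features of a single object at once.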

Database Requirements

This requirement expects the features table to contain the necessary features for the model of the wanted object. How to create a feature is described in the FeatureCalculator section.

To check if the features are set correctly, go to the v_features view in the database.

Class Definition

RequiredFeatures(features, optional=False)

Subclass of CalculationRequirement that defines the features that are required for the calculation.

This will check the performance database for the existence of the required features for the wanted objects.

Parameters:

  • features

    (dict[str, list[str]]) –

    Features that are required for the calculation.

    Should be in the format {object_name: [feature_name, ...], ...}.

  • optional

    (bool, default: False ) –

    Set to True if this is an optional requirement.

Source code in echo_energycalc/calculation_requirement_features.py
def __init__(
    self,
    features: dict[str, list[str]],
    optional: bool = False,
) -> None:
    """
    Constructor of the RequiredFeatures class.

    This will check the performance database for the existence of the required features for the wanted objects.

    Parameters
    ----------
    features : dict[str, list[str]]
        Features that are required for the calculation.

        Should be in the format {object_name: [feature_name, ...], ...}.
    optional : bool, optional
        Set to True if this is an optional requirement. by default False
    """
    super().__init__(optional)

    # check if features is a dict with str keys and list of str values
    if not isinstance(features, dict):
        raise TypeError(f"features must be a dict, not {type(features)}")
    if not all(isinstance(key, str) for key in features):
        raise TypeError(f"all features keys must be str, not {[type(key) for key in features]}")
    if not all(isinstance(value, list) for value in features.values()):
        raise TypeError(f"all features values must be list, not {[type(value) for value in features.values()]}")
    if not all(all(isinstance(item, str) for item in value) for value in features.values()):
        raise TypeError(f"all features values must be list of str, not {[type(item) for value in features.values() for item in value]}")

    self._features = features

checked property

Attribute that defines if the requirement has been checked. Its value starts as False and is set to True after the check method is called.

Returns:

  • bool

    True if the requirement has been checked.

data property

Data required for the calculation.

Returns:

  • DataFrame

    DataFrame with the required features for the desired period. The columns are a MultiIndex with object name as first level and feature name as second level.

features property

Features that are required for the calculation.

Returns:

  • dict[str, list[str]]

    Features that are required for the calculation in the format {object_name: [feature_name, ...], ...}.

optional property

Attribute that defines if the requirement is optional.

If optional is True, the requirement is only validated to check if it could exist, not if it is actually present. This is useful for requirements that are not necessary for all calculations, but are useful for some of them.

Returns:

  • bool

    True if the requirement is optional.

check()

Method used to check if all required features are present in the database for each object.

This will raise an error if any of the required features are missing.

Returns:

  • bool

    Returns True if all required features are present in the database for each object.

Source code in echo_energycalc/calculation_requirement_features.py
def check(self) -> bool:
    """
    Method used to check if all required features are present in the database for each object.

    This will raise an error if any of the required features are missing.

    Returns
    -------
    bool
        Returns True if all required features are present in the database for each object.
    """
    if self.optional:
        return True

    # iterating each object and checking if all features are present
    for object_name, features in self.features.items():
        missing_features_baze = []
        # handle features whose names end with "_b#" (bazefield features)
        if any(feature.endswith("_b#") for feature in features):
            # getting model name from object_name
            model_name = self._baze.objects.instances.get(object_names=[object_name])
            model_name = model_name[object_name]["attributes"]["domainName"]
            # getting existing features for the object
            existing_features = self._baze.points.definitions.get_ids(
                object_models=[model_name],
            )

            # filtering features to get only the ones that end with "_b#"
            features_baze = [feature for feature in features if feature.endswith("_b#")]

            # adjusting feature name
            features_baze = [feature.removesuffix("_b#") for feature in features_baze]
            # filtering bazefield dictionary to get only the desired features ids
            existing_features = {
                key: existing_features[model_name][key] for key in features_baze if key in existing_features[model_name]
            }
            # checking if all required features are present
            missing_features_baze = set(features_baze) - set(existing_features.keys())
        # filtering features to get only the ones that do not end with "_b#"
        non_baze_features = [feature for feature in features if not feature.endswith("_b#")]
        # getting existing features
        existing_features = self._perfdb.features.definitions.get_ids(
            object_names=[object_name],
            feature_names=non_baze_features,
        )
        if not existing_features:
            raise ValueError(f"Could not find any of the following features for object {object_name}: {non_baze_features}")
        obj_model = next(iter(existing_features.keys()))
        # checking if all required features are present
        missing_features_pg = set(non_baze_features) - set(existing_features[obj_model].keys())
        missing_features = list(missing_features_baze) + list(missing_features_pg)
        if len(missing_features) > 0:
            raise ValueError(f"Features {missing_features} do not exist for object {object_name}")

    self._checked = True

    return True
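The `_b#` suffix convention used above (and again in `get_data`) routes each feature either to bazefield or to performance_db. The split itself is plain dictionary filtering, sketched here with a made-up feature set:

```python
# sketch of the feature routing by the "_b#" suffix, mirroring the
# comprehensions in the source; the object and feature names are made up
features = {"SDM1-VRN1-01": ["wind_speed", "active_power_b#"]}

# features ending in "_b#" are fetched from bazefield, with the suffix stripped
baze_features = {
    obj: [f.removesuffix("_b#") for f in feats if f.endswith("_b#")]
    for obj, feats in features.items()
}
# the remaining features are fetched from performance_db
perfdb_features = {
    obj: [f for f in feats if not f.endswith("_b#")]
    for obj, feats in features.items()
}
```

Note that `str.removesuffix` requires Python 3.9 or newer.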

get_data(period, reindex='infer', cached_data=None, round_timestamps=None, **kwargs)

Method used to get the data for the required features from performance_db (and, for features whose names end with "_b#", from bazefield).

This will only get the features that are not present in the cached_data DataFrame.

Parameters:

  • period

    (DateTimeRange) –

    Desired period for the features.

  • reindex

    (str, default: 'infer' ) –

    Frequency to reindex the required data. If set to None, no reindexing is done. By default "infer", which infers the frequency from the data.

  • cached_data

    (DataFrame | None, default: None ) –

    Cached data that can be used to avoid querying performance_db.

    This is a DataFrame in the same format as the one returned by the method "perfdb.features.values.series.get", or to be more clear, the columns are a MultiIndex with object name as first level and feature name as second level.

    This is important when doing calculations that involve multiple features to avoid needing to save the intermediate results in performance_db and then query them again.

    By default None.

  • round_timestamps

    (RoundTimeStampsDict | None, default: None ) –

    Dictionary used to round timestamps with the following keys:

    • freq: timedelta, the frequency to round the timestamps to.
    • tolerance: timedelta, the tolerance to be used when rounding timestamps

    If set to None, no rounding will be done. Only applicable if output_type is "DataFrame". By default None.
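The two structured inputs above can be sketched as follows. The timestamps, values, and frequency choices are made-up illustrations; the dictionary keys and the column MultiIndex names follow the parameter descriptions and the validation performed inside get_data:

```python
from datetime import timedelta

import pandas as pd

# hypothetical RoundTimeStampsDict, using the keys described above
round_timestamps = {
    "freq": timedelta(minutes=10),      # round timestamps to 10-minute marks
    "tolerance": timedelta(minutes=2),  # ignore timestamps further than 2 minutes from a mark
}

# cached_data must have a column MultiIndex named ["object", "feature"];
# get_data raises a TypeError otherwise
idx = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"])
cached_data = pd.DataFrame({("SDM1-VRN1-01", "wind_speed"): [7.1, 7.4]}, index=idx)
cached_data.columns.names = ["object", "feature"]
```

Features already covered by cached_data for the full period are taken from it instead of being queried again, so passing intermediate results this way avoids redundant database round trips.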

Returns:

  • DataFrame

    DataFrame with the required features for the desired period.

Source code in echo_energycalc/calculation_requirement_features.py
def get_data(
    self,
    period: DateTimeRange,
    reindex: str | None = "infer",
    cached_data: DataFrame | None = None,
    round_timestamps: RoundTimeStampsDict | None = None,
    **kwargs,  # noqa: ARG002
) -> DataFrame:
    """
    Method used to get the data for the required features from performance_db

    This will only get the features that are not present in the cached_data DataFrame.

    Parameters
    ----------
    period : DateTimeRange
        Desired period for the features.
    reindex : str, optional
        Frequency to reindex the required data. If set to None no reindexing is done. By default "infer" to infer the frequency from the data.
    cached_data : DataFrame | None, optional
        Cached data that can be used to avoid querying performance_db.

        This is a DataFrame in the same format as the one returned by the method "perfdb.features.values.series.get", or to be more clear, the columns are a MultiIndex with object name as first level and feature name as second level.

        This is important when doing calculations that involve multiple features to avoid needing to save the intermediate results in performance_db and then query them again.

        By default None.
    round_timestamps : RoundTimeStampsDict | None, optional
        Dictionary used to round timestamps with the following keys:

        - freq: timedelta, the frequency to round the timestamps to.
        - tolerance: timedelta, the tolerance to be used when rounding timestamps

        If set to None, no rounding will be done. Only applicable if output_type is "DataFrame".
        By default None

    Returns
    -------
    DataFrame
        DataFrame with the required features for the desired period.
    """
    # checking if necessary keyword arguments are present
    if not isinstance(period, DateTimeRange):
        raise TypeError(f"period must be a DateTimeRange, not {type(period)}")
    if not isinstance(reindex, str | type(None)):
        raise TypeError(f"reindex must be a str or None, not {type(reindex)}")
    if not isinstance(cached_data, DataFrame | type(None)):
        raise TypeError(f"cached_data must be a pandas DataFrame, not {type(cached_data)}")

    # check if requirement has been checked
    if not self._checked:
        self.check()

    try:
        # copying required features to avoid changing the original dictionary
        features = copy.deepcopy(self.features)

        # if cached_data is not None check if it is a DataFrame in the correct format
        if cached_data is not None and not cached_data.empty:
            if not isinstance(cached_data, DataFrame):
                raise TypeError(f"cached_data must be a pandas DataFrame, not {type(cached_data)}")
            # checking if it is a DataFrame with a MultiIndex with object name as first level and feature name as second level
            if not isinstance(cached_data.columns, MultiIndex):
                raise TypeError(f"cached_data must have a MultiIndex as columns, not {type(cached_data.columns)}")
            if cached_data.columns.names != ["object", "feature"]:
                raise TypeError(
                    f"cached_data must have a MultiIndex with two levels ['object', 'feature'], not {cached_data.columns.names}",
                )
            # converting cached_data MultiIndex to a list of tuples
            cached_data_features = list(cached_data.columns.to_flat_index())
            # recreating features dictionary without the features that are already in cached_data
            tmp_features = copy.deepcopy(features)
            # creating a dict with the required features from cached_data
            required_cached_data_features = {}
            for req_obj, req_feat in tmp_features.items():
                required_cached_data_features[req_obj] = []
                # removing the features that are already in cached_data
                for feat in req_feat:
                    # checking if cached data has the complete period for the desired feature
                    # this is done to make sure that if the requirement needs a period that is not in cached_data, it will be queried from the database
                    if (req_obj, feat) in cached_data_features and (
                        period.start >= cached_data.loc[:, (req_obj, feat)].dropna().index.min()
                        and period.end <= cached_data.loc[:, (req_obj, feat)].dropna().index.max()
                    ):
                        features[req_obj].remove(feat)
                        required_cached_data_features[req_obj] += [feat]
                # removing the object if it has no more required features needed for it
                if len(features[req_obj]) == 0:
                    features.pop(req_obj)
                if len(required_cached_data_features[req_obj]) == 0:
                    required_cached_data_features.pop(req_obj)

        # getting required features
        if len(features) > 0:
            # separating features that end with "_b#" and those that do not
            baze_features = {obj: [feat for feat in feats if feat.endswith("_b#")] for obj, feats in features.items()}
            perfdb_features = {obj: [feat for feat in feats if not feat.endswith("_b#")] for obj, feats in features.items()}

            # removing empty entries
            baze_features = {obj: feats for obj, feats in baze_features.items() if feats}
            perfdb_features = {obj: feats for obj, feats in perfdb_features.items() if feats}

            # getting required features from baze
            if baze_features:
                # correct the feature names to remove the "_b#" suffix
                baze_features = {obj: [feat.removesuffix("_b#") for feat in feats] for obj, feats in baze_features.items()}
                baze_data = self._baze.points.values.series.get(
                    points=baze_features,
                    period=period,
                    reindex=reindex,
                    round_timestamps=round_timestamps,
                )
                # rename the columns to add the "_b#" suffix
                baze_data.columns = MultiIndex.from_tuples([(col[0], col[1] + "_b#") for col in baze_data.columns])
                baze_data.columns.names = ["object", "feature"]
            else:
                baze_data = DataFrame()

            # getting required features from performance_db
            if perfdb_features:
                perfdb_data = self._perfdb.features.values.series.get(features=perfdb_features, period=period, reindex=reindex)
                perfdb_data.columns.names = ["object", "feature"]
            else:
                perfdb_data = DataFrame()

            # merging both DataFrames
            self._data = concat([baze_data, perfdb_data], axis=1, join="outer")
        else:
            self._data = DataFrame()
            self._data.columns = MultiIndex.from_product([[], []], names=["object", "feature"])

        # adjusting index to have a [s] precision
        self._data.index = self._data.index.astype("datetime64[s]")

        if cached_data is not None and (not cached_data.empty and len(required_cached_data_features) > 0):
            # filtering cached_data to only include the required features
            # this should filter both first and second level of the MultiIndex columns of cached_data based on key and values of self._data
            wanted_columns = MultiIndex.from_tuples(
                [(key, val) for key, value in required_cached_data_features.items() for val in value],
            )
            # getting all columns that are in self._data and not in cached_data
            intersection_cols = self._data.columns.intersection(cached_data.columns)
            # merging to get all the wanted columns
            wanted_columns = wanted_columns.union(intersection_cols)
            # getting cached_data with only the wanted columns
            required_cached_data = cached_data.loc[:, wanted_columns]
            # adjusting the index of self._data to add elements that are in cached_data but not in self._data
            # this is done because this way we make sure that all the timestamps are present in the final DataFrame
            self._data = self._data.reindex(self._data.index.union(required_cached_data.index), axis=0)
            # first merging the columns that are in both self._data and cached_data prioritizing cached data
            # the update method is used because we want to overwrite the values of self._data with the values of cached_data if they are present
            # we do this because we assume cached data is more recent (just calculated)
            self._data.update(required_cached_data.loc[:, intersection_cols], overwrite=True)
            # now adding new columns from cached_data
            new_cols = required_cached_data.columns.difference(self._data.columns)
            # merging required features with cached_data
            self._data = concat([self._data, required_cached_data.loc[:, new_cols]], axis=1, join="outer")
            # sorting the columns based on MultiIndex
            self._data = self._data.reindex(sorted(self._data.columns), axis=1)

    except Exception as e:
        if self.optional:
            self._data = DataFrame()
        else:
            raise e

    return self.data