Required Features¶
Overview¶
The RequiredFeatures class is a subclass of CalculationRequirement used to get feature values for specific objects and periods. This is one of the most important requirements, as most feature calculations require some features to be present in the database before the calculation can proceed.
Usage¶
This requirement can be instantiated with a list of features that need to be present for each object. Below is an example of how to use this requirement:
requirement = RequiredFeatures(features={"SDM1-VRN1-01": ["wind_speed", "active_power"]})
After calling the check and get_data methods, the data attribute of the requirement will be a DataFrame with the required features for the desired period. This DataFrame has the timestamps as its index, and its columns are a MultiIndex with the object name as the first level and the feature name as the second level.
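As a hypothetical illustration of the shape described above (the object and feature names reuse the usage example; the values are made up), the data attribute can be mimicked with plain pandas:

```python
import pandas as pd

# Illustrative sketch of the DataFrame exposed by the "data" attribute
# after check() and get_data() have run: a timestamp index and
# MultiIndex columns (object name, feature name).
index = pd.date_range("2024-01-01", periods=3, freq="10min")
columns = pd.MultiIndex.from_product(
    [["SDM1-VRN1-01"], ["wind_speed", "active_power"]],
    names=["object", "feature"],
)
data = pd.DataFrame(
    [[7.1, 1500.0], [6.8, 1420.0], [7.4, 1580.0]],
    index=index,
    columns=columns,
)

# Selecting one (object, feature) pair returns a plain Series
wind_speed = data[("SDM1-VRN1-01", "wind_speed")]
```

Selecting by tuple on the MultiIndex columns, as in the last line, is the usual way to pull a single feature series out of the result.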
Database Requirements¶
This requirement expects the features table to be set up with the necessary features for the model of the wanted object. Examples of how to create a feature are described in the FeatureCalculator section.
To check if the features are set correctly, go to the v_features view in the database.
Class Definition¶
RequiredFeatures(features, optional=False)
¶
Subclass of CalculationRequirement that defines the features that are required for the calculation.
This will check the performance database for the existence of the required features for the wanted objects.
Parameters:
- features (dict[str, list[str]]) – Features that are required for the calculation.
Should be in the format {object_name: [feature_name, ...], ...}.
- optional (bool, default: False) – Set to True if this is an optional requirement.
Source code in echo_energycalc/calculation_requirement_features.py
def __init__(
self,
features: dict[str, list[str]],
optional: bool = False,
) -> None:
"""
Constructor of the RequiredFeatures class.
This will check the performance database for the existence of the required features for the wanted objects.
Parameters
----------
features : dict[str, list[str]]
Features that are required for the calculation.
Should be in the format {object_name: [feature_name, ...], ...}.
optional : bool, optional
Set to True if this is an optional requirement. by default False
"""
super().__init__(optional)
# check if features is a dict with str keys and list of str values
if not isinstance(features, dict):
raise TypeError(f"features must be a dict, not {type(features)}")
if not all(isinstance(key, str) for key in features):
raise TypeError(f"all features keys must be str, not {[type(key) for key in features]}")
if not all(isinstance(value, list) for value in features.values()):
raise TypeError(f"all features values must be list, not {[type(value) for value in features.values()]}")
if not all(all(isinstance(item, str) for item in value) for value in features.values()):
raise TypeError(f"all features values must be list of str, not {[type(item) for value in features.values() for item in value]}")
self._features = features
checked
property
¶
Attribute that defines if the requirement has been checked. Its value starts as False and is set to True after the check method is called.
Returns:
- bool – True if the requirement has been checked.
data
property
¶
Data required for the calculation.
Returns:
- DataFrame – DataFrame with the required features for the desired period. The columns are a MultiIndex with object name as first level and feature name as second level.
features
property
¶
Features that are required for the calculation.
Returns:
- dict[str, list[str]] – Features that are required for the calculation, in the format {object_name: [feature_name, ...], ...}.
optional
property
¶
Attribute that defines if the requirement is optional.
If optional is True, the requirement is only validated to check if it could exist, not if it is actually present. This is useful for requirements that are not necessary for all calculations, but are useful for some of them.
Returns:
- bool – True if the requirement is optional.
check()
¶
Method used to check if all required features are present in the database for each object.
This will raise an error if any of the required features are missing.
Returns:
- bool – True if all required features are present in the database for each object.
Source code in echo_energycalc/calculation_requirement_features.py
def check(self) -> bool:
"""
Method used to check if all required features are present in the database for each object.
This will raise an error if any of the required features are missing.
Returns
-------
bool
Returns True if all required features are present in the database for each object.
"""
if self.optional:
return True
# iterating each object and checking if all features are present
for object_name, features in self.features.items():
missing_features_baze = []
# handle the case where a feature name ends with "_b#"
if any(feature.endswith("_b#") for feature in features):
# getting model name from object_name
model_name = self._baze.objects.instances.get(object_names=[object_name])
model_name = model_name[object_name]["attributes"]["domainName"]
# getting existing features for the object
existing_features = self._baze.points.definitions.get_ids(
object_models=[model_name],
)
# filtering features to get only the ones that end with "_b#"
features_baze = [feature for feature in features if feature.endswith("_b#")]
# adjusting feature name
features_baze = [feature.removesuffix("_b#") for feature in features_baze]
# filtering bazefield dictionary to get only the desired features ids
existing_features = {
key: existing_features[model_name][key] for key in features_baze if key in existing_features[model_name]
}
# checking if all required features are present
missing_features_baze = set(features_baze) - set(existing_features.keys())
# filtering features to get only the ones that do not end with "_b#"
non_baze_features = [feature for feature in features if not feature.endswith("_b#")]
# getting existing features
existing_features = self._perfdb.features.definitions.get_ids(
object_names=[object_name],
feature_names=non_baze_features,
)
if not existing_features:
raise ValueError(f"Could not find any of the following features for object {object_name}: {non_baze_features}")
obj_model = next(iter(existing_features.keys()))
# checking if all required features are present
missing_features_pg = set(non_baze_features) - set(existing_features[obj_model].keys())
missing_features = list(missing_features_baze) + list(missing_features_pg)
if len(missing_features) > 0:
raise ValueError(f"Features {missing_features} do not exist for object {object_name}")
self._checked = True
return True
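The suffix-based routing used by check() above can be sketched in isolation: features ending in "_b#" are looked up in Bazefield (with the suffix stripped from the name), while the rest are looked up in performance_db. The feature names below are illustrative.

```python
# Split a feature list the way check() does: "_b#"-suffixed names go to
# Bazefield (suffix removed before the lookup), the rest to performance_db.
features = ["wind_speed", "active_power_b#", "nacelle_position"]

baze_features = [f.removesuffix("_b#") for f in features if f.endswith("_b#")]
perfdb_features = [f for f in features if not f.endswith("_b#")]
```

get_data() applies the same split, and re-appends the "_b#" suffix to the Bazefield column names so the two sources remain distinguishable in the merged DataFrame.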
get_data(period, reindex='infer', cached_data=None, round_timestamps=None, **kwargs)
¶
Method used to get the data for the required features from performance_db.
This will only get the features that are not present in the cached_data DataFrame.
Parameters:
- period (DateTimeRange) – Desired period for the features.
- reindex (str, default: 'infer') – Frequency to reindex the required data. If set to None, no reindexing is done. By default "infer", which infers the frequency from the data.
- cached_data (DataFrame | None, default: None) – Cached data that can be used to avoid querying performance_db.
This is a DataFrame in the same format as the one returned by the method "perfdb.features.values.series.get": the columns are a MultiIndex with object name as first level and feature name as second level.
This is important when doing calculations that involve multiple features, to avoid saving the intermediate results in performance_db and then querying them again.
- round_timestamps (RoundTimeStampsDict | None, default: None) – Dictionary used to round timestamps, with the following keys:
- freq: timedelta, the frequency to round the timestamps to.
- tolerance: timedelta, the tolerance to be used when rounding timestamps.
If set to None, no rounding will be done.
Returns:
- DataFrame – DataFrame with the required features for the desired period.
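The round_timestamps contract described in the parameters above (freq plus tolerance) can be sketched with plain pandas. This is an illustrative mock of the documented behavior, not the echo_energycalc implementation: timestamps within tolerance of a freq boundary snap to it, others are left untouched.

```python
import pandas as pd

# A dict matching the documented RoundTimeStampsDict keys.
round_timestamps = {
    "freq": pd.Timedelta(minutes=10),
    "tolerance": pd.Timedelta(minutes=1),
}

idx = pd.DatetimeIndex(
    ["2024-01-01 00:00:30", "2024-01-01 00:09:40", "2024-01-01 00:15:00"]
)
# Round every timestamp to the nearest freq boundary...
rounded = idx.round(round_timestamps["freq"])
# ...but keep the rounded value only where it lies within tolerance
# of the original timestamp; otherwise fall back to the original.
within = abs(rounded - idx) <= round_timestamps["tolerance"]
result = rounded.where(within, idx)
```

Here 00:00:30 and 00:09:40 snap to 00:00 and 00:10, while 00:15:00 is more than one minute from any 10-minute boundary and stays as-is.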
Source code in echo_energycalc/calculation_requirement_features.py
def get_data(
self,
period: DateTimeRange,
reindex: str | None = "infer",
cached_data: DataFrame | None = None,
round_timestamps: RoundTimeStampsDict | None = None,
**kwargs, # noqa: ARG002
) -> DataFrame:
"""
Method used to get the data for the required features from performance_db
This will only get the features that are not present in the cached_data DataFrame.
Parameters
----------
period : DateTimeRange
Desired period for the features.
reindex : str, optional
Frequency to reindex the required data. If set to None no reindexing is done. By default "infer" to infer the frequency from the data.
cached_data : DataFrame | None, optional
Cached data that can be used to avoid querying performance_db.
This is a DataFrame in the same format as the one returned by the method "perfdb.features.values.series.get", or to be more clear, the columns are a MultiIndex with object name as first level and feature name as second level.
This is important when doing calculations that involve multiple features to avoid needing to save the intermediate results in performance_db and then query them again.
By default None.
round_timestamps : RoundTimeStampsDict | None, optional
Dictionary used to round timestamps with the following keys:
- freq: timedelta, the frequency to round the timestamps to.
- tolerance: timedelta, the tolerance to be used when rounding timestamps
If set to None, no rounding will be done. Only applicable if output_type is "DataFrame".
By default None
Returns
-------
DataFrame
DataFrame with the required features for the desired period.
"""
# checking if necessary keyword arguments are present
if not isinstance(period, DateTimeRange):
raise TypeError(f"period must be a DateTimeRange, not {type(period)}")
if not isinstance(reindex, str | type(None)):
raise TypeError(f"reindex must be a str or None, not {type(reindex)}")
if not isinstance(cached_data, DataFrame | type(None)):
raise TypeError(f"cached_data must be a pandas DataFrame, not {type(cached_data)}")
# check if requirement has been checked
if not self._checked:
self.check()
try:
# copying required features to avoid changing the original dictionary
features = copy.deepcopy(self.features)
# if cached_data is not None check if it is a DataFrame in the correct format
if cached_data is not None and not cached_data.empty:
if not isinstance(cached_data, DataFrame):
raise TypeError(f"cached_data must be a pandas DataFrame, not {type(cached_data)}")
# checking if it is a DataFrame with a MultiIndex with object name as first level and feature name as second level
if not isinstance(cached_data.columns, MultiIndex):
raise TypeError(f"cached_data must have a MultiIndex as columns, not {type(cached_data.columns)}")
if cached_data.columns.names != ["object", "feature"]:
raise TypeError(
f"cached_data must have a MultiIndex with two levels ['object', 'feature'], not {cached_data.columns.names}",
)
# converting cached_data MultiIndex to a list of tuples
cached_data_features = list(cached_data.columns.to_flat_index())
# recreating features dictionary without the features that are already in cached_data
tmp_features = copy.deepcopy(features)
# creating a dict with the required features from cached_data
required_cached_data_features = {}
for req_obj, req_feat in tmp_features.items():
required_cached_data_features[req_obj] = []
# removing the features that are already in cached_data
for feat in req_feat:
# checking if cached data has the complete period for the desired feature
# this is done to make sure that if the requirement needs a period that is not in cached_data, it will be queried from the database
if (req_obj, feat) in cached_data_features and (
period.start >= cached_data.loc[:, (req_obj, feat)].dropna().index.min()
and period.end <= cached_data.loc[:, (req_obj, feat)].dropna().index.max()
):
features[req_obj].remove(feat)
required_cached_data_features[req_obj] += [feat]
# removing the object if it has no more required features needed for it
if len(features[req_obj]) == 0:
features.pop(req_obj)
if len(required_cached_data_features[req_obj]) == 0:
required_cached_data_features.pop(req_obj)
# getting required features
if len(features) > 0:
# separating features that end with "_b#" and those that do not
baze_features = {obj: [feat for feat in feats if feat.endswith("_b#")] for obj, feats in features.items()}
perfdb_features = {obj: [feat for feat in feats if not feat.endswith("_b#")] for obj, feats in features.items()}
# removing empty entries
baze_features = {obj: feats for obj, feats in baze_features.items() if feats}
perfdb_features = {obj: feats for obj, feats in perfdb_features.items() if feats}
# getting required features from baze
if baze_features:
# correct the feature names to remove the "_b#" suffix
baze_features = {obj: [feat.removesuffix("_b#") for feat in feats] for obj, feats in baze_features.items()}
baze_data = self._baze.points.values.series.get(
points=baze_features,
period=period,
reindex=reindex,
round_timestamps=round_timestamps,
)
# rename the columns to add the "_b#" suffix
baze_data.columns = MultiIndex.from_tuples([(col[0], col[1] + "_b#") for col in baze_data.columns])
baze_data.columns.names = ["object", "feature"]
else:
baze_data = DataFrame()
# getting required features from performance_db
if perfdb_features:
perfdb_data = self._perfdb.features.values.series.get(features=perfdb_features, period=period, reindex=reindex)
perfdb_data.columns.names = ["object", "feature"]
else:
perfdb_data = DataFrame()
# merging both DataFrames
self._data = concat([baze_data, perfdb_data], axis=1, join="outer")
else:
self._data = DataFrame()
self._data.columns = MultiIndex.from_product([[], []], names=["object", "feature"])
# adjusting index to have a [s] precision
self._data.index = self._data.index.astype("datetime64[s]")
if cached_data is not None and (not cached_data.empty and len(required_cached_data_features) > 0):
# filtering cached_data to only include the required features
# this should filter both first and second level of the MultiIndex columns of cached_data based on key and values of self._data
wanted_columns = MultiIndex.from_tuples(
[(key, val) for key, value in required_cached_data_features.items() for val in value],
)
# getting all columns that are in self._data and not in cached_data
intersection_cols = self._data.columns.intersection(cached_data.columns)
# merging to get all the wanted columns
wanted_columns = wanted_columns.union(intersection_cols)
# getting cached_data with only the wanted columns
required_cached_data = cached_data.loc[:, wanted_columns]
# adjusting the index of self._data to add elements that are in cached_data but not in self._data
# this is done because this way we make sure that all the timestamps are present in the final DataFrame
self._data = self._data.reindex(self._data.index.union(required_cached_data.index), axis=0)
# first merging the columns that are in both self._data and cached_data prioritizing cached data
# the update method is used because we want to overwrite the values of self._data with the values of cached_data if they are present
# we do this because we assume cached data is more recent (just calculated)
self._data.update(required_cached_data.loc[:, intersection_cols], overwrite=True)
# now adding new columns from cached_data
new_cols = required_cached_data.columns.difference(self._data.columns)
# merging required features with cached_data
self._data = concat([self._data, required_cached_data.loc[:, new_cols]], axis=1, join="outer")
# sorting the columns based on MultiIndex
self._data = self._data.reindex(sorted(self._data.columns), axis=1)
except Exception as e:
if self.optional:
self._data = DataFrame()
else:
raise e
return self.data
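The cached-data precedence applied at the end of get_data() can be reduced to a small pandas sketch: for columns present in both the freshly queried data and cached_data, the cached values win (DataFrame.update with overwrite=True), on the assumption that cached data was just calculated and is more recent. The object and feature names below are illustrative.

```python
import pandas as pd

cols = pd.MultiIndex.from_tuples(
    [("OBJ1", "wind_speed")], names=["object", "feature"]
)
idx = pd.date_range("2024-01-01", periods=2, freq="10min")

queried = pd.DataFrame([[7.0], [8.0]], index=idx, columns=cols)
cached = pd.DataFrame([[7.5], [8.5]], index=idx, columns=cols)

# In-place: cached values overwrite the queried ones where both exist.
queried.update(cached, overwrite=True)
```

Columns that exist only in cached_data are then appended with concat, and the final column order is sorted on the MultiIndex, which is why the method ends with a reindex over sorted columns.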