SPE Aggregation¶
Overview¶
The FeatureCalcSpeAggregation class is a subclass of FeatureCalculator that calculates the aggregated value of a feature for a specific SPE. This is useful when we want to get the values for a specific feature for all child objects of a type for a specific SPE and then aggregate them in a specific way to have a single value for the parent object.
Calculation Logic¶
The calculation logic is described in the constructor of the class, shown below in the Class Definition section.
Database Requirements¶
- Feature attribute
server_calc_typemust be set tospe_aggregation. -
Feature attribute
feature_options_jsonwith the following keys:aggregation: A string that defines the aggregation method to be used. The following methods are available: "avg", "sum", "max", "min", "median", "std", "var", "count" orfeature_eval_expression.child_object_type: A string that defines the type of the child object that we want to aggregate.child_feature: A string that defines the feature that we want to aggregate.
The following example is used for feature
lost_power_curtailment_park_turbine_sum:{ "aggregation": "sum", "child_feature": "lost_power_curtailment_park", "child_object_type": "wind_turbine" } -
In case the
aggregationkey is set tofeature_eval_expression, the following keys must be present:feature_eval_expression: A string that defines the Python code that calculates the feature value. This code will have access to a DataFrame with the wanted feature for all the child objects (df) and should create a new column in this DataFrame with the aggregated value, doing the necessary logic to aggregate the values. It is useful when the aggregation method is more complex than the ones available in theaggregationkey.
Class Definition¶
FeatureCalcSpeAggregation(object_name, feature)
¶
FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.
For this to work the desired feature must have the feature_options_json attribute in performance_db, with the following keys:
aggregation: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" orfeature_eval_expression.child_object_type: str, object type of the child objects.child_feature: str, name of the feature of child objects that will be aggregated.
If the aggregation method is feature_eval_expression, the feature_eval_expression attribute must be present in performance_db as well.
The method will look for all objects that have the spe_name column in the objects table equal to the wanted SPE object and the object_type column equal to the child_object_type attribute.
Please make sure that the child objects have the desired feature to be aggregated.
Parameters:
-
(object_name¶str) –Name of the object for which the feature is calculated. It must be an SPE.
-
(feature¶str) –Feature of the object that is calculated. It must exist in performance_db.
Source code in echo_energycalc/feature_calc_common.py
def __init__(
self,
object_name: str,
feature: str,
) -> None:
"""
FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.
For this to work the desired feature must have the `feature_options_json` attribute in performance_db, with the following keys:
- `aggregation`: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or `feature_eval_expression`.
- `child_object_type`: str, object type of the child objects.
- `child_feature`: str, name of the feature of child objects that will be aggregated.
If the aggregation method is `feature_eval_expression`, the `feature_eval_expression` attribute must be present in performance_db as well.
The method will look for all objects that have the `spe_name` column in the `objects` table equal to the wanted SPE object and the `object_type` column equal to the `child_object_type` attribute.
Please make sure that the child objects have the desired feature to be aggregated.
Parameters
----------
object_name : str
Name of the object for which the feature is calculated. It must be an SPE.
feature : str
Feature of the object that is calculated. It must exist in performance_db.
"""
# initialize parent class
super().__init__(object_name, feature)
# requirements for the feature calculator
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_eval_expression"], optional=True))
# getting all required attributes
self._get_required_data()
self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]
# validating feature options
self._validate_feature_options()
# getting child objects
spe_objects = self._perfdb.objects.instances.get(
spe_names=[self.object],
object_types=[self._feature_attributes["feature_options_json"]["child_object_type"]],
)
spe_objects = list(spe_objects.keys())
# defining required features
self._add_requirement(
RequiredFeatures(
{child_object: [self._feature_attributes["feature_options_json"]["child_feature"]] for child_object in spe_objects},
),
)
feature
property
¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
-
str–Name of the feature that is calculated.
name
property
¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:
-
str–Name of the feature calculator.
object
property
¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.
Returns:
-
str–Object name for which the feature is calculated.
requirements
property
¶
List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.
Returns:
-
dict[str, list[CalculationRequirement]]–Dict of requirements.
The keys are the names of the classes of the requirements and the values are lists of requirements of that class.
For example:
{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
result
property
¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:
-
Series | DataFrame | None:–Result of the calculation if the method "calculate" was called. None otherwise.
calculate(period, save_into=None, cached_data=None, **kwargs)
¶
Method that will calculate the Child Aggregation feature.
Parameters:
-
(period¶DateTimeRange) –Period for which the feature will be calculated.
-
(save_into¶Literal['all', 'performance_db'] | None, default:None) –Argument that will be passed to the method "save". The options are: - "all": The feature will be saved in performance_db and bazefield. - "performance_db": the feature will be saved only in performance_db. - None: The feature will not be saved.
By default None.
-
(cached_data¶DataFrame | None, default:None) –DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient. By default None
-
(**kwargs¶dict, default:{}) –Additional arguments that will be passed to the "save" method.
Returns:
-
Series–Pandas Series with the calculated feature.
Source code in echo_energycalc/feature_calc_common.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = None,
cached_data: DataFrame | None = None,
**kwargs,
) -> Series:
"""
Method that will calculate the Child Aggregation feature.
Parameters
----------
period : DateTimeRange
Period for which the feature will be calculated.
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
cached_data : DataFrame | None, optional
DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
By default None
**kwargs : dict, optional
Additional arguments that will be passed to the "save" method.
Returns
-------
Series
Pandas Series with the calculated feature.
"""
# getting required features
self._get_required_data(period=period, cached_data=cached_data)
# saving needed features into df as expressions assumes we have a DataFrame called "df" to do all the calculations
# in here we are dropping level 1 as all features are the same and we expect that the name of the columns are the name of the child objects
df = self._get_requirement_data("RequiredFeatures").droplevel(1, axis=1).copy()
# calculating
match self._feature_attributes["feature_options_json"]["aggregation"]:
case "avg":
df[self.feature] = df.mean(axis=1, skipna=True)
case "sum":
df[self.feature] = df.sum(axis=1, skipna=True)
case "count":
df[self.feature] = df.count(axis=1, skipna=True)
case "max":
df[self.feature] = df.max(axis=1, skipna=True)
case "min":
df[self.feature] = df.min(axis=1, skipna=True)
case "median":
df[self.feature] = df.median(axis=1, skipna=True)
case "std":
df[self.feature] = df.std(axis=1, skipna=True)
case "var":
df[self.feature] = df.var(axis=1, skipna=True)
case "feature_eval_expression":
eval_expression = None
if "feature_eval_expression" in self._feature_attributes:
eval_expression = self._feature_attributes["feature_eval_expression"]
if eval_expression is None:
raise ValueError(
"Feature option 'feature_eval_expression' is not present in feature attributes. This is needed when aggregation is 'feature_eval_expression'.",
)
# replacing object attributes in expression
eval_expression = replace_object_attributes_in_expression(eval_expression, self.object, self._perfdb)
# calculating by evaluating the expression
loc = {"df": df}
exec(eval_expression, globals(), loc) # pylint: disable=exec-used # noqa: S102
df = loc["df"]
# adding calculated feature to class result attribute
self._result = df[self.feature].copy()
# saving results
self.save(save_into=save_into, **kwargs)
return df[self.feature]
save(save_into=None, **kwargs)
¶
Method to save the calculated feature values in performance_db.
Parameters:
-
(save_into¶Literal['all', 'performance_db'] | None, default:None) –Argument that will be passed to the method "save". The options are: - "all": The feature will be saved in performance_db and bazefield. - "performance_db": the feature will be saved only in performance_db. - None: The feature will not be saved.
By default None.
-
(**kwargs¶dict, default:{}) –Not being used at the moment. Here only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
self,
save_into: Literal["all", "performance_db"] | None = None,
**kwargs, # noqa: ARG002
) -> None:
"""
Method to save the calculated feature values in performance_db.
Parameters
----------
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
**kwargs : dict, optional
Not being used at the moment. Here only for compatibility.
"""
# checking arguments
if not isinstance(save_into, str | type(None)):
raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
# checking if calculation was done
if self.result is None:
raise ValueError(
"The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
)
if save_into is None:
return
if isinstance(save_into, str):
if save_into not in ["performance_db", "all"]:
raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
upload_to_bazefield = save_into == "all"
elif save_into is None:
upload_to_bazefield = False
else:
raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")
# converting result series to DataFrame if needed
if isinstance(self.result, Series):
result_df = self.result.to_frame()
elif isinstance(self.result, DataFrame):
result_df = self.result.droplevel(0, axis=1)
else:
raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")
# adjusting DataFrame to be inserted in the database
# making the columns a Multindex with levels object_name and feature_name
result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])
self._perfdb.features.values.series.insert(
df=result_df,
on_conflict="update",
bazefield_upload=upload_to_bazefield,
)