Object Aggregation¶
Overview¶
The FeatureCalcObjectAggregation class is a subclass of FeatureCalculator that calculates the aggregated value of a feature over a specific list of objects. This is useful when we want to get the values of a given feature for a list of child objects and aggregate them in a specific way into a single value for a parent object.
Calculation Logic¶
The calculation logic is described in the constructor of the class, shown below in the Class Definition section.
Database Requirements¶
- Feature attribute server_calc_type must be set to object_aggregation.
- Feature attribute feature_options_json with the keys described in the Class Definition section.
As an example, the following JSON is used for the feature active_power_reference_echo, which calculates the reference active power for an ONS site, converting the values to 30-minute averages in MW. The source feature is active_power_reference from the objects defined in the ons_spes attribute.
{
    "resample": {"rule": "30min", "label": "left", "closed": "right", "method": "mean"},
    "aggregation": "sum",
    "final_scale": 0.001,
    "attr_to_scale": "nominal_power",
    "feature_to_aggregate": "active_power_reference",
    "attr_with_objs_to_aggregate": "ons_spes"
}
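Conceptually, the options in this example map onto a short pandas pipeline: resample each source series to 30-minute means, sum across objects, then apply the final scale. The sketch below is illustrative only, not the library's implementation; the object names and values are hypothetical.

```python
import pandas as pd

# Two hypothetical source objects with 10-minute active_power_reference values in kW
idx = pd.date_range("2024-01-01 00:10", periods=6, freq="10min")
df = pd.DataFrame({"spe_a": [300.0] * 6, "spe_b": [600.0] * 6}, index=idx)

# "resample": 30-minute mean, right-closed intervals labelled on the left edge
resampled = df.resample("30min", closed="right", label="left").mean()

# "aggregation": "sum" across objects, then "final_scale": 0.001 (kW -> MW)
result = 0.001 * resampled.sum(axis=1)
print(result)
```

With constant inputs of 300 kW and 600 kW, each 30-minute bin sums to 900 kW, or 0.9 MW after the final scale.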
Class Definition¶
FeatureCalcObjectAggregation(object_name, feature)
¶
Class that is used to aggregate features from any type of object and scale the result by an attribute if needed.
In summary, it will do the following:
- Get the features from the objects that will be aggregated.
- Resample the features if necessary.
- Shift the features if necessary.
- Calculate the desired aggregation method.
- Scale the calculated feature if necessary.
- Save the calculated feature.
For this to work the desired feature must have attribute server_calc_type defined as object_aggregation and the feature_options_json attribute with the following options:
- aggregation: str
  Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".
- attr_with_objs_to_aggregate: str
  Name of the attribute of the current object that contains the list of objects that will be aggregated.
- feature_to_aggregate: str
  Name of the feature that will be aggregated. This feature must exist in all the objects specified in attr_with_objs_to_aggregate.
- attr_to_scale: str, optional
  Attribute used to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in attr_with_objs_to_aggregate.
  As an example, this attribute could be the nominal power, the number of turbines, etc. If we consider nominal_power, scaling is useful when the sum of nominal_power over the objects to aggregate is not equal to the nominal_power of the current object and we want to scale the calculated feature to the nominal_power of the current object.
  In summary, the calculation performed will be:
  - Calculate the feature to aggregate for all objects specified in attr_with_objs_to_aggregate as it would normally be calculated.
  - Sum the value of the scaling attribute for all objects specified in attr_with_objs_to_aggregate.
  - Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in attr_with_objs_to_aggregate.
  - Multiply the calculated feature by the scaling factor.
  If this attribute is not present, the calculated feature will not be scaled.
- final_scale: float, optional
  Final scale factor applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific unit or magnitude.
- resample: dict[str, str], optional
  A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>} that will be used to resample the features before aggregating them. This is useful if you want the final feature to have a different frequency than the features that will be aggregated. For more details on the resample method, please check the pandas documentation.
  If this option is not present, the features will not be resampled.
- shift: dict[str, str | int], optional
  A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful if you are aggregating data whose timestamps do not match the desired timestamps of the final feature.
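The attr_to_scale arithmetic described above can be checked with a small hand computation. In the hypothetical sketch below the parent object's nominal_power is 100 while the children being aggregated only add up to 80, so the summed feature is multiplied by 100 / 80 = 1.25. Object names and values are made up for illustration.

```python
import pandas as pd

# Hypothetical scaling attributes (e.g. nominal_power)
parent_attr = 100.0
child_attrs = {"obj_a": 50.0, "obj_b": 30.0}  # sums to 80

# Hypothetical hourly feature values for the two child objects
idx = pd.date_range("2024-01-01", periods=3, freq="h")
df = pd.DataFrame({"obj_a": [10.0, 12.0, 14.0], "obj_b": [6.0, 6.0, 6.0]}, index=idx)

aggregated = df.sum(axis=1)                       # "aggregation": "sum"
factor = parent_attr / sum(child_attrs.values())  # 100 / 80 = 1.25
scaled = aggregated * factor
print(scaled)
```

The unscaled sums 16, 18 and 20 become 20, 22.5 and 25 after applying the factor.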
Parameters:
- object_name (str) – Name of the object for which the feature will be calculated.
- feature (str) – Name of the feature that will be calculated.
Source code in echo_energycalc/feature_calc_common.py
def __init__(
self,
object_name: str,
feature: str,
) -> None:
"""
Class that is used to aggregate features from any type of object and scale it to an attribute if needed.
In summary, it will do the following:
1. Get the features from the objects that will be aggregated.
2. Resample the features if necessary.
3. Shift the features if necessary.
4. Calculate the desired aggregation method.
5. Scale the calculated feature if necessary.
6. Save the calculated feature.
For this to work the desired feature must have attribute `server_calc_type` defined as `object_aggregation` and the `feature_options_json` attribute with the following options:
- `aggregation`: str
Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".
- `attr_with_objs_to_aggregate`: str
Name of attribute for the current object that contains a list of objects that will be aggregated.
- `feature_to_aggregate`: str
Name of the feature that will be aggregated. This feature must exist in all the objects specified in `attr_with_objs_to_aggregate`.
- `attr_to_scale`: str, optional
Attribute to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in `attr_with_objs_to_aggregate`.
As an example, this attribute could be the nominal power, number of turbines, etc. If we consider `nominal_power`, it could be useful if the sum of `nominal_power` of the objects to aggregate is not equal to the `nominal_power` of the current object and we want to scale the calculated feature to the `nominal_power` of the current object.
In summary, the calculation performed will be:
- Calculate the feature to aggregate for all objects specified in `attr_with_objs_to_aggregate` as it would normally be calculated.
- Sum the value of the scaling attribute for all objects specified in `attr_with_objs_to_aggregate`.
- Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in `attr_with_objs_to_aggregate`.
- Multiply the calculated feature by the scaling factor.
If this attribute is not present, the calculated feature will not be scaled.
- `final_scale`: float, optional
Final scale factor to be applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific value.
- `resample`: dict[str, str], optional
A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>} that will be used to resample the features before aggregating them. This is useful if you want to have the final feature with a different frequency than the features that will be aggregated.
For more details on the resample method, please check the [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.resample.html)
If this option is not present, the features will not be resampled.
- `shift`: dict[str, str | int], optional
A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful if you are aggregating data that does not have a matching timestamp to the desired of the final feature.
Parameters
----------
object_name : str
Name of the object for which the feature will be calculated.
feature : str
Name of the feature that will be calculated
"""
super().__init__(object_name, feature)
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
self._get_required_data()
self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]
self.aggregation: str = self._feature_attributes["feature_options_json"]["aggregation"]
self.attr_with_objs_to_aggregate: str = self._feature_attributes["feature_options_json"]["attr_with_objs_to_aggregate"]
self.feature_to_aggregate: str = self._feature_attributes["feature_options_json"]["feature_to_aggregate"]
self.attr_to_scale: str | None = self._feature_attributes["feature_options_json"].get("attr_to_scale", None)
self.final_scale: float | None = self._feature_attributes["feature_options_json"].get("final_scale", None)
self.resample: dict[str, str] | None = self._feature_attributes["feature_options_json"].get("resample", None)
self.shift: dict[str, str | int] | None = self._feature_attributes["feature_options_json"].get("shift", None)
# getting all the attributes that will be needed from this object
this_obj_attrs = [self.attr_with_objs_to_aggregate]
if self.attr_to_scale:
this_obj_attrs.append(self.attr_to_scale)
self._add_requirement(RequiredObjectAttributes({self.object: this_obj_attrs}))
self._get_required_data()
# getting all the objects that will be aggregated
self.objects_to_aggregate = self._get_requirement_data("RequiredObjectAttributes")[self.object][self.attr_with_objs_to_aggregate]
feature
property
¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Name of the feature that is calculated.
name
property
¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:
- str – Name of the feature calculator.
object
property
¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Object name for which the feature is calculated.
requirements
property
¶
List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.
Returns:
- dict[str, list[CalculationRequirement]] – Dict of requirements. The keys are the names of the requirement classes and the values are lists of requirements of that class. For example:
  {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
result
property
¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:
- Series | DataFrame | None – Result of the calculation if the method "calculate" was called. None otherwise.
calculate(period, save_into=None, cached_data=None, **kwargs)
¶
Method that will calculate the feature
Parameters:
- period (DateTimeRange) – Period for which the feature will be calculated.
- save_into (Literal['all', 'performance_db'] | None, default: None) – Argument that will be passed to the method "save". The options are:
  - "all": the feature will be saved in performance_db and bazefield.
  - "performance_db": the feature will be saved only in performance_db.
  - None: the feature will not be saved.
  By default None.
- cached_data (DataFrame | None, default: None) – DataFrame with features already queried/calculated. This is useful to avoid querying all the data again from performance_db, making chained calculations much more efficient. By default None.
- **kwargs (dict, default: {}) – Additional arguments that will be passed to the "save" method.
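Before fetching data, calculate widens the requested period so that resampling and shifting have complete bins at the edges (see the source below). The standalone sketch of that adjustment here uses the same pandas to_offset/floor/ceil calls; the period boundaries and frequencies are hypothetical.

```python
from pandas import Timestamp
from pandas.tseries.frequencies import to_offset

# Hypothetical requested period
start = Timestamp("2024-01-01 00:17")
end = Timestamp("2024-01-01 05:41")

# "resample" adjustment: widen by one rule and snap to the rule's grid
rule = to_offset("30min")
start = Timestamp(start - rule).floor("30min")  # 2023-12-31 23:30
end = Timestamp(end + rule).ceil("30min")       # 2024-01-01 06:30

# "shift" adjustment: widen by freq * amount on both sides
shift = to_offset("10min")
start = start - shift * 3
end = end + shift * 3
print(start, end)
```

This guarantees that the edge bins of the resample, and the rows moved in or out by the shift, fall inside the fetched range.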
Source code in echo_energycalc/feature_calc_common.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = None,
cached_data: DataFrame | None = None, # noqa: ARG002
**kwargs,
) -> Series:
"""
Method that will calculate the feature
Parameters
----------
period : DateTimeRange
Period for which the feature will be calculated
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
cached_data : DataFrame | None, optional
DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
By default None
**kwargs : dict, optional
Additional arguments that will be passed to the "save" method.
"""
# defining the period that will be used to get the features from the objects to aggregate
this_period = period.copy()
# adjusting for resample
if self.resample:
timedelta_rule = to_offset(self.resample["rule"])
this_period.start = Timestamp(this_period.start - timedelta_rule).floor(timedelta_rule).to_pydatetime()
this_period.end = Timestamp(this_period.end + timedelta_rule).ceil(timedelta_rule).to_pydatetime()
# adjusting for shift
if self.shift:
shift_rule = to_offset(self.shift["freq"])
this_period.start = (this_period.start - shift_rule * self.shift["amount"]).to_pydatetime()
this_period.end = (this_period.end + shift_rule * self.shift["amount"]).to_pydatetime()
# fetching the last features and attributes required
for required_object in self.objects_to_aggregate:
try:
self._add_requirement(
RequiredFeatures(
{required_object: [self.feature_to_aggregate]},
),
)
self._get_required_data(this_period)
except Exception as e:
raise ValueError(
f"Could not get feature {self.feature_to_aggregate} from {required_object} while calculating {self.feature} feature from the object {self.object}.",
) from e
if self.attr_to_scale:
try:
self._add_requirement(
RequiredObjectAttributes(
{required_object: [self.attr_to_scale]},
),
)
self._get_required_data()
except Exception as ex:
raise ValueError(
f"Could not get attribute {self.attr_to_scale} from {required_object} while calculating {self.feature} feature from the object {self.object}.",
) from ex
df = self._get_requirement_data("RequiredFeatures").droplevel(1, axis=1).copy()
# resampling
if self.resample:
try:
df = df.resample(
self.resample["rule"],
closed=self.resample["closed"],
label=self.resample["label"],
origin="start_day",
).agg(self.resample["method"])
except Exception as e:
raise ValueError(
f"Could not resample while calculating {self.feature} feature from the object {self.object}. Resample arguments were {self.resample}",
) from e
# shifting
if self.shift:
try:
df = df.shift(self.shift["amount"], freq=self.shift["freq"])
except Exception as e:
raise ValueError(
f"Could not shift while calculating {self.feature} feature from the object {self.object}. Shift arguments were {self.shift}",
) from e
# calculating aggregation
match self.aggregation:
case "avg":
df[self.feature] = df.mean(axis=1, skipna=True)
case "sum":
df[self.feature] = df.sum(axis=1, skipna=True)
case "count":
df[self.feature] = df.count(axis=1)
case "max":
df[self.feature] = df.max(axis=1, skipna=True)
case "min":
df[self.feature] = df.min(axis=1, skipna=True)
case "median":
df[self.feature] = df.median(axis=1, skipna=True)
case "std":
df[self.feature] = df.std(axis=1, skipna=True)
case "var":
df[self.feature] = df.var(axis=1, skipna=True)
case "feature_eval_expression":
eval_expression = None
if "feature_eval_expression" in self._feature_attributes:
eval_expression = self._feature_attributes["feature_eval_expression"]
if eval_expression is None:
raise ValueError(
"Feature option 'feature_eval_expression' is not present in feature attributes. This is needed when aggregation is 'feature_eval_expression'.",
)
# replacing object attributes in expression
eval_expression = replace_object_attributes_in_expression(eval_expression, self.object, self._perfdb)
# calculating by evaluating the expression
loc = {"df": df}
exec(eval_expression, globals(), loc) # pylint: disable=exec-used # noqa: S102
df = loc["df"]
# scaling the features if attr_to_scale exists
if self.attr_to_scale:
this_obj_attr_to_scale: float = self._get_requirement_data("RequiredObjectAttributes")[self.object][self.attr_to_scale]
other_objs_attr: float = sum(
self._get_requirement_data("RequiredObjectAttributes")[x][self.attr_to_scale]
for x in self.objects_to_aggregate
if x in df.columns.to_list()
)
log_str = f"Scaling the results of the aggregation of the feature {self.feature} of {self.object} "
log_str += f"based on {self.attr_to_scale} of "
log_str += f"{[x for x in self.objects_to_aggregate if x in df.columns.to_list()]}. "
log_str += f"Base: {other_objs_attr}, Target: {this_obj_attr_to_scale}"
logger.debug(log_str)
df[self.feature] = this_obj_attr_to_scale * (df[self.feature] / other_objs_attr)
# removing period outside of the desired period
df = df.loc[(df.index >= period.start) & (df.index <= period.end)]
# final scaling
if self.final_scale:
df[self.feature] = self.final_scale * df[self.feature]
# adding calculated feature to class result attribute
self._result = df[self.feature].copy()
# saving results
self.save(save_into=save_into, **kwargs)
return df[self.feature]
save(save_into=None, **kwargs)
¶
Method to save the calculated feature values in performance_db.
Parameters:
- save_into (Literal['all', 'performance_db'] | None, default: None) – Argument that controls where the results are saved. The options are:
  - "all": the feature will be saved in performance_db and bazefield.
  - "performance_db": the feature will be saved only in performance_db.
  - None: the feature will not be saved.
  By default None.
- **kwargs (dict, default: {}) – Not used at the moment. Present only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
self,
save_into: Literal["all", "performance_db"] | None = None,
**kwargs, # noqa: ARG002
) -> None:
"""
Method to save the calculated feature values in performance_db.
Parameters
----------
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
**kwargs : dict, optional
Not being used at the moment. Here only for compatibility.
"""
# checking arguments
if not isinstance(save_into, str | type(None)):
raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
# checking if calculation was done
if self.result is None:
raise ValueError(
"The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
)
if save_into is None:
return
if isinstance(save_into, str):
if save_into not in ["performance_db", "all"]:
raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
upload_to_bazefield = save_into == "all"
elif save_into is None:
upload_to_bazefield = False
else:
raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")
# converting result series to DataFrame if needed
if isinstance(self.result, Series):
result_df = self.result.to_frame()
elif isinstance(self.result, DataFrame):
result_df = self.result.droplevel(0, axis=1)
else:
raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")
# adjusting DataFrame to be inserted in the database
# making the columns a Multindex with levels object_name and feature_name
result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])
self._perfdb.features.values.series.insert(
df=result_df,
on_conflict="update",
bazefield_upload=upload_to_bazefield,
)
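The column reshaping that save performs just before the database insert (MultiIndex.from_product with levels object_name and feature_name) can be reproduced in isolation. The object and feature names below are examples, not values from the library.

```python
import pandas as pd
from pandas import MultiIndex

# Hypothetical single-feature result, as produced by calculate()
result = pd.Series(
    [1.0, 2.0],
    index=pd.date_range("2024-01-01", periods=2, freq="h"),
    name="active_power_reference_echo",
)
result_df = result.to_frame()

# Same shaping as in save(): columns become (object_name, feature_name) pairs
result_df.columns = MultiIndex.from_product(
    [["SITE_A"], result_df.columns],
    names=["object_name", "feature_name"],
)
print(result_df.columns.tolist())
```

This two-level column layout is what lets a single DataFrame carry values for several objects and features in one insert call.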