Object Aggregation¶
Overview¶
FeatureCalcObjectAggregation is a general-purpose aggregator that collects a feature from any list of objects (specified via an object attribute), applies an aggregation method, and optionally rescales the result based on a ratio of attribute values.
Example use case: sum `active_power_reference` from a list of SPEs (stored in the `ons_spes` attribute), resample from 10-minute to 30-minute resolution, and convert kW to MW with a `final_scale` of 0.001.
How It Works¶
- Reads `feature_options_json` and `attr_with_objs_to_aggregate` from the feature/object attributes to get the list of objects to aggregate.
- Optionally adjusts the period to account for `resample` and `shift` settings.
- Fetches the `feature_to_aggregate` from each object in the list.
- Optionally resamples the data.
- Optionally shifts the data.
- Applies the aggregation row-wise.
- Optionally scales the result using the ratio of `attr_to_scale` values.
- Optionally applies a `final_scale` multiplier.
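The row-wise aggregation at the core of these steps can be sketched with pandas. The object names and values below are made up for illustration; the real class fetches them from performance_db:

```python
import pandas as pd

# Hypothetical 10-minute series for three aggregated objects (made-up values).
idx = pd.date_range("2024-01-01 00:00", periods=3, freq="10min")
frames = {
    "spe_1": pd.Series([1.0, 2.0, 3.0], index=idx),
    "spe_2": pd.Series([4.0, 5.0, 6.0], index=idx),
    "spe_3": pd.Series([7.0, 8.0, 9.0], index=idx),
}

# One column per source object; aggregate across columns (row-wise).
df = pd.DataFrame(frames)
aggregated = df.agg("sum", axis=1)  # per-timestamp sum across the three objects
```

With `"aggregation": "sum"` this yields one value per timestamp (here 12.0, 15.0, 18.0); other methods such as `"mean"` or `"max"` plug into the same `agg` call.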
Database Requirements¶
- Feature attribute `server_calc_type` must be set to `object_aggregation`.
- Feature attribute `feature_options_json` with the following keys:

| Key | Type | Required | Description |
| --- | --- | --- | --- |
| `aggregation` | string | Yes | One of: `"avg"`, `"sum"`, `"max"`, `"min"`, `"median"`, `"std"`, `"var"`, `"count"`, `"feature_eval_expression"` |
| `attr_with_objs_to_aggregate` | string | Yes | Name of an object attribute on the current object that holds the list of objects to aggregate |
| `feature_to_aggregate` | string | Yes | The feature to aggregate from each listed object |
| `attr_to_scale` | string | No | Attribute for proportional scaling (see below) |
| `final_scale` | number | No | Constant multiplier applied to the final result |
| `resample` | object | No | Resample config (see below) |
| `shift` | object | No | Shift config (see below) |
resample sub-object¶
Resamples each object's feature before aggregation using pandas .resample():
```json
{
    "rule": "30min",
    "method": "mean",
    "closed": "right",
    "label": "left"
}
```
- `rule`: pandas offset string (e.g., `"30min"`, `"1h"`, `"1D"`)
- `method`: pandas aggregation method string (`"mean"`, `"sum"`, `"max"`, etc.)
- `closed`: which side of the interval is closed (`"right"` or `"left"`)
- `label`: which side label to use (`"right"` or `"left"`)
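As a sketch of what such a config does (values made up for illustration), the equivalent pandas call is:

```python
import pandas as pd

# Hypothetical 10-minute series (made-up values).
idx = pd.date_range("2024-01-01 00:00", periods=6, freq="10min")
s = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], index=idx)

# Equivalent of {"rule": "30min", "method": "mean", "closed": "right", "label": "left"}:
resampled = s.resample("30min", closed="right", label="left").agg("mean")
# With closed="right" the (00:00, 00:30] bin covers the 00:10-00:30 samples,
# and with label="left" that bin is labelled 00:00.
```

So the bin labelled 00:00 averages the samples at 00:10, 00:20, and 00:30 (mean 3.0), and the bin labelled 00:30 averages 00:40 and 00:50 (mean 5.5).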
shift sub-object¶
Shifts timestamps before aggregation using pandas .shift():
```json
{
    "freq": "10min",
    "amount": 1
}
```
Useful when the source data has a different timestamp alignment than the target feature.
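For example (made-up data), a `{"freq": "10min", "amount": 1}` config corresponds to the following pandas call, which moves every timestamp forward one 10-minute step while leaving the values untouched:

```python
import pandas as pd

# Hypothetical 10-minute series (made-up values).
idx = pd.date_range("2024-01-01 00:00", periods=3, freq="10min")
s = pd.Series([10.0, 20.0, 30.0], index=idx)

# Equivalent of {"freq": "10min", "amount": 1}: shift the index, keep the values.
shifted = s.shift(periods=1, freq="10min")
# The first sample now sits at 00:10 instead of 00:00, still with value 10.0.
```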
Attribute scaling (attr_to_scale)¶
When attr_to_scale is set, the result is scaled by the ratio:
result_scaled = result × (this_object[attr_to_scale] / sum(source_objects[attr_to_scale]))
Example: If 3 turbines have nominal_power of 2000 kW each (total 6000 kW) and the current SPE has nominal_power of 4000 kW, the scale factor is 4000 / 6000 = 0.667. This is useful when the source objects do not collectively represent 100% of the target object's capacity.
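The arithmetic of that example, as a minimal sketch (attribute values taken from the example above):

```python
# Made-up attribute values matching the example above (kW).
source_nominal_power = [2000, 2000, 2000]  # the three source turbines
this_nominal_power = 4000                  # the current SPE

scale = this_nominal_power / sum(source_nominal_power)  # 4000 / 6000 ~ 0.667

# An aggregated sum of e.g. 6000 kW would be rescaled to 4000 kW:
scaled_result = 6000 * scale
```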
Example feature_options_json¶
Calculates active_power_reference_echo for an ONS site: sums active_power_reference from all SPEs listed in ons_spes, resamples from 10 min to 30 min (mean), and converts kW → MW:
```json
{
    "resample": {"rule": "30min", "label": "left", "closed": "right", "method": "mean"},
    "aggregation": "sum",
    "final_scale": 0.001,
    "attr_to_scale": "nominal_power",
    "feature_to_aggregate": "active_power_reference",
    "attr_with_objs_to_aggregate": "ons_spes"
}
```
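A minimal pandas sketch of what this config produces. Object names and values are made up for illustration; the real calculator reads them from performance_db:

```python
import pandas as pd

# Hypothetical 10-minute active_power_reference (kW) for two SPEs.
idx = pd.date_range("2024-01-01 00:00", periods=6, freq="10min")
spes = {
    "spe_a": pd.Series([100.0] * 6, index=idx),
    "spe_b": pd.Series([200.0] * 6, index=idx),
}
nominal_power = {"spe_a": 2000, "spe_b": 2000}
this_nominal_power = 4000  # the current ONS object

# resample: 10 min -> 30 min mean, per object
resampled = {
    name: s.resample("30min", closed="right", label="left").mean()
    for name, s in spes.items()
}

# aggregation: row-wise sum across objects
result = pd.DataFrame(resampled).agg("sum", axis=1)

# attr_to_scale: ratio of nominal_power values (here 4000 / 4000 = 1.0)
result = result * (this_nominal_power / sum(nominal_power.values()))

# final_scale: kW -> MW
result = result * 0.001  # every 30-min value ends up as 0.3 MW
```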
Class Definition¶
FeatureCalcObjectAggregation(object_name, feature)¶
Class used to aggregate features from any type of object and, if needed, scale the result by an attribute ratio.
In summary, it will do the following:
- Get the features from the objects that will be aggregated.
- Resample the features if necessary.
- Shift the features if necessary.
- Calculate the desired aggregation method.
- Scale the calculated feature if necessary.
- Save the calculated feature.
For this to work, the desired feature must have the attribute `server_calc_type` set to `object_aggregation` and the `feature_options_json` attribute with the following options:
- `aggregation`: str – Aggregation method to be used. Can be `"avg"`, `"sum"`, `"max"`, `"min"`, `"median"`, `"std"`, `"var"`, `"count"` or `"feature_eval_expression"`.
- `attr_with_objs_to_aggregate`: str – Name of an attribute on the current object that contains the list of objects to aggregate.
- `feature_to_aggregate`: str – Name of the feature to aggregate. This feature must exist in all the objects specified in `attr_with_objs_to_aggregate`.
- `attr_to_scale`: str, optional – Attribute used to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in `attr_with_objs_to_aggregate`. As an example, this attribute could be the nominal power, number of turbines, etc. Considering `nominal_power`, it is useful when the sum of `nominal_power` of the objects to aggregate is not equal to the `nominal_power` of the current object and we want to scale the calculated feature to the `nominal_power` of the current object. In summary, the calculation performed is:
  1. Calculate the feature to aggregate for all objects specified in `attr_with_objs_to_aggregate` as it would normally be calculated.
  2. Sum the value of the scaling attribute for all objects specified in `attr_with_objs_to_aggregate`.
  3. Get the scaling factor by dividing the scaling-attribute value of the current object by that sum.
  4. Multiply the calculated feature by the scaling factor.

  If this attribute is not present, the calculated feature will not be scaled.
- `final_scale`: float, optional – Final scale factor applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific value.
- `resample`: dict[str, str], optional – A dict in the format `{"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>}` used to resample the features before aggregating them. Useful if the final feature should have a different frequency than the features being aggregated. For more details on the resample method, please check the pandas documentation. If this option is not present, the features will not be resampled.
- `shift`: dict[str, str | int], optional – A dict in the format `{"freq": <freq as string>, "amount": <amount as int>}` used to shift the features before aggregating them. Useful if the aggregated data's timestamps do not match those of the final feature.
Parameters:

- `object_name` (str) – Name of the object for which the feature will be calculated.
- `feature` (str) – Name of the feature that will be calculated.
Source code in echo_energycalc/feature_calc_object_aggregation.py
```python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class that is used to aggregate features from any type of object and scale it to an attribute if needed.

    In summary, it will do the following:

    1. Get the features from the objects that will be aggregated.
    2. Resample the features if necessary.
    3. Shift the features if necessary.
    4. Calculate the desired aggregation method.
    5. Scale the calculated feature if necessary.
    6. Save the calculated feature.

    For this to work the desired feature must have attribute `server_calc_type` defined as
    `object_aggregation` and the `feature_options_json` attribute with the following options:

    - `aggregation`: str
        Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std",
        "var", "count" or "feature_eval_expression".
    - `attr_with_objs_to_aggregate`: str
        Name of attribute for the current object that contains a list of objects that will
        be aggregated.
    - `feature_to_aggregate`: str
        Name of the feature that will be aggregated. This feature must exist in all the
        objects specified in `attr_with_objs_to_aggregate`.
    - `attr_to_scale`: str, optional
        Attribute to scale the calculated feature if necessary. This attribute must be
        present both in the current object and in all the objects specified in
        `attr_with_objs_to_aggregate`.
        As an example, this attribute could be the nominal power, number of turbines, etc.
        If we consider `nominal_power`, it could be useful if the sum of `nominal_power` of
        the objects to aggregate is not equal to the `nominal_power` of the current object
        and we want to scale the calculated feature to the `nominal_power` of the current
        object. In summary, the calculation performed will be:

        - Calculate the feature to aggregate for all objects specified in
          `attr_with_objs_to_aggregate` as it would normally be calculated.
        - Sum the value of the scaling attribute for all objects specified in
          `attr_with_objs_to_aggregate`.
        - Get the scaling factor by dividing the value of the scaling attribute of the
          current object by the sum of the scaling attribute of all objects specified in
          `attr_with_objs_to_aggregate`.
        - Multiply the calculated feature by the scaling factor.

        If this attribute is not present, the calculated feature will not be scaled.
    - `final_scale`: float, optional
        Final scale factor to be applied to the calculated feature. This is useful if you
        want to scale the calculated feature to a specific value.
    - `resample`: dict[str, str], optional
        A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>,
        "label": <right or left>} that will be used to resample the features before
        aggregating them. This is useful if you want to have the final feature with a
        different frequency than the features that will be aggregated.
        For more details on the resample method, please check the
        [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.resample.html)
        If this option is not present, the features will not be resampled.
    - `shift`: dict[str, str | int], optional
        A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will
        be used to shift the features before aggregating them. This is useful if you are
        aggregating data whose timestamps do not match those of the final feature.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature will be calculated.
    feature : str
        Name of the feature that will be calculated.
    """
    super().__init__(object_name, feature)
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
    self._fetch_requirements()
    self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]
    self.aggregation: str = self._feature_attributes["feature_options_json"]["aggregation"]
    self.attr_with_objs_to_aggregate: str = self._feature_attributes["feature_options_json"]["attr_with_objs_to_aggregate"]
    self.feature_to_aggregate: str = self._feature_attributes["feature_options_json"]["feature_to_aggregate"]
    self.attr_to_scale: str | None = self._feature_attributes["feature_options_json"].get("attr_to_scale", None)
    self.final_scale: float | None = self._feature_attributes["feature_options_json"].get("final_scale", None)
    self.resample: dict[str, str] | None = self._feature_attributes["feature_options_json"].get("resample", None)
    self.shift: dict[str, str | int] | None = self._feature_attributes["feature_options_json"].get("shift", None)
    # getting all the attributes that will be needed from this object
    this_obj_attrs = [self.attr_with_objs_to_aggregate]
    if self.attr_to_scale:
        this_obj_attrs.append(self.attr_to_scale)
    self._add_requirement(RequiredObjectAttributes({self.object: this_obj_attrs}))
    self._fetch_requirements()
    # getting all the objects that will be aggregated
    self.objects_to_aggregate = self._requirement_data("RequiredObjectAttributes")[self.object][self.attr_with_objs_to_aggregate]
```
feature property¶
Feature that is calculated. This is defined in the constructor and cannot be changed.
Returns:

- str – Name of the feature that is calculated.

name property¶
Name of the feature calculator. Defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:

- str – Name of the feature calculator.

object property¶
Object for which the feature is calculated. This is defined in the constructor and cannot be changed.
Returns:

- str – Object name for which the feature is calculated.

requirements property¶
List of requirements of the feature calculator. Defined in child classes of FeatureCalculator.
Returns:

- dict[str, list[CalculationRequirement]] – Dict of requirements. The keys are the names of the requirement classes and the values are lists of requirements of that class. For example:
  `{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}`

result property¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:

- DataFrame | None – Polars DataFrame with a "timestamp" column and one or more feature value columns. None until `calculate` is called.
calculate(period, save_into=None, cached_data=None, **kwargs)¶
Run the calculation for the given period and optionally save the result.
Calls `_compute` to get the result, stores it in `result`, then calls `save`. Subclasses should implement `_compute` instead of overriding this method.
Parameters:

- `period` (DateTimeRange) – Period for which the feature will be calculated.
- `save_into` (Literal['all', 'performance_db'] | None, default: None) –
  - "all": save in performance_db and bazefield.
  - "performance_db": save only in performance_db.
  - None: do not save.

  By default None.
- `cached_data` (DataFrame | None, default: None) – Polars DataFrame with features already fetched/calculated. Passed to `_compute` to enable chained calculations without re-querying performance_db. By default None.
- `**kwargs` – Forwarded to `save`.

Returns:

- DataFrame – Polars DataFrame with a "timestamp" column and one or more feature value columns.
Source code in echo_energycalc/feature_calc_core.py
```python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.
        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result
```
save(save_into=None, **kwargs)¶
Method to save the calculated feature values in performance_db.
Parameters:

- `save_into` (Literal['all', 'performance_db'] | None, default: None) – The options are:
  - "all": the feature will be saved in performance_db and bazefield.
  - "performance_db": the feature will be saved only in performance_db.
  - None: the feature will not be saved.

  By default None.
- `**kwargs` (dict, default: {}) – Not used at the moment. Here only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
```python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.
        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )
    if save_into is None:
        return
    upload_to_bazefield = save_into == "all"
    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")
    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})
    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )
```