SPE Aggregation¶
Overview¶
FeatureCalcSpeAggregation aggregates a feature from all child objects of a given SPE (Sub-Plant Equipment grouping) into a single value using a configurable aggregation method.
Example use case: Sum lost_power_curtailment_park from all wind turbines of an SPE to get lost_power_curtailment_park_turbine_sum at the SPE level.
How It Works¶
- Reads the `feature_options_json` attribute of the target feature to determine:
    - Which child object type to aggregate (e.g., `"wind_turbine"`)
    - Which feature to aggregate from those children (e.g., `"lost_power_curtailment_park"`)
    - Which aggregation method to use (e.g., `"sum"`)
- Queries `performance_db` for all objects of the specified type that belong to the SPE.
- Fetches the child feature for the calculation period.
- Applies the aggregation row-wise (one output timestamp per input timestamp).
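Conceptually, the row-wise aggregation works like the following pandas sketch. The turbine names and values here are hypothetical; the real class fetches them from performance_db:

```python
import pandas as pd

# Hypothetical child-feature values: one column per wind turbine,
# one row per timestamp, as described in the steps above.
df = pd.DataFrame(
    {
        "WTG01": [100.0, 200.0],
        "WTG02": [150.0, None],  # missing value is skipped (skipna)
        "WTG03": [50.0, 300.0],
    },
    index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"]),
)

# "sum" aggregation: one output value per input timestamp
spe_sum = df.sum(axis=1, skipna=True)
print(spe_sum.tolist())  # [300.0, 500.0]
```

Note how the null value at the second timestamp is ignored rather than propagating NaN into the SPE-level result.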
Database Requirements¶
- Feature attribute `server_calc_type` must be set to `spe_aggregation`.
- Feature attribute `feature_options_json` must be present with the following keys:

| Key | Type | Required | Description |
| --- | --- | --- | --- |
| `aggregation` | string | Yes | One of `"avg"`, `"sum"`, `"max"`, `"min"`, `"median"`, `"std"`, `"var"`, `"count"`, `"feature_eval_expression"` |
| `child_object_type` | string | Yes | Object type of the children to aggregate (e.g., `"wind_turbine"`) |
| `child_feature` | string | Yes | Feature name to aggregate from child objects |

- If `aggregation` is `"feature_eval_expression"`, the feature attribute `feature_eval_expression` must also be present. The expression receives a pandas DataFrame `df` whose columns are the child object names (the child feature values are already selected, one column per child). The expression must create a column `df[feature_name]` with the aggregated result.
Example feature_options_json¶
{
"aggregation": "sum",
"child_feature": "lost_power_curtailment_park",
"child_object_type": "wind_turbine"
}
Example with feature_eval_expression¶
{
"aggregation": "feature_eval_expression",
"child_feature": "active_power",
"child_object_type": "wind_turbine"
}
Expression (feature_eval_expression):
# Count of turbines producing above the row-wise median
median_val = df.median(axis=1)
df["high_producers_count"] = (df.gt(median_val, axis=0)).sum(axis=1).astype(float)
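The expression above is plain pandas and can be checked standalone. In this sketch, `df` mimics the DataFrame the expression receives (one column per child object; the turbine names and values are made up):

```python
import pandas as pd

# df mimics what the expression receives: child-feature values,
# one column per child object (column names are hypothetical).
df = pd.DataFrame(
    {
        "WTG01": [1.0, 5.0],
        "WTG02": [2.0, 5.0],
        "WTG03": [3.0, 5.0],
    }
)

# The expression from the example, run as-is:
median_val = df.median(axis=1)
df["high_producers_count"] = (df.gt(median_val, axis=0)).sum(axis=1).astype(float)

print(df["high_producers_count"].tolist())  # [1.0, 0.0]
```

In the first row only `WTG03` is strictly above the row median (2.0), so the count is 1; in the second row all turbines equal the median (5.0), so the strict comparison yields 0.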
Important Notes¶
- The SPE (`object_name`) must have child objects of the specified `child_object_type` in `performance_db`; if none are found, instantiation fails with a `ValueError`.
- All child objects must have the `child_feature` defined in their object model.
- Null values in child features are skipped (NaN-aware aggregation via pandas `skipna=True`).
Class Definition¶
FeatureCalcSpeAggregation(object_name, feature) ¶
FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.
For this to work the desired feature must have the feature_options_json attribute in performance_db, with the following keys:
- `aggregation`: str, aggregation method to be used. Can be `"avg"`, `"sum"`, `"max"`, `"min"`, `"median"`, `"std"`, `"var"`, `"count"` or `"feature_eval_expression"`.
- `child_object_type`: str, object type of the child objects.
- `child_feature`: str, name of the feature of child objects that will be aggregated.

If the aggregation method is `feature_eval_expression`, the `feature_eval_expression` attribute must be present in performance_db as well.
The method will look for all objects that have the spe_name column in the objects table equal to the wanted SPE object and the object_type column equal to the child_object_type attribute.
Please make sure that the child objects have the desired feature to be aggregated.
Parameters:

- `object_name` (`str`) – Name of the object for which the feature is calculated. It must be an SPE.
- `feature` (`str`) – Feature of the object that is calculated. It must exist in performance_db.
Source code in echo_energycalc/feature_calc_spe_aggregation.py
def __init__(
self,
object_name: str,
feature: str,
) -> None:
"""
FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.
For this to work the desired feature must have the `feature_options_json` attribute in performance_db, with the following keys:
- `aggregation`: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or `feature_eval_expression`.
- `child_object_type`: str, object type of the child objects.
- `child_feature`: str, name of the feature of child objects that will be aggregated.
If the aggregation method is `feature_eval_expression`, the `feature_eval_expression` attribute must be present in performance_db as well.
The method will look for all objects that have the `spe_name` column in the `objects` table equal to the wanted SPE object and the `object_type` column equal to the `child_object_type` attribute.
Please make sure that the child objects have the desired feature to be aggregated.
Parameters
----------
object_name : str
Name of the object for which the feature is calculated. It must be an SPE.
feature : str
Feature of the object that is calculated. It must exist in performance_db.
"""
# initialize parent class
super().__init__(object_name, feature)
# requirements for the feature calculator
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_eval_expression"], optional=True))
# getting all required attributes
self._fetch_requirements()
self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]
# validating feature options
self._validate_feature_options()
# getting child objects
spe_objects = self._perfdb.objects.instances.get(
spe_names=[self.object],
object_types=[self._feature_attributes["feature_options_json"]["child_object_type"]],
)
spe_objects = list(spe_objects.keys())
# defining required features
self._add_requirement(
RequiredFeatures(
{child_object: [self._feature_attributes["feature_options_json"]["child_feature"]] for child_object in spe_objects},
),
)
feature property ¶

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

- `str` – Name of the feature that is calculated.
name property ¶

Name of the feature calculator. Is defined in child classes of FeatureCalculator. This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

- `str` – Name of the feature calculator.
object property ¶

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

- `str` – Object name for which the feature is calculated.
requirements property ¶

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

- `dict[str, list[CalculationRequirement]]` – Dict of requirements. The keys are the names of the classes of the requirements and the values are lists of requirements of that class.
For example:
{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
result property ¶

Result of the calculation. This is None until the method "calculate" is called.

Returns:

- `DataFrame | None` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns. None until `calculate` is called.
calculate(period, save_into=None, cached_data=None, **kwargs) ¶
Run the calculation for the given period and optionally save the result.
Calls `_compute` to get the result, stores it in `result`, then calls `save`. Subclasses should implement `_compute` instead of overriding this method.
Parameters:

- `period` (`DateTimeRange`) – Period for which the feature will be calculated.
- `save_into` (`Literal['all', 'performance_db'] | None`, default: `None`) –
    - `"all"`: save in performance_db and bazefield.
    - `"performance_db"`: save only in performance_db.
    - `None`: do not save.
  By default None.
- `cached_data` (`DataFrame | None`, default: `None`) – Polars DataFrame with features already fetched/calculated. Passed to `_compute` to enable chained calculations without re-querying performance_db. By default None.
- `**kwargs` – Forwarded to `save`.
Returns:

- `DataFrame` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns.
Source code in echo_energycalc/feature_calc_core.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = None,
cached_data: pl.DataFrame | None = None,
**kwargs,
) -> pl.DataFrame:
"""
Run the calculation for the given period and optionally save the result.
Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
of overriding this method.
Parameters
----------
period : DateTimeRange
Period for which the feature will be calculated.
save_into : Literal["all", "performance_db"] | None, optional
- ``"all"``: save in performance_db and bazefield.
- ``"performance_db"``: save only in performance_db.
- ``None``: do not save.
By default None.
cached_data : pl.DataFrame | None, optional
Polars DataFrame with features already fetched/calculated. Passed to
``_compute`` to enable chained calculations without re-querying
performance_db. By default None.
**kwargs
Forwarded to :meth:`save`.
Returns
-------
pl.DataFrame
Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
"""
result = self._compute(period, cached_data=cached_data)
self._result = result
self.save(save_into=save_into, **kwargs)
return result
save(save_into=None, **kwargs) ¶
Method to save the calculated feature values in performance_db.
Parameters:

- `save_into` (`Literal['all', 'performance_db'] | None`, default: `None`) – Where to save the result:
    - `"all"`: the feature will be saved in performance_db and bazefield.
    - `"performance_db"`: the feature will be saved only in performance_db.
    - `None`: the feature will not be saved.
  By default None.
- `**kwargs` (`dict`, default: `{}`) – Not used at the moment; here only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
self,
save_into: Literal["all", "performance_db"] | None = None,
**kwargs, # noqa: ARG002
) -> None:
"""
Method to save the calculated feature values in performance_db.
Parameters
----------
save_into : Literal["all", "performance_db"] | None, optional
Argument that will be passed to the method "save". The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
**kwargs : dict, optional
Not being used at the moment. Here only for compatibility.
"""
# checking arguments
if not isinstance(save_into, str | type(None)):
raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
# checking if calculation was done
if self.result is None:
raise ValueError(
"The calculation was not done. Please call 'calculate' before calling 'save'.",
)
if save_into is None:
return
upload_to_bazefield = save_into == "all"
if not isinstance(self.result, pl.DataFrame):
raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
if "timestamp" not in self.result.columns:
raise ValueError("result DataFrame must contain a 'timestamp' column.")
# rename feature columns to "object@feature" format expected by perfdb polars insert
feat_cols = [c for c in self.result.columns if c != "timestamp"]
result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})
self._perfdb.features.values.series.insert(
df=result_pl,
on_conflict="update",
bazefield_upload=upload_to_bazefield,
)