SPE Aggregation

Overview

FeatureCalcSpeAggregation aggregates a feature from all child objects of a given SPE (Sub-Plant Equipment grouping) into a single value using a configurable aggregation method.

Example use case: Sum lost_power_curtailment_park from all wind turbines of an SPE to get lost_power_curtailment_park_turbine_sum at the SPE level.


How It Works

  1. Reads the feature_options_json attribute of the target feature to determine:
     • which child object type to aggregate (e.g., "wind_turbine"),
     • which feature to aggregate from those children (e.g., "lost_power_curtailment_park"), and
     • which aggregation method to use (e.g., "sum").
  2. Queries performance_db for all objects of the specified type that belong to the SPE.
  3. Fetches the child feature for the calculation period.
  4. Applies the aggregation row-wise (one output timestamp per input timestamp).
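The row-wise aggregation in the last step can be sketched with pandas (hypothetical turbine names and values; the calculator's internal data handling may differ):

```python
import pandas as pd

# Hypothetical child-feature values: one column per wind turbine,
# one row per timestamp (result of fetching the child feature).
df = pd.DataFrame(
    {
        "WTG01": [1.0, 2.0, None],
        "WTG02": [3.0, 4.0, 5.0],
        "WTG03": [0.5, None, 1.5],
    },
    index=pd.date_range("2024-01-01", periods=3, freq="10min"),
)

# Row-wise "sum" aggregation: one output value per input timestamp.
# Missing values are skipped (skipna=True is the pandas default).
aggregated = df.sum(axis=1, skipna=True)
print(aggregated.tolist())  # [4.5, 6.0, 6.5]
```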

Database Requirements

  • Feature attribute server_calc_type must be set to spe_aggregation.
  • Feature attribute feature_options_json with the following keys:

    | Key               | Type   | Required | Description                                                                                    |
    |-------------------|--------|----------|------------------------------------------------------------------------------------------------|
    | aggregation       | string | Yes      | One of: "avg", "sum", "max", "min", "median", "std", "var", "count", "feature_eval_expression" |
    | child_object_type | string | Yes      | Object type of the children to aggregate (e.g., "wind_turbine")                                |
    | child_feature     | string | Yes      | Feature name to aggregate from child objects                                                   |
  • If aggregation is "feature_eval_expression", the feature attribute feature_eval_expression must also be present. The expression receives a pandas DataFrame df whose columns are the child object names (the child feature values are already selected, one column per child). The expression must create a column df[feature_name] with the aggregated result.
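A minimal sketch of the expression contract: the expression sees a DataFrame df with one column per child object and must leave its result in a column named after the target feature. The exec-based evaluation and the names below are assumptions for illustration, not the calculator's actual internals:

```python
import pandas as pd

# Hypothetical child-feature values, one column per child object.
df = pd.DataFrame({"WTG01": [1.0, 2.0], "WTG02": [3.0, 4.0]})

# Hypothetical feature_eval_expression for a target feature
# named "active_power_sum": it must create df["active_power_sum"].
expression = 'df["active_power_sum"] = df.sum(axis=1)'

# Assumption: the expression is executed with `df` in scope.
exec(expression, {"df": df})
print(df["active_power_sum"].tolist())  # [4.0, 6.0]
```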

Example feature_options_json

JSON
{
    "aggregation": "sum",
    "child_feature": "lost_power_curtailment_park",
    "child_object_type": "wind_turbine"
}

Example with feature_eval_expression

JSON
{
    "aggregation": "feature_eval_expression",
    "child_feature": "active_power",
    "child_object_type": "wind_turbine"
}

Expression (feature_eval_expression):

Python
# Count of turbines producing above 50% of the row median
median_val = df.median(axis=1)
df["high_producers_count"] = df.gt(0.5 * median_val, axis=0).sum(axis=1).astype(float)

Important Notes

  • The SPE (object_name) must have child objects of the specified child_object_type in performance_db — if none are found, instantiation fails with a ValueError.
  • All child objects must have the child_feature defined in their object model.
  • Null values in child features are skipped (NaN-aware aggregation via pandas skipna=True).
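The NaN-aware behavior noted above can be illustrated in isolation (hypothetical values; skipna=True is the pandas default for these reductions):

```python
import numpy as np
import pandas as pd

# Two turbines; the second timestamp is missing for WTG01.
df = pd.DataFrame({"WTG01": [10.0, np.nan], "WTG02": [20.0, 30.0]})

# With skipna=True, the NaN is ignored per row: the average for
# the second timestamp is computed from WTG02 alone.
print(df.mean(axis=1, skipna=True).tolist())  # [15.0, 30.0]
```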

Class Definition

FeatureCalcSpeAggregation(object_name, feature)

FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.

For this to work the desired feature must have the feature_options_json attribute in performance_db, with the following keys:

  • aggregation: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".
  • child_object_type: str, object type of the child objects.
  • child_feature: str, name of the feature of child objects that will be aggregated.

If the aggregation method is feature_eval_expression, the feature_eval_expression attribute must be present in performance_db as well.

The method will look for all objects that have the spe_name column in the objects table equal to the wanted SPE object and the object_type column equal to the child_object_type attribute.

Please make sure that the child objects have the desired feature to be aggregated.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must be an SPE.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_spe_aggregation.py
Python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.

    For this to work the desired feature must have the `feature_options_json` attribute in performance_db, with the following keys:

    - `aggregation`: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or `feature_eval_expression`.
    - `child_object_type`: str, object type of the child objects.
    - `child_feature`: str, name of the feature of child objects that will be aggregated.

    If the aggregation method is `feature_eval_expression`, the `feature_eval_expression` attribute must be present in performance_db as well.

    The method will look for all objects that have the `spe_name` column in the `objects` table equal to the wanted SPE object and the `object_type` column equal to the `child_object_type` attribute.

    Please make sure that the child objects have the desired feature to be aggregated.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must be an SPE.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)

    # requirements for the feature calculator
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_eval_expression"], optional=True))

    # getting all required attributes
    self._fetch_requirements()

    self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]
    # validating feature options
    self._validate_feature_options()

    # getting child objects
    spe_objects = self._perfdb.objects.instances.get(
        spe_names=[self.object],
        object_types=[self._feature_attributes["feature_options_json"]["child_object_type"]],
    )
    spe_objects = list(spe_objects.keys())

    # defining required features
    self._add_requirement(
        RequiredFeatures(
            {child_object: [self._feature_attributes["feature_options_json"]["child_feature"]] for child_object in spe_objects},
        ),
    )

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. Is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • DataFrame | None

    Polars DataFrame with a "timestamp" column and one or more feature value columns. None until calculate is called.

calculate(period, save_into=None, cached_data=None, **kwargs)

Run the calculation for the given period and optionally save the result.

Calls _compute to get the result, stores it in result, then calls save. Subclasses should implement _compute instead of overriding this method.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    Polars DataFrame with features already fetched/calculated. Passed to _compute to enable chained calculations without re-querying performance_db. By default None.

  • **kwargs

    Forwarded to save.

Returns:

  • DataFrame

    Polars DataFrame with a "timestamp" column and one or more feature value columns.

Source code in echo_energycalc/feature_calc_core.py
Python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.

        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Where to save the calculated feature values. The options are:
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
Python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )

    if save_into is None:
        return

    upload_to_bazefield = save_into == "all"

    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")

    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})

    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )