
Object Aggregation

Overview

FeatureCalcObjectAggregation is a general-purpose aggregator that collects a feature from any list of objects (specified via an object attribute), applies an aggregation method, and optionally rescales the result based on a ratio of attribute values.

Example use case: Sum active_power_reference from a list of SPEs (stored in the ons_spes attribute), resample from 10-minute to 30-minute resolution, and convert kW to MW with a final_scale of 0.001.


How It Works

  1. Reads feature_options_json and attr_with_objs_to_aggregate from the feature/object attributes to get the list of objects to aggregate.
  2. Optionally adjusts the period to account for resample and shift settings.
  3. Fetches the feature_to_aggregate from each object in the list.
  4. Optionally resamples the data.
  5. Optionally shifts the data.
  6. Applies the aggregation row-wise.
  7. Optionally scales the result using the ratio of attr_to_scale values.
  8. Optionally applies a final_scale multiplier.

Database Requirements

  • Feature attribute server_calc_type must be set to object_aggregation.
  • Feature attribute feature_options_json with the following keys:

    | Key | Type | Required | Description |
    |-----|------|----------|-------------|
    | aggregation | string | Yes | One of: "avg", "sum", "max", "min", "median", "std", "var", "count", "feature_eval_expression" |
    | attr_with_objs_to_aggregate | string | Yes | Name of an object attribute on the current object that holds the list of objects to aggregate |
    | feature_to_aggregate | string | Yes | The feature to aggregate from each listed object |
    | attr_to_scale | string | No | Attribute for proportional scaling (see below) |
    | final_scale | number | No | Constant multiplier applied to the final result |
    | resample | object | No | Resample config (see below) |
    | shift | object | No | Shift config (see below) |

resample sub-object

Resamples each object's feature before aggregation using pandas .resample():

JSON
{
    "rule": "30min",
    "method": "mean",
    "closed": "right",
    "label": "left"
}
  • rule: pandas offset string (e.g., "30min", "1h", "1D")
  • method: pandas aggregation method string ("mean", "sum", "max", etc.)
  • closed: which side of the interval is closed ("right" or "left")
  • label: which side label to use ("right" or "left")
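
As an illustration of how this config maps onto pandas (a standalone sketch, not code from the library):

```python
import pandas as pd

# Six 10-minute readings covering one hour
idx = pd.date_range("2024-01-01 00:00", periods=6, freq="10min")
power = pd.Series([100.0, 110.0, 120.0, 130.0, 140.0, 150.0], index=idx)

# Equivalent of {"rule": "30min", "method": "mean", "closed": "right", "label": "left"}
resampled = power.resample("30min", closed="right", label="left").mean()
```

Note that with closed="right" a sample stamped exactly at 00:00 belongs to the interval ending at 00:00, which label="left" stamps as 23:30; the bin labeled 00:00 then averages the 00:10, 00:20 and 00:30 samples.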

shift sub-object

Shifts timestamps before aggregation using pandas .shift():

JSON
{
    "freq": "10min",
    "amount": 1
}

Useful when the source data has a different timestamp alignment than the target feature.
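
For example, the config {"freq": "10min", "amount": 1} corresponds to the following pandas call (a standalone sketch):

```python
import pandas as pd

idx = pd.date_range("2024-01-01 00:00", periods=3, freq="10min")
values = pd.Series([1.0, 2.0, 3.0], index=idx)

# "amount" maps to periods and "freq" to freq: every timestamp moves
# forward by one 10-minute step; the values themselves are unchanged.
shifted = values.shift(periods=1, freq="10min")
```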

Attribute scaling (attr_to_scale)

When attr_to_scale is set, the result is scaled by the ratio:

Text Only
result_scaled = result × (this_object[attr_to_scale] / sum(source_objects[attr_to_scale]))

Example: If 3 turbines have nominal_power of 2000 kW each (total 6000 kW) and the current SPE has nominal_power of 4000 kW, the scale factor is 4000 / 6000 = 0.667. This is useful when the source objects do not collectively represent 100% of the target object's capacity.
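
Worked out in code, the turbine example above becomes (plain arithmetic, no library code involved):

```python
# Three source turbines rated 2000 kW each; the current SPE is rated 4000 kW
source_nominal_power = [2000.0, 2000.0, 2000.0]
this_object_nominal_power = 4000.0

scale_factor = this_object_nominal_power / sum(source_nominal_power)  # 4000 / 6000

# An aggregated sum of 5400 kW is then reported as roughly two thirds of that
scaled_result = 5400.0 * scale_factor
```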


Example feature_options_json

Calculates active_power_reference_echo for an ONS site: sums active_power_reference from all SPEs listed in ons_spes, resamples from 10 min to 30 min (mean), and converts kW → MW:

JSON
{
    "resample": {"rule": "30min", "label": "left", "closed": "right", "method": "mean"},
    "aggregation": "sum",
    "final_scale": 0.001,
    "attr_to_scale": "nominal_power",
    "feature_to_aggregate": "active_power_reference",
    "attr_with_objs_to_aggregate": "ons_spes"
}

Class Definition

FeatureCalcObjectAggregation(object_name, feature)

Class that is used to aggregate features from any type of object and, if needed, scale the result by an attribute ratio.

In summary, it will do the following:

  1. Get the features from the objects that will be aggregated.
  2. Resample the features if necessary.
  3. Shift the features if necessary.
  4. Calculate the desired aggregation method.
  5. Scale the calculated feature if necessary.
  6. Save the calculated feature.

For this to work the desired feature must have attribute server_calc_type defined as object_aggregation and the feature_options_json attribute with the following options:

  • aggregation: str

    Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".

  • attr_with_objs_to_aggregate: str

    Name of an attribute on the current object that contains the list of objects to aggregate.

  • feature_to_aggregate: str

    Name of the feature that will be aggregated. This feature must exist in all the objects specified in attr_with_objs_to_aggregate.

  • attr_to_scale: str, optional

    Attribute to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in attr_with_objs_to_aggregate.

    As an example, this attribute could be the nominal power, number of turbines, etc. If we consider nominal_power, it could be useful if the sum of nominal_power of the objects to aggregate is not equal to the nominal_power of the current object and we want to scale the calculated feature to the nominal_power of the current object.

    In summary, the calculation performed will be:

    • Calculate the feature to aggregate for all objects specified in attr_with_objs_to_aggregate as it would normally be calculated.
    • Sum the value of the scaling attribute for all objects specified in attr_with_objs_to_aggregate.
    • Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in attr_with_objs_to_aggregate.
    • Multiply the calculated feature by the scaling factor.

    If this attribute is not present, the calculated feature will not be scaled.

  • final_scale: float, optional

    Final scale factor applied to the calculated feature. Useful for unit conversions, e.g. 0.001 to convert kW to MW.

  • resample: dict[str, str], optional

    A dict in the format {"rule": "<rule>", "method": "<method>", "closed": "<right or left>", "label": "<right or left>"} that will be used to resample the features before aggregating them. This is useful if you want the final feature to have a different frequency than the features that are aggregated.

    For more details on the resample method, please check the pandas documentation

    If this option is not present, the features will not be resampled.

  • shift: dict[str, str | int], optional

    A dict in the format {"freq": "<freq as string>", "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful when the timestamps of the source data do not align with those of the final feature.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature will be calculated.

  • feature

    (str) –

    Name of the feature that will be calculated.

Source code in echo_energycalc/feature_calc_object_aggregation.py
Python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class that is used to aggregate features from any type of object and, if needed, scale the result by an attribute ratio.

    In summary, it will do the following:

    1. Get the features from the objects that will be aggregated.
    2. Resample the features if necessary.
    3. Shift the features if necessary.
    4. Calculate the desired aggregation method.
    5. Scale the calculated feature if necessary.
    6. Save the calculated feature.

    For this to work the desired feature must have attribute `server_calc_type` defined as `object_aggregation` and the `feature_options_json` attribute with the following options:

    - `aggregation`: str

        Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".

    - `attr_with_objs_to_aggregate`: str

        Name of an attribute on the current object that contains the list of objects to aggregate.

    - `feature_to_aggregate`: str

        Name of the feature that will be aggregated. This feature must exist in all the objects specified in `attr_with_objs_to_aggregate`.

    - `attr_to_scale`: str, optional

        Attribute to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in `attr_with_objs_to_aggregate`.

        As an example, this attribute could be the nominal power, number of turbines, etc. If we consider `nominal_power`, it could be useful if the sum of `nominal_power` of the objects to aggregate is not equal to the `nominal_power` of the current object and we want to scale the calculated feature to the `nominal_power` of the current object.

        In summary, the calculation performed will be:

        - Calculate the feature to aggregate for all objects specified in `attr_with_objs_to_aggregate` as it would normally be calculated.
        - Sum the value of the scaling attribute for all objects specified in `attr_with_objs_to_aggregate`.
        - Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in `attr_with_objs_to_aggregate`.
        - Multiply the calculated feature by the scaling factor.

        If this attribute is not present, the calculated feature will not be scaled.

    - `final_scale`: float, optional

        Final scale factor to be applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific value.

    - `resample`: dict[str, str], optional

        A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>} that will be used to resample the features before aggregating them. This is useful if you want to have the final feature with a different frequency than the features that will be aggregated.

        For more details on the resample method, please check the [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.resample.html)

        If this option is not present, the features will not be resampled.

    - `shift`: dict[str, str | int], optional

        A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful when the timestamps of the source data do not align with those of the final feature.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature will be calculated.
    feature : str
        Name of the feature that will be calculated
    """
    super().__init__(object_name, feature)

    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))

    self._fetch_requirements()

    self._feature_attributes = self._requirement_data("RequiredFeatureAttributes")[self.feature]

    self.aggregation: str = self._feature_attributes["feature_options_json"]["aggregation"]
    self.attr_with_objs_to_aggregate: str = self._feature_attributes["feature_options_json"]["attr_with_objs_to_aggregate"]
    self.feature_to_aggregate: str = self._feature_attributes["feature_options_json"]["feature_to_aggregate"]
    self.attr_to_scale: str | None = self._feature_attributes["feature_options_json"].get("attr_to_scale", None)
    self.final_scale: float | None = self._feature_attributes["feature_options_json"].get("final_scale", None)
    self.resample: dict[str, str] | None = self._feature_attributes["feature_options_json"].get("resample", None)
    self.shift: dict[str, str | int] | None = self._feature_attributes["feature_options_json"].get("shift", None)

    # getting all the attributes that will be needed from this object
    this_obj_attrs = [self.attr_with_objs_to_aggregate]
    if self.attr_to_scale:
        this_obj_attrs.append(self.attr_to_scale)
    self._add_requirement(RequiredObjectAttributes({self.object: this_obj_attrs}))
    self._fetch_requirements()

    # getting all the objects that will be aggregated
    self.objects_to_aggregate = self._requirement_data("RequiredObjectAttributes")[self.object][self.attr_with_objs_to_aggregate]

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. Is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • DataFrame | None

    Polars DataFrame with a "timestamp" column and one or more feature value columns. None until calculate is called.

calculate(period, save_into=None, cached_data=None, **kwargs)

Run the calculation for the given period and optionally save the result.

Calls _compute to get the result, stores it in result, then calls save. Subclasses should implement _compute instead of overriding this method.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    Polars DataFrame with features already fetched/calculated. Passed to _compute to enable chained calculations without re-querying performance_db. By default None.

  • **kwargs

    Forwarded to :meth:save.

Returns:

  • DataFrame

    Polars DataFrame with a "timestamp" column and one or more feature value columns.

Source code in echo_energycalc/feature_calc_core.py
Python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.

        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Where to save the calculated values:
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
Python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Where to save the calculated values. The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )

    if save_into is None:
        return

    upload_to_bazefield = save_into == "all"

    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")

    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})

    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )