
Object Aggregation

Overview

The FeatureCalcObjectAggregation class is a subclass of FeatureCalculator that calculates the aggregated value of a feature over a list of objects. This is useful when we want to fetch a feature's values for a list of child objects and aggregate them in a specific way into a single value for a parent object.

Calculation Logic

The calculation logic is described in the constructor of the class, shown below in the Class Definition section.

Database Requirements

  • Feature attribute server_calc_type must be set to object_aggregation.
  • Feature attribute feature_options_json must be set with the keys described in the Class Definition section.

    As an example, the following JSON is used for the feature active_power_reference_echo, which calculates the reference active power for an ONS site, converting the values to 30-minute averages in MW. The source feature is active_power_reference from the objects defined in the ons_spes attribute.

    {
        "resample": {"rule": "30min", "label": "left", "closed": "right", "method": "mean"},
        "aggregation": "sum",
        "final_scale": 0.001,
        "attr_to_scale": "nominal_power",
        "feature_to_aggregate": "active_power_reference",
        "attr_with_objs_to_aggregate": "ons_spes"
    }
    
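As an illustration, the configuration above boils down to the following plain-pandas steps (object names and values here are made up for the sketch):

```python
import pandas as pd

# Toy 10-minute active_power_reference values in kW for two hypothetical
# child objects; in practice these come from the objects listed in ons_spes.
idx = pd.date_range("2024-01-01 00:10", periods=6, freq="10min")
df = pd.DataFrame({"SPE_A": [100.0] * 6, "SPE_B": [200.0] * 6}, index=idx)

# "resample": 30-minute mean, right-closed bins labeled on the left edge
df = df.resample("30min", closed="right", label="left", origin="start_day").mean()

# "aggregation": "sum" across the child objects
agg = df.sum(axis=1, skipna=True)

# "final_scale": 0.001 converts kW to MW
agg = agg * 0.001
```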

Class Definition

FeatureCalcObjectAggregation(object_name, feature)

Class that is used to aggregate features from any type of object and, if needed, scale the result based on an object attribute.

In summary, it will do the following:

  1. Get the features from the objects that will be aggregated.
  2. Resample the features if necessary.
  3. Shift the features if necessary.
  4. Calculate the desired aggregation method.
  5. Scale the calculated feature if necessary.
  6. Save the calculated feature.

For this to work, the desired feature must have the attribute server_calc_type defined as object_aggregation and the feature_options_json attribute with the following options:

  • aggregation: str

    Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".
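    As a quick illustration of the row-wise aggregation behavior (toy data, plain pandas; column names are made up):

```python
import numpy as np
import pandas as pd

# Each column is one aggregated object; each row is one timestamp.
df = pd.DataFrame({"obj1": [1.0, 4.0, np.nan], "obj2": [3.0, np.nan, np.nan]})

avg = df.mean(axis=1)     # "avg": NaNs are skipped per row
total = df.sum(axis=1)    # "sum": an all-NaN row sums to 0.0
count = df.count(axis=1)  # "count": number of non-NaN values per row
```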

  • attr_with_objs_to_aggregate: str

    Name of the attribute of the current object that contains the list of objects that will be aggregated.

  • feature_to_aggregate: str

    Name of the feature that will be aggregated. This feature must exist in all the objects specified in attr_with_objs_to_aggregate.

  • attr_to_scale: str, optional

    Attribute to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in attr_with_objs_to_aggregate.

    As an example, this attribute could be the nominal power, number of turbines, etc. If we consider nominal_power, it could be useful if the sum of nominal_power of the objects to aggregate is not equal to the nominal_power of the current object and we want to scale the calculated feature to the nominal_power of the current object.

    In summary, the calculation performed will be:

    • Calculate the feature to aggregate for all objects specified in attr_with_objs_to_aggregate as it would normally be calculated.
    • Sum the value of the scaling attribute for all objects specified in attr_with_objs_to_aggregate.
    • Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in attr_with_objs_to_aggregate.
    • Multiply the calculated feature by the scaling factor.

    If this attribute is not present, the calculated feature will not be scaled.
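    A minimal numeric sketch of the scaling steps above, with hypothetical nominal_power values:

```python
# Hypothetical values: the parent object has nominal_power = 100, while the
# objects being aggregated sum to 80, so the aggregated feature is scaled up.
parent_attr = 100.0
child_attrs = [30.0, 50.0]  # nominal_power of each object being aggregated

scaling_factor = parent_attr / sum(child_attrs)  # 100 / 80 = 1.25
aggregated_value = 40.0                          # e.g. the summed feature at one timestamp
scaled_value = aggregated_value * scaling_factor
```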

  • final_scale: float, optional

    Final scale factor to be applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific value.

  • resample: dict[str, str], optional

    A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>} that will be used to resample the features before aggregating them. This is useful if you want to have the final feature with a different frequency than the features that will be aggregated.

    For more details on the resample method, please check the pandas documentation.

    If this option is not present, the features will not be resampled.
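    A standalone pandas sketch of how "closed" and "label" interact (toy values; origin="start_day" matches what the calculator passes internally):

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0, 4.0],
              index=pd.date_range("2024-01-01 00:15", periods=4, freq="15min"))

# closed="right": each bin covers (start, end]; label="left": each output
# row is labeled with the bin's left edge.
out = s.resample("30min", closed="right", label="left", origin="start_day").mean()
```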

  • shift: dict[str, str | int], optional

    A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful if you are aggregating data whose timestamps do not match the desired timestamps of the final feature.
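    A minimal pandas sketch of the shift step, equivalent to a config of {"freq": "10min", "amount": 2} (toy series):

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0],
              index=pd.date_range("2024-01-01 00:00", periods=3, freq="10min"))

# Passing freq shifts the index forward by 2 x 10min while keeping all values.
shifted = s.shift(2, freq="10min")
```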

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature will be calculated.

  • feature

    (str) –

    Name of the feature that will be calculated.

Source code in echo_energycalc/feature_calc_common.py
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Class that is used to aggregate features from any type of object and scale it to an attribute if needed.

    In summary, it will do the following:

    1. Get the features from the objects that will be aggregated.
    2. Resample the features if necessary.
    3. Shift the features if necessary.
    4. Calculate the desired aggregation method.
    5. Scale the calculated feature if necessary.
    6. Save the calculated feature.

    For this to work the desired feature must have attribute `server_calc_type` defined as `object_aggregation` and the `feature_options_json` attribute with the following options:

    - `aggregation`: str

        Aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or "feature_eval_expression".

    - `attr_with_objs_to_aggregate`: str

        Name of attribute for the current object that contains a list of objects that will be aggregated.

    - `feature_to_aggregate`: str

        Name of the feature that will be aggregated. This feature must exist in all the objects specified in `attr_with_objs_to_aggregate`.

    - `attr_to_scale`: str, optional

        Attribute to scale the calculated feature if necessary. This attribute must be present both in the current object and in all the objects specified in `attr_with_objs_to_aggregate`.

        As an example, this attribute could be the nominal power, number of turbines, etc. If we consider `nominal_power`, it could be useful if the sum of `nominal_power` of the objects to aggregate is not equal to the `nominal_power` of the current object and we want to scale the calculated feature to the `nominal_power` of the current object.

        In summary, the calculation performed will be:

        - Calculate the feature to aggregate for all objects specified in `attr_with_objs_to_aggregate` as it would normally be calculated.
        - Sum the value of the scaling attribute for all objects specified in `attr_with_objs_to_aggregate`.
        - Get the scaling factor by dividing the value of the scaling attribute of the current object by the sum of the scaling attribute of all objects specified in `attr_with_objs_to_aggregate`.
        - Multiply the calculated feature by the scaling factor.

        If this attribute is not present, the calculated feature will not be scaled.

    - `final_scale`: float, optional

        Final scale factor to be applied to the calculated feature. This is useful if you want to scale the calculated feature to a specific value.

    - `resample`: dict[str, str], optional

        A dict in the format {"rule": <rule>, "method": <method>, "closed": <right or left>, "label": <right or left>} that will be used to resample the features before aggregating them. This is useful if you want to have the final feature with a different frequency than the features that will be aggregated.

        For more details on the resample method, please check the [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.resample.html)

        If this option is not present, the features will not be resampled.

    - `shift`: dict[str, str | int], optional

        A dict in the format {"freq": <freq as string>, "amount": <amount as int>} that will be used to shift the features before aggregating them. This is useful if you are aggregating data whose timestamps do not match the desired timestamps of the final feature.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature will be calculated.
    feature : str
        Name of the feature that will be calculated.
    """
    super().__init__(object_name, feature)

    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))

    self._get_required_data()

    self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]

    self.aggregation: str = self._feature_attributes["feature_options_json"]["aggregation"]
    self.attr_with_objs_to_aggregate: str = self._feature_attributes["feature_options_json"]["attr_with_objs_to_aggregate"]
    self.feature_to_aggregate: str = self._feature_attributes["feature_options_json"]["feature_to_aggregate"]
    self.attr_to_scale: str | None = self._feature_attributes["feature_options_json"].get("attr_to_scale", None)
    self.final_scale: float | None = self._feature_attributes["feature_options_json"].get("final_scale", None)
    self.resample: dict[str, str] | None = self._feature_attributes["feature_options_json"].get("resample", None)
    self.shift: dict[str, str | int] | None = self._feature_attributes["feature_options_json"].get("shift", None)

    # getting all the attributes that will be needed from this object
    this_obj_attrs = [self.attr_with_objs_to_aggregate]
    if self.attr_to_scale:
        this_obj_attrs.append(self.attr_to_scale)
    self._add_requirement(RequiredObjectAttributes({self.object: this_obj_attrs}))
    self._get_required_data()

    # getting all the objects that will be aggregated
    self.objects_to_aggregate = self._get_requirement_data("RequiredObjectAttributes")[self.object][self.attr_with_objs_to_aggregate]

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. Is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • Series | DataFrame | None

    Result of the calculation if the method "calculate" was called. None otherwise.

calculate(period, save_into=None, cached_data=None, **kwargs)

Method that will calculate the feature.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:

      • "all": The feature will be saved in performance_db and bazefield.
      • "performance_db": The feature will be saved only in performance_db.
      • None: The feature will not be saved.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient. By default None

  • **kwargs

    (dict, default: {} ) –

    Additional arguments that will be passed to the "save" method.

Source code in echo_energycalc/feature_calc_common.py
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: DataFrame | None = None,  # noqa: ARG002
    **kwargs,
) -> Series:
    """
    Method that will calculate the feature.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    cached_data : DataFrame | None, optional
        DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
        By default None
    **kwargs : dict, optional
        Additional arguments that will be passed to the "save" method.
    """
    # defining the period that will be used to get the features from the objects to aggregate
    this_period = period.copy()
    # adjusting for resample
    if self.resample:
        timedelta_rule = to_offset(self.resample["rule"])
        this_period.start = Timestamp(this_period.start - timedelta_rule).floor(timedelta_rule).to_pydatetime()
        this_period.end = Timestamp(this_period.end + timedelta_rule).ceil(timedelta_rule).to_pydatetime()
    # adjusting for shift
    if self.shift:
        shift_rule = to_offset(self.shift["freq"])
        this_period.start = (this_period.start - shift_rule * self.shift["amount"]).to_pydatetime()
        this_period.end = (this_period.end + shift_rule * self.shift["amount"]).to_pydatetime()

    # fetching the last features and attributes required
    for required_object in self.objects_to_aggregate:
        try:
            self._add_requirement(
                RequiredFeatures(
                    {required_object: [self.feature_to_aggregate]},
                ),
            )
            self._get_required_data(this_period)
        except Exception as e:
            raise ValueError(
                f"Could not get feature {self.feature_to_aggregate} from {required_object} while calculating {self.feature} feature from the object {self.object}.",
            ) from e

        if self.attr_to_scale:
            try:
                self._add_requirement(
                    RequiredObjectAttributes(
                        {required_object: [self.attr_to_scale]},
                    ),
                )
                self._get_required_data()
            except Exception as ex:
                raise ValueError(
                    f"Could not get attribute {self.attr_to_scale} from {required_object} while calculating {self.feature} feature from the object {self.object}.",
                ) from ex

        df = self._get_requirement_data("RequiredFeatures").droplevel(1, axis=1).copy()

    # resampling
    if self.resample:
        try:
            df = df.resample(
                self.resample["rule"],
                closed=self.resample["closed"],
                label=self.resample["label"],
                origin="start_day",
            ).agg(self.resample["method"])
        except Exception as e:
            raise ValueError(
                f"Could not resample while calculating {self.feature} feature from the object {self.object}. Resample arguments were {self.resample}",
            ) from e

    # shifting
    if self.shift:
        try:
            df = df.shift(self.shift["amount"], freq=self.shift["freq"])
        except Exception as e:
            raise ValueError(
                f"Could not shift while calculating {self.feature} feature from the object {self.object}. Shift arguments were {self.shift}",
            ) from e

    # calculating aggregation
    match self.aggregation:
        case "avg":
            df[self.feature] = df.mean(axis=1, skipna=True)
        case "sum":
            df[self.feature] = df.sum(axis=1, skipna=True)
        case "count":
            df[self.feature] = df.count(axis=1)  # DataFrame.count has no skipna argument; NaNs are always excluded
        case "max":
            df[self.feature] = df.max(axis=1, skipna=True)
        case "min":
            df[self.feature] = df.min(axis=1, skipna=True)
        case "median":
            df[self.feature] = df.median(axis=1, skipna=True)
        case "std":
            df[self.feature] = df.std(axis=1, skipna=True)
        case "var":
            df[self.feature] = df.var(axis=1, skipna=True)
        case "feature_eval_expression":
            eval_expression = None
            if "feature_eval_expression" in self._feature_attributes:
                eval_expression = self._feature_attributes["feature_eval_expression"]

            if eval_expression is None:
                raise ValueError(
                    "Feature option 'feature_eval_expression' is not present in feature attributes. This is needed when aggregation is 'feature_eval_expression'.",
                )

            # replacing object attributes in expression
            eval_expression = replace_object_attributes_in_expression(eval_expression, self.object, self._perfdb)

            # calculating by evaluating the expression
            loc = {"df": df}
            exec(eval_expression, globals(), loc)  # pylint: disable=exec-used  # noqa: S102
            df = loc["df"]

    # scaling the features if attr_to_scale exists
    if self.attr_to_scale:
        this_obj_attr_to_scale: float = self._get_requirement_data("RequiredObjectAttributes")[self.object][self.attr_to_scale]

        other_objs_attr: float = sum(
            self._get_requirement_data("RequiredObjectAttributes")[x][self.attr_to_scale]
            for x in self.objects_to_aggregate
            if x in df.columns.to_list()
        )

        log_str = f"Scaling the results of the aggregation of the feature {self.feature} of {self.object} "
        log_str += f"based on {self.attr_to_scale} of "
        log_str += f"{[x for x in self.objects_to_aggregate if x in df.columns.to_list()]}. "
        log_str += f"Base: {other_objs_attr}, Target: {this_obj_attr_to_scale}"
        logger.debug(log_str)
        df[self.feature] = this_obj_attr_to_scale * (df[self.feature] / other_objs_attr)

    # removing period outside of the desired period
    df = df.loc[(df.index >= period.start) & (df.index <= period.end)]

    # final scaling
    if self.final_scale:
        df[self.feature] = self.final_scale * df[self.feature]

    # adding calculated feature to class result attribute
    self._result = df[self.feature].copy()

    # saving results
    self.save(save_into=save_into, **kwargs)

    return df[self.feature]

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:

      • "all": The feature will be saved in performance_db and bazefield.
      • "performance_db": The feature will be saved only in performance_db.
      • None: The feature will not be saved.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
        )

    if save_into is None:
        return

    if isinstance(save_into, str):
        if save_into not in ["performance_db", "all"]:
            raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
        upload_to_bazefield = save_into == "all"
    elif save_into is None:
        upload_to_bazefield = False
    else:
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")

    # converting result series to DataFrame if needed
    if isinstance(self.result, Series):
        result_df = self.result.to_frame()
    elif isinstance(self.result, DataFrame):
        result_df = self.result.droplevel(0, axis=1)
    else:
        raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")

    # adjusting DataFrame to be inserted in the database
    # making the columns a Multindex with levels object_name and feature_name
    result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])

    self._perfdb.features.values.series.insert(
        df=result_df,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )
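For reference, the column transformation applied just before insertion can be reproduced with plain pandas (hypothetical object and feature names):

```python
import pandas as pd
from pandas import MultiIndex

result_df = pd.DataFrame({"my_feature": [1.0, 2.0]},
                         index=pd.date_range("2024-01-01", periods=2, freq="h"))

# Same step as in save(): columns become (object_name, feature_name) pairs
# so the row values can be matched to both the object and the feature.
result_df.columns = MultiIndex.from_product(
    [["my_object"], result_df.columns], names=["object_name", "feature_name"]
)
```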