SPE Aggregation

Overview

The FeatureCalcSpeAggregation class is a subclass of FeatureCalculator that calculates the aggregated value of a feature for a specific SPE. It is useful when you want to take the values of a given feature from all child objects of a given type belonging to that SPE and aggregate them into a single value for the parent object.

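As a minimal usage sketch, assuming the SPE is called "PARK_A" and that DateTimeRange comes from the datetimerange package (both are assumptions, adapt them to the actual project), the calculator is created for an SPE and a feature and then asked to calculate over a period:

    # minimal usage sketch -- the object name and the DateTimeRange import are assumptions
    from datetimerange import DateTimeRange

    # import path taken from the "Source code" note below
    from echo_energycalc.feature_calc_common import FeatureCalcSpeAggregation

    calc = FeatureCalcSpeAggregation(
        object_name="PARK_A",  # must be an SPE
        feature="lost_power_curtailment_park_turbine_sum",  # must exist in performance_db
    )
    result = calc.calculate(
        period=DateTimeRange("2024-01-01T00:00:00", "2024-02-01T00:00:00"),
        save_into=None,  # just return the Series, do not persist anything
    )
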
Calculation Logic

The calculation logic is described in the constructor of the class, shown below in the Class Definition section.

Database Requirements

  • The feature attribute server_calc_type must be set to spe_aggregation.
  • The feature attribute feature_options_json must be set with the following keys:

    • aggregation: A string that defines the aggregation method to be used. The following methods are available: "avg", "sum", "max", "min", "median", "std", "var", "count" or feature_eval_expression.
    • child_object_type: A string that defines the type of the child object that we want to aggregate.
    • child_feature: A string that defines the feature that we want to aggregate.

    The following example is used for feature lost_power_curtailment_park_turbine_sum:

    {
        "aggregation": "sum",
        "child_feature": "lost_power_curtailment_park",
        "child_object_type": "wind_turbine"
    }
    
  • If the aggregation key is set to feature_eval_expression, the feature attribute feature_eval_expression must also be present:

    • feature_eval_expression: A string containing the Python code that calculates the feature value. This code has access to a DataFrame (df) holding the wanted feature for all child objects and must create a new column in df, named after the feature being calculated, with the aggregated values. It is useful when the aggregation logic is more complex than the methods available for the aggregation key. A minimal sketch is shown below.

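For illustration only, a hypothetical feature_eval_expression for a feature named my_park_feature could look like the snippet below (the feature name and the "at least half of the children reported data" rule are assumptions, not taken from this documentation). The code is stored as a string in the feature_eval_expression attribute and is executed with df in scope:

    # sum the child values, but keep the result only for timestamps where
    # at least half of the child objects reported a value
    valid = df.notna().sum(axis=1) >= (df.shape[1] / 2)
    df["my_park_feature"] = df.sum(axis=1, skipna=True).where(valid)
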
Class Definition

FeatureCalcSpeAggregation(object_name, feature)

FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.

For this to work the desired feature must have the feature_options_json attribute in performance_db, with the following keys:

  • aggregation: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or feature_eval_expression.
  • child_object_type: str, object type of the child objects.
  • child_feature: str, name of the feature of child objects that will be aggregated.

If the aggregation method is feature_eval_expression, the feature_eval_expression attribute must be present in performance_db as well.

The method will look for all objects that have the spe_name column in the objects table equal to the wanted SPE object and the object_type column equal to the child_object_type attribute.

Please make sure that the child objects have the desired feature to be aggregated.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must be an SPE.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_common.py
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    FeatureCalculator class for features that rely on aggregating features of child objects of a given SPE.

    For this to work the desired feature must have the `feature_options_json` attribute in performance_db, with the following keys:

    - `aggregation`: str, aggregation method to be used. Can be "avg", "sum", "max", "min", "median", "std", "var", "count" or `feature_eval_expression`.
    - `child_object_type`: str, object type of the child objects.
    - `child_feature`: str, name of the feature of child objects that will be aggregated.

    If the aggregation method is `feature_eval_expression`, the `feature_eval_expression` attribute must be present in performance_db as well.

    The method will look for all objects that have the `spe_name` column in the `objects` table equal to the wanted SPE object and the `object_type` column equal to the `child_object_type` attribute.

    Please make sure that the child objects have the desired feature to be aggregated.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must be an SPE.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)

    # requirements for the feature calculator
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
    self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_eval_expression"], optional=True))

    # getting all required attributes
    self._get_required_data()

    self._feature_attributes = self._get_requirement_data("RequiredFeatureAttributes")[self.feature]
    # validating feature options
    self._validate_feature_options()

    # getting child objects
    spe_objects = self._perfdb.objects.instances.get(
        spe_names=[self.object],
        object_types=[self._feature_attributes["feature_options_json"]["child_object_type"]],
    )
    spe_objects = list(spe_objects.keys())

    # defining required features
    self._add_requirement(
        RequiredFeatures(
            {child_object: [self._feature_attributes["feature_options_json"]["child_feature"]] for child_object in spe_objects},
        ),
    )

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. It is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. It is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • Series | DataFrame | None

    Result of the calculation if the method "calculate" was called. None otherwise.

calculate(period, save_into=None, cached_data=None, **kwargs)

Method that will calculate the Child Aggregation feature.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:
    - "all": The feature will be saved in performance_db and bazefield.
    - "performance_db": the feature will be saved only in performance_db.
    - None: The feature will not be saved.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient. By default None

  • **kwargs

    (dict, default: {} ) –

    Additional arguments that will be passed to the "save" method.

Returns:

  • Series

    Pandas Series with the calculated feature.

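Continuing the usage sketch from the Overview (calc, the period values and previously_queried_df are assumptions used only for illustration), a call that persists the result in performance_db and reuses already-fetched data could look like this:

    # hedged example -- cached_data lets chained calculations reuse a DataFrame of
    # features that were already queried, instead of hitting performance_db again
    series = calc.calculate(
        period=DateTimeRange("2024-01-01T00:00:00", "2024-02-01T00:00:00"),
        save_into="performance_db",         # persist only in performance_db
        cached_data=previously_queried_df,  # or None if nothing was pre-fetched
    )
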
Source code in echo_energycalc/feature_calc_common.py
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: DataFrame | None = None,
    **kwargs,
) -> Series:
    """
    Method that will calculate the Child Aggregation feature.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    cached_data : DataFrame | None, optional
        DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
        By default None
    **kwargs : dict, optional
        Additional arguments that will be passed to the "save" method.

    Returns
    -------
    Series
        Pandas Series with the calculated feature.
    """
    # getting required features
    self._get_required_data(period=period, cached_data=cached_data)

    # saving the needed features into "df", since eval expressions assume a DataFrame called "df" holds all the data
    # level 1 of the columns is dropped because all features are the same; the remaining column names are the child object names
    df = self._get_requirement_data("RequiredFeatures").droplevel(1, axis=1).copy()

    # calculating
    match self._feature_attributes["feature_options_json"]["aggregation"]:
        case "avg":
            df[self.feature] = df.mean(axis=1, skipna=True)
        case "sum":
            df[self.feature] = df.sum(axis=1, skipna=True)
        case "count":
            # DataFrame.count takes no skipna argument; NaN values are always excluded
            df[self.feature] = df.count(axis=1)
        case "max":
            df[self.feature] = df.max(axis=1, skipna=True)
        case "min":
            df[self.feature] = df.min(axis=1, skipna=True)
        case "median":
            df[self.feature] = df.median(axis=1, skipna=True)
        case "std":
            df[self.feature] = df.std(axis=1, skipna=True)
        case "var":
            df[self.feature] = df.var(axis=1, skipna=True)
        case "feature_eval_expression":
            eval_expression = None
            if "feature_eval_expression" in self._feature_attributes:
                eval_expression = self._feature_attributes["feature_eval_expression"]

            if eval_expression is None:
                raise ValueError(
                    "Feature option 'feature_eval_expression' is not present in feature attributes. This is needed when aggregation is 'feature_eval_expression'.",
                )

            # replacing object attributes in expression
            eval_expression = replace_object_attributes_in_expression(eval_expression, self.object, self._perfdb)

            # calculating by evaluating the expression
            loc = {"df": df}
            exec(eval_expression, globals(), loc)  # pylint: disable=exec-used  # noqa: S102
            df = loc["df"]

    # adding calculated feature to class result attribute
    self._result = df[self.feature].copy()

    # saving results
    self.save(save_into=save_into, **kwargs)

    return df[self.feature]

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:
    - "all": The feature will be saved in performance_db and bazefield.
    - "performance_db": the feature will be saved only in performance_db.
    - None: The feature will not be saved.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

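Note that calculate already calls save internally, forwarding its own save_into argument, so a direct call is only needed in special cases and only works after calculate has populated result. Continuing the earlier sketch:

    # save() is normally triggered through calculate(save_into=...);
    # a direct call requires a previous successful calculate()
    calc.save(save_into="all")  # writes to performance_db and uploads to bazefield
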
Source code in echo_energycalc/feature_calc_core.py
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
        )

    if save_into is None:
        return

    if isinstance(save_into, str):
        if save_into not in ["performance_db", "all"]:
            raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
        upload_to_bazefield = save_into == "all"
    elif save_into is None:
        upload_to_bazefield = False
    else:
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")

    # converting result series to DataFrame if needed
    if isinstance(self.result, Series):
        result_df = self.result.to_frame()
    elif isinstance(self.result, DataFrame):
        result_df = self.result.droplevel(0, axis=1)
    else:
        raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")

    # adjusting DataFrame to be inserted in the database
    # making the columns a Multindex with levels object_name and feature_name
    result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])

    self._perfdb.features.values.series.insert(
        df=result_df,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )