Feature Calculator

Overview

The FeatureCalculator class is an abstract base class that defines the interface for all feature calculators. Each subclass computes a specific type of feature (identified by _name) for a given (object, feature) pair.


Lifecycle

Text Only
__init__(object_name, feature)
  │  1. Validates that object and feature exist in performance_db
  │  2. Validates that feature's server_calc_type matches _name
  │  3. Calls _add_requirement(...) for each needed data source
  │  4. Calls _fetch_requirements() for static requirements
  │     (object attributes, calc models, feature attributes)
  └─ Ready to calculate

calculate(period, save_into, cached_data)   ← called by CalculationHandler
  │  1. Calls _compute(period, cached_data)
  │  2. Stores result in self._result
  │  3. Calls save(save_into)
  └─ Returns pl.DataFrame

_compute(period, cached_data)   ← implemented in subclasses
  │  1. _fetch_requirements(period, ...)  ← for time-series data
  │  2. _requirement_data("RequiredFeatures")  → pl.DataFrame
  │  3. Calculation logic
  └─ Returns pl.DataFrame({"timestamp": ..., "feature_name": ...})
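
The split between calculate and _compute in the lifecycle above is a template-method arrangement. The sketch below illustrates the call order only. SketchCalculator, ConstantCalculator and the saved_into attribute are hypothetical stand-ins that do not touch performance_db, and plain dicts replace Polars DataFrames.

```python
from abc import ABC, abstractmethod


class SketchCalculator(ABC):
    """Simplified stand-in for FeatureCalculator (no database access)."""

    def __init__(self) -> None:
        self._result = None
        self.saved_into = None  # hypothetical, only used to observe save()

    def calculate(self, period, save_into=None, cached_data=None):
        # 1. delegate the actual work to the subclass
        result = self._compute(period, cached_data=cached_data)
        # 2. keep the result on the instance
        self._result = result
        # 3. persist it (does nothing when save_into is None)
        self.save(save_into=save_into)
        return result

    @abstractmethod
    def _compute(self, period, cached_data=None):
        """Implemented by each concrete calculator."""

    def save(self, save_into=None) -> None:
        if save_into is not None:
            self.saved_into = save_into


class ConstantCalculator(SketchCalculator):
    def _compute(self, period, cached_data=None):
        start, end = period
        return {"timestamp": [start, end], "example_feature": [1.0, 1.0]}


calc = ConstantCalculator()
out = calc.calculate(
    period=("2024-01-01 00:00", "2024-01-01 00:10"),
    save_into="performance_db",
)
```

Because the subclass only supplies _compute, storing and saving behave the same way for every calculator.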

Subclass Implementation

The minimum requirements for a concrete subclass are:

  • _name class attribute: must match the server_calc_type attribute in performance_db.
  • __init__: calls super().__init__(object_name, feature), registers requirements, fetches static ones.
  • _compute: implements the calculation logic and returns a Polars DataFrame.

Complete example: FeatureCalcExample

Python
from __future__ import annotations

import polars as pl
from echo_datetimerange import DateTimeRange

from echo_energycalc.calculation_requirement_object_attributes import RequiredObjectAttributes
from echo_energycalc.feature_calc_core import FeatureCalculator


class FeatureCalcExample(FeatureCalculator):
    """Fills every 10-minute slot with the object's nominal power (example only)."""

    _name = "example_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        """
        Parameters
        ----------
        object_name : str
            Name of the object (must exist in performance_db).
        feature : str
            Name of the feature (must have server_calc_type = "example_calc").
        """
        super().__init__(object_name, feature)

        # Register a static requirement: object attribute "nominal_power"
        self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))

        # Fetch static requirements immediately (no period needed for attributes)
        self._fetch_requirements()

        # Read the fetched data and store it as an instance variable
        self._nominal_power = float(
            self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"]
        )

    def _compute(
        self,
        period: DateTimeRange,
        cached_data: pl.DataFrame | None = None,  # noqa: ARG002
    ) -> pl.DataFrame:
        """Return nominal_power as a constant for every 10-minute slot.

        Parameters
        ----------
        period : DateTimeRange
            Calculation period.
        cached_data : pl.DataFrame | None, optional
            Passed by CalculationHandler — not used in this example.

        Returns
        -------
        pl.DataFrame
            DataFrame with columns ``["timestamp", self.feature]``.
        """
        result = self._create_empty_result(period=period, result_type="Series", freq="10min")
        result = result.with_columns(pl.lit(self._nominal_power).alias(self.feature))
        return result

Note

If the calculation depends on time-series features (e.g., WindSpeed_10min.AVG), add those requirements in __init__ but do not call _fetch_requirements() for them there. Instead, call _fetch_requirements(period=period, ...) inside _compute, because the period is only known at calculation time.


Working with requirements that depend on a period

When RequiredFeatures or RequiredAlarms are needed:

Python
def __init__(self, object_name: str, feature: str) -> None:
    super().__init__(object_name, feature)

    # Static requirements — fetch immediately
    self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))
    self._fetch_requirements()
    self._nominal_power = float(
        self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"]
    )

    # Time-series requirements — registered now, fetched later in _compute
    self._add_requirement(
        RequiredFeatures({self.object: ["ActivePower_10min.AVG", "WindSpeed_10min.AVG"]})
    )


def _compute(self, period: DateTimeRange, cached_data: pl.DataFrame | None = None) -> pl.DataFrame:
    # Fetch time-series data for this specific period
    self._fetch_requirements(period=period, reindex="10min", cached_data=cached_data)

    # Get all fetched features as a single Polars DataFrame
    # Columns: "timestamp" + "object@feature" (multi-object) or bare feature names (single object)
    df = self._adjust_features(self.object)

    result = self._create_empty_result(period=period, result_type="Series")
    # ... do your calculation ...
    return result

Key helper methods

_add_requirement(requirement)

Registers a CalculationRequirement instance. Handles deduplication for RequiredFeatures and RequiredVibrationData automatically — the same feature for the same object is never queried twice.
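
As a rough illustration of the deduplication described above, the sketch below merges feature requests per object so that no (object, feature) pair appears twice. merge_feature_requirements is a hypothetical helper written for this example; the real _add_requirement may store and merge requirements differently.

```python
def merge_feature_requirements(registered, new):
    """Merge ``new`` into ``registered`` without repeating any (object, feature) pair."""
    for obj, features in new.items():
        known = registered.setdefault(obj, [])
        for feature in features:
            if feature not in known:
                known.append(feature)
    return registered


registered = {}
merge_feature_requirements(registered, {"WT01": ["WindSpeed_10min.AVG"]})
# The second registration repeats WindSpeed_10min.AVG; only the new feature is added.
merge_feature_requirements(
    registered,
    {"WT01": ["WindSpeed_10min.AVG", "ActivePower_10min.AVG"]},
)
```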

_fetch_requirements(period, reindex, round_timestamps, only_missing, cached_data)

Calls check() then get_data() on all registered requirements. Pass:

  • period: required for any RequiredFeatures or RequiredAlarms.
  • reindex="10min": aligns all feature timestamps to a 10-minute grid.
  • only_missing=True (default): skips requirements already fetched.
  • cached_data: a Polars DataFrame of pre-fetched features from the CalculationHandler in-memory cache, avoiding redundant DB queries when multiple calculators share inputs.
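
The only_missing behaviour can be pictured with the sketch below, which skips any requirement that already holds data. FakeRequirement and fetch_requirements are hypothetical names used for illustration; the real method also runs check() and handles period, reindex and cached_data.

```python
class FakeRequirement:
    """Hypothetical requirement that counts how often its data is loaded."""

    def __init__(self, name):
        self.name = name
        self.data = None
        self.fetch_count = 0

    def get_data(self):
        self.fetch_count += 1
        self.data = f"data for {self.name}"


def fetch_requirements(requirements, only_missing=True):
    for req in requirements:
        if only_missing and req.data is not None:
            continue  # already fetched, nothing to do
        req.get_data()


attrs = FakeRequirement("attributes")
fetch_requirements([attrs])  # e.g. static data fetched in __init__

features = FakeRequirement("features")
fetch_requirements([attrs, features])  # e.g. in _compute: attrs is skipped
```

This is why calling _fetch_requirements again inside _compute does not reload the static attributes fetched in __init__.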

_requirement_data(requirement_type)

Returns the merged data from all requirements of a given type:

requirement_type                  Return type
--------------------------------  --------------------------------------------------------
"RequiredFeatures"                pl.DataFrame with columns "timestamp" + "object@feature"
"RequiredObjectAttributes"        dict[object_name, dict[attr, value]]
"RequiredFeatureAttributes"       dict[feature_name, dict[attr, value]]
"RequiredCalcModels"              dict[object_name, dict[model_name, {model, ...}]]
"RequiredAlarms"                  pl.DataFrame of alarm event rows
"RequiredVibrationData"           pl.DataFrame of raw vibration records
"RequiredVibrationFrequencies"    pl.DataFrame of frequency definitions

_adjust_features(object_name, rename_dict, adjust_wind_speed_std, valid_curtailment_states)

Extracts features for one or more objects from _requirement_data("RequiredFeatures") and applies standard quality filters:

  • WindSpeed_10min.AVG ≤ 0 → null
  • ActivePower_10min.AVG ≤ 0 → null
  • ActivePower_10min.AVG when IEC-OperationState_10min.REP ≤ 1 → null
  • ActivePower_10min.AVG when CurtailmentState_10min.REP not in valid states → null
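
The four filters listed above can be expressed row by row as in the sketch below. It works on plain dicts instead of a Polars DataFrame. apply_quality_filters, the default valid_curtailment_states of (0,) and the handling of missing state values are assumptions made for the example, not the library's implementation.

```python
def apply_quality_filters(row, valid_curtailment_states=(0,)):
    """Return a copy of ``row`` with invalid values replaced by None."""
    out = dict(row)
    wind = out.get("WindSpeed_10min.AVG")
    power = out.get("ActivePower_10min.AVG")
    state = out.get("IEC-OperationState_10min.REP")
    curtailment = out.get("CurtailmentState_10min.REP")

    # Wind speed at or below zero is treated as invalid.
    if wind is not None and wind <= 0:
        out["WindSpeed_10min.AVG"] = None

    # Active power is nulled by any of the three power-related rules.
    if power is not None:
        invalid = (
            power <= 0
            or (state is not None and state <= 1)
            or (curtailment is not None and curtailment not in valid_curtailment_states)
        )
        if invalid:
            out["ActivePower_10min.AVG"] = None
    return out


good = {
    "WindSpeed_10min.AVG": 8.0,
    "ActivePower_10min.AVG": 1500.0,
    "IEC-OperationState_10min.REP": 2,
    "CurtailmentState_10min.REP": 0,
}
stopped = apply_quality_filters({**good, "IEC-OperationState_10min.REP": 1})
curtailed = apply_quality_filters({**good, "CurtailmentState_10min.REP": 3})
calm = apply_quality_filters({**good, "WindSpeed_10min.AVG": 0.0})
```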

_create_empty_result(period, result_type, freq, columns)

Creates a Polars DataFrame with null-filled columns over the given period:

  • result_type="Series": single column named self.feature
  • result_type="DataFrame": multiple columns specified by columns
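
To make the shape of the returned structure concrete, the sketch below builds a null-filled result as a plain dict of lists. create_empty_result is a hypothetical stand-in; treating the period end as inclusive and using a 10-minute default step are assumptions.

```python
from datetime import datetime, timedelta


def create_empty_result(start, end, feature, result_type="Series",
                        columns=None, step=timedelta(minutes=10)):
    """Build {"timestamp": [...], column: [None, ...]} over [start, end]."""
    if result_type == "Series":
        names = [feature]  # single column named after the feature
    elif result_type == "DataFrame":
        names = list(columns or [])  # caller decides the columns
    else:
        raise ValueError(f"Unknown result_type: {result_type!r}")

    timestamps = []
    current = start
    while current <= end:  # assumption: the end of the period is included
        timestamps.append(current)
        current += step

    result = {"timestamp": timestamps}
    for name in names:
        result[name] = [None] * len(timestamps)
    return result


start = datetime(2024, 1, 1, 0, 0)
end = datetime(2024, 1, 1, 0, 30)
series = create_empty_result(start, end, feature="example_feature")
frame = create_empty_result(start, end, feature="example_feature",
                            result_type="DataFrame", columns=["a", "b"])
```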

_get_night_mask(timestamps, latitude, longitude, utc_offset_hours)

Returns a boolean Polars Series where True means the sun is below the horizon at that timestamp. Useful for solar calculators to zero out nighttime values.
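
For intuition about what such a mask represents, the sketch below estimates the solar elevation with a common textbook approximation (cosine declination formula, no equation-of-time or refraction correction) and marks timestamps with negative elevation as night. solar_elevation_deg and night_mask are hypothetical helpers; the library's own algorithm and its treatment of utc_offset_hours may differ.

```python
import math
from datetime import datetime, timedelta


def solar_elevation_deg(local_time, latitude, longitude, utc_offset_hours):
    """Approximate solar elevation in degrees for a naive local timestamp."""
    utc = local_time - timedelta(hours=utc_offset_hours)
    day_of_year = utc.timetuple().tm_yday
    # Approximate solar declination for the day of the year.
    declination = math.radians(-23.44) * math.cos(2 * math.pi * (day_of_year + 10) / 365)
    # Approximate local solar time from UTC and longitude (15 degrees per hour).
    solar_hours = utc.hour + utc.minute / 60 + utc.second / 3600 + longitude / 15
    hour_angle = math.radians(15 * (solar_hours - 12))
    lat = math.radians(latitude)
    sin_elev = (
        math.sin(lat) * math.sin(declination)
        + math.cos(lat) * math.cos(declination) * math.cos(hour_angle)
    )
    # Clamp to guard against floating-point values just outside [-1, 1].
    return math.degrees(math.asin(max(-1.0, min(1.0, sin_elev))))


def night_mask(timestamps, latitude, longitude, utc_offset_hours):
    """True where the sun is below the horizon."""
    return [
        solar_elevation_deg(ts, latitude, longitude, utc_offset_hours) < 0
        for ts in timestamps
    ]


mask = night_mask(
    [datetime(2024, 6, 21, 0, 0), datetime(2024, 6, 21, 12, 0)],
    latitude=0.0,
    longitude=0.0,
    utc_offset_hours=0,
)
```

A solar calculator could then set its output to zero wherever the mask is True.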


Database Requirements

  • Feature attribute server_calc_type must match the calculator's _name.
  • The CalculationHandler uses FEATURE_CALC_CLASS_MAPPING (auto-populated from all registered subclasses) to find the right class at runtime.
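
A minimal sketch of how a name-to-class mapping can be populated automatically with __init_subclass__ is shown below. BaseCalculator, ExampleCalc, AbstractHelper and calculator_for are hypothetical; how FEATURE_CALC_CLASS_MAPPING is actually built from the registry is not shown in this page.

```python
class BaseCalculator:
    _registry = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register only classes that define _name themselves.
        name = cls.__dict__.get("_name")
        if name is not None:
            BaseCalculator._registry[name] = cls


class ExampleCalc(BaseCalculator):
    _name = "example_calc"


class AbstractHelper(BaseCalculator):
    """Intermediate class without _name: not registered."""


def calculator_for(server_calc_type):
    """Look up the class whose _name equals the feature's server_calc_type."""
    try:
        return BaseCalculator._registry[server_calc_type]
    except KeyError:
        raise ValueError(f"No calculator registered for '{server_calc_type}'") from None


calc_class = calculator_for("example_calc")
```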

Class Definition

FeatureCalculator(object_name, feature)

Abstract base class for all feature calculators.

Each concrete subclass computes one type of server-calculated feature (identified by the _name class attribute) for a given (object, feature) pair stored in performance_db.

Subclass contract
  1. Define _name as a class attribute matching the server_calc_type attribute in performance_db. This also auto-registers the class in _registry via __init_subclass__.
  2. Override __init__: call super().__init__(object_name, feature), register requirements with _add_requirement, and fetch static (period-independent) requirements with _fetch_requirements.
  3. Implement _compute: fetch time-series requirements for the given period, run the calculation, and return a Polars DataFrame with columns ["timestamp", ...].

Do not override calculate. It calls _compute, stores the output in the result property, and delegates to save.

Data conventions
  • Multi-object intermediate DataFrames use flat "object@feature" column names (e.g. "WT01@WindSpeed_10min.AVG").
  • Single-object result DataFrames use bare feature names as columns plus a "timestamp" column.
  • Timestamps are pl.Datetime with millisecond precision.
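
The naming convention can be illustrated with a few string helpers. to_flat_name, split_flat_name and single_object_columns are hypothetical names for this sketch, and it assumes object names never contain "@".

```python
def to_flat_name(object_name, feature):
    """Build the multi-object column name, e.g. "WT01@WindSpeed_10min.AVG"."""
    return f"{object_name}@{feature}"


def split_flat_name(column):
    """Split "object@feature" back into its two parts."""
    object_name, feature = column.split("@", 1)
    return object_name, feature


def single_object_columns(columns, object_name):
    """Map flat names of one object to bare feature names."""
    prefix = f"{object_name}@"
    return {c: c[len(prefix):] for c in columns if c.startswith(prefix)}


flat = to_flat_name("WT01", "WindSpeed_10min.AVG")
renames = single_object_columns(
    ["timestamp", "WT01@WindSpeed_10min.AVG", "WT02@WindSpeed_10min.AVG"],
    "WT01",
)
```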

Examples:

Minimal subclass:

Python
class FeatureCalcExample(FeatureCalculator):
    _name = "example_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        super().__init__(object_name, feature)
        self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))
        self._fetch_requirements()
        self._nominal_power = float(self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"])

    def _compute(self, period: DateTimeRange, cached_data=None) -> pl.DataFrame:
        result = self._create_empty_result(period=period, result_type="Series")
        result = result.with_columns(pl.lit(self._nominal_power).alias(self.feature))
        return result

The constructor should be overridden in child classes of FeatureCalculator to define the name, requirements and logic of the feature calculator.

Every child class must define the name of the feature calculator as the class attribute "_name". This name must be equal to the "server_calc_type" attribute of the feature in performance_db and is used to check that the feature calculator is the correct one for the feature. Defining _name automatically registers the class in FeatureCalculator._registry via __init_subclass__.

Below is a simple example of how to define a child class of FeatureCalculator:

Python
class FeatureCalculatorExample(FeatureCalculator):
    # name of the feature calculator
    _name = "name"

    def __init__(self, object_name: str, feature: str) -> None:
        # initialize parent class
        super().__init__(object_name, feature)

        # requirements for the feature calculator
        self._add_requirement(...)

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must exist in performance_db.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_core.py
Python
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Constructor of the FeatureCalculator class.

    This should be overloaded in child classes of FeatureCalculator to define the name, requirements and logic of the feature calculator.

    It's extremely important to define the name of the feature calculator in the child class. This name must be equal to the "server_calc_type" attribute of the feature in performance_db and will be used to check if the feature calculator is the correct one for the feature. Set the name of the feature calculator in the child class as a class attribute "_name". Defining ``_name`` automatically registers the class in ``FeatureCalculator._registry`` via ``__init_subclass__``.

    Below there is a simple example on how to define a child class of FeatureCalculator:

    ```python
    class FeatureCalculatorExample(FeatureCalculator):
        # name of the feature calculator
        _name = "name"

        def __init__(self, object_name: str, feature: str) -> None:
            # initialize parent class
            super().__init__(object_name, feature)

            # requirements for the feature calculator
            self._add_requirement(...)
    ```

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # checking arguments
    if not isinstance(object_name, str):
        raise TypeError(f"object_name must be a string, not {type(object_name)}")
    if not isinstance(feature, str):
        raise TypeError(f"feature must be a string, not {type(feature)}")

    # check if self._name is defined in child class
    if not hasattr(self, "_name"):
        raise ValueError(f"FeatureCalculator name is not defined in {self.__class__.__name__}.")

    # empty set of requirements
    self._requirements = None

    # creating structure that will be used to connect to performance_db
    self._perfdb = PerfDB(application_name=self.__class__.__name__)

    # check if object exists in performance_db
    obj_def = self._perfdb.objects.instances.get(object_names=[object_name], output_type="dict")
    if len(obj_def) == 0:
        raise ValueError(f"Object {object_name} does not exist in performance_db.")
    self._object = object_name
    obj_model = obj_def[object_name]["object_model_name"]

    # check if feature exists in performance_db
    obj_features = self._perfdb.features.definitions.get(
        object_names=[object_name],
        feature_names=[feature],
        get_attributes=True,
        attribute_names=["server_calc_type"],
        output_type="dict",
    )
    if len(obj_features) != 1 or len(obj_features[obj_model]) != 1:
        raise ValueError(f"Feature {feature} does not exist in performance_db for object {object_name}.")
    self._feature = feature

    feature_def = obj_features[obj_model][feature]

    # checking if this calculation name is equal to the "server_calc_type" attribute of the feature
    if "server_calc_type" not in feature_def:
        raise ValueError(
            f"Feature {feature} does not have the attribute 'server_calc_type' in performance_db for object {object_name}.",
        )
    if feature_def["server_calc_type"] != self.name:
        raise ValueError(
            f"Feature '{feature}' of object '{object_name}' has the attribute 'server_calc_type' equal to '{feature_def['server_calc_type']}' in performance_db, which is different from the name of the class {self.__class__.__name__}('{self.name}').",
        )

    # results of the calculation. It will be filled by the method "calculate".
    self._result: pl.DataFrame | None = None

feature property

Feature that is calculated. It is set in the constructor and cannot be changed afterwards.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator, defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. It is set in the constructor and cannot be changed afterwards.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

Requirements of the feature calculator, grouped by requirement class. They are defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
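
The grouping described above, keyed by the class name of each requirement, can be sketched as follows. The two classes here are empty stand-ins defined only for the example, and group_by_type is a hypothetical helper rather than the property's actual implementation.

```python
from collections import defaultdict


class RequiredFeatures:
    """Empty stand-in for the real requirement class."""


class RequiredObjectAttributes:
    """Empty stand-in for the real requirement class."""


def group_by_type(requirements):
    """Return {class name: [requirements of that class]}."""
    grouped = defaultdict(list)
    for req in requirements:
        grouped[type(req).__name__].append(req)
    return dict(grouped)


reqs = [RequiredFeatures(), RequiredObjectAttributes(), RequiredFeatures()]
grouped = group_by_type(reqs)
```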

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • DataFrame | None

    Polars DataFrame with a "timestamp" column and one or more feature value columns. None until calculate is called.

calculate(period, save_into=None, cached_data=None, **kwargs)

Run the calculation for the given period and optionally save the result.

Calls _compute to get the result, stores it in the result property, then calls save. Subclasses should implement _compute instead of overriding this method.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    Polars DataFrame with features already fetched/calculated. Passed to _compute to enable chained calculations without re-querying performance_db. By default None.

  • **kwargs

    Forwarded to save.

Returns:

  • DataFrame

    Polars DataFrame with a "timestamp" column and one or more feature value columns.

Source code in echo_energycalc/feature_calc_core.py
Python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.

        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result

save(save_into=None, **kwargs)

Saves the calculated feature values in performance_db and, when save_into is "all", also in bazefield.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Where to save the result:
    • "all": save in performance_db and bazefield.
    • "performance_db": save only in performance_db.
    • None: do not save.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Currently unused; accepted only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
Python
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )

    if save_into is None:
        return

    upload_to_bazefield = save_into == "all"

    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")

    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})

    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )