Feature Calculator¶
Overview¶
The FeatureCalculator class is an abstract base class that defines the interface for all feature calculators. Each subclass computes a specific type of feature (identified by _name) for a given (object, feature) pair.
Lifecycle¶
__init__(object_name, feature)
│ 1. Validates that object and feature exist in performance_db
│ 2. Validates that feature's server_calc_type matches _name
│ 3. Calls _add_requirement(...) for each needed data source
│ 4. Calls _fetch_requirements() for static requirements
│ (object attributes, calc models, feature attributes)
└─ Ready to calculate
calculate(period, save_into, cached_data) ← called by CalculationHandler
│ 1. Calls _compute(period, cached_data)
│ 2. Stores result in self._result
│ 3. Calls save(save_into)
└─ Returns pl.DataFrame
_compute(period, cached_data) ← implemented in subclasses
│ 1. _fetch_requirements(period, ...) ← for time-series data
│ 2. _requirement_data("RequiredFeatures") → pl.DataFrame
│ 3. Calculation logic
└─ Returns pl.DataFrame({"timestamp": ..., "feature_name": ...})
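The calculate → _compute → save flow above is a template-method arrangement. The sketch below mimics it with toy classes so the call order is easy to see. CalculatorSketch, ConstantSketch, the saved_to attribute and the tuple used as a period are hypothetical stand-ins, not part of echo_energycalc, and the real class returns a Polars DataFrame rather than a list.

```python
from abc import ABC, abstractmethod


class CalculatorSketch(ABC):
    """Toy stand-in for the calculate -> _compute -> save flow (not the real class)."""

    def __init__(self) -> None:
        self._result = None
        self.saved_to = None  # records where the toy save() was asked to write

    def calculate(self, period, save_into=None, cached_data=None):
        result = self._compute(period, cached_data=cached_data)  # 1. run subclass logic
        self._result = result  # 2. keep the result on the instance
        self.save(save_into=save_into)  # 3. delegate persistence
        return result

    @abstractmethod
    def _compute(self, period, cached_data=None):
        """Subclasses put their calculation logic here."""

    def save(self, save_into=None):
        if self._result is None:
            raise ValueError("call calculate() first")
        self.saved_to = save_into


class ConstantSketch(CalculatorSketch):
    def _compute(self, period, cached_data=None):
        start, end = period  # assumption: period is a (start, end) pair of integers
        return [{"timestamp": t, "value": 1.0} for t in range(start, end)]


calc = ConstantSketch()
rows = calc.calculate(period=(0, 3), save_into="performance_db")
```

Keeping calculate fixed in the base class means every subclass stores and saves its result the same way, which is why only _compute needs to be implemented.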
Subclass Implementation¶
The minimum requirements for a concrete subclass are:
- _name class attribute: must match the server_calc_type attribute in performance_db.
- __init__: calls super().__init__(object_name, feature), registers requirements, fetches static ones.
- _compute: implements the calculation logic and returns a Polars DataFrame.
Complete example: FeatureCalcExample¶
from __future__ import annotations

import polars as pl
from echo_datetimerange import DateTimeRange

from echo_energycalc.calculation_requirement_object_attributes import RequiredObjectAttributes
from echo_energycalc.feature_calc_core import FeatureCalculator


class FeatureCalcExample(FeatureCalculator):
    """Fills every 10-minute slot with the object's nominal power (example only)."""

    _name = "example_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        """
        Parameters
        ----------
        object_name : str
            Name of the object (must exist in performance_db).
        feature : str
            Name of the feature (must have server_calc_type = "example_calc").
        """
        super().__init__(object_name, feature)

        # Register a static requirement: object attribute "nominal_power"
        self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))

        # Fetch static requirements immediately (no period needed for attributes)
        self._fetch_requirements()

        # Read the fetched data and store it as an instance variable
        self._nominal_power = float(
            self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"]
        )

    def _compute(
        self,
        period: DateTimeRange,
        cached_data: pl.DataFrame | None = None,  # noqa: ARG002
    ) -> pl.DataFrame:
        """Return nominal_power as a constant for every 10-minute slot.

        Parameters
        ----------
        period : DateTimeRange
            Calculation period.
        cached_data : pl.DataFrame | None, optional
            Passed by CalculationHandler; not used in this example.

        Returns
        -------
        pl.DataFrame
            DataFrame with columns ``["timestamp", self.feature]``.
        """
        result = self._create_empty_result(period=period, result_type="Series", freq="10min")
        result = result.with_columns(pl.lit(self._nominal_power).alias(self.feature))
        return result
Note
If the calculation depends on time-series features (e.g., WindSpeed_10min.AVG), add those requirements in __init__ but do not call _fetch_requirements() for them there. Instead, call _fetch_requirements(period=period, ...) inside _compute, because the period is only known at calculation time.
Working with requirements that depend on a period¶
When RequiredFeatures or RequiredAlarms are needed:
def __init__(self, object_name: str, feature: str) -> None:
    super().__init__(object_name, feature)

    # Static requirements: fetch immediately
    self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))
    self._fetch_requirements()
    self._nominal_power = float(
        self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"]
    )

    # Time-series requirements: registered now, fetched later in _compute
    self._add_requirement(
        RequiredFeatures({self.object: ["ActivePower_10min.AVG", "WindSpeed_10min.AVG"]})
    )

def _compute(self, period: DateTimeRange, cached_data: pl.DataFrame | None = None) -> pl.DataFrame:
    # Fetch time-series data for this specific period
    self._fetch_requirements(period=period, reindex="10min", cached_data=cached_data)

    # Get all fetched features as a single Polars DataFrame
    # Columns: "timestamp" + "object@feature" (multi-object) or bare feature names (single object)
    df = self._adjust_features(self.object)

    result = self._create_empty_result(period=period, result_type="Series")
    # ... do your calculation ...
    return result
Key helper methods¶
_add_requirement(requirement)¶
Registers a CalculationRequirement instance. Handles deduplication for RequiredFeatures and RequiredVibrationData automatically — the same feature for the same object is never queried twice.
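The source does not show how the deduplication is implemented. One way to picture it is as a merge of {object: [features]} mappings that drops repeats. The following is a minimal sketch under that assumption; merge_feature_requests is a hypothetical helper, not a function of the library.

```python
def merge_feature_requests(
    registered: dict[str, list[str]],
    new: dict[str, list[str]],
) -> dict[str, list[str]]:
    """Merge two {object: [features]} mappings without repeating a feature per object."""
    merged = {obj: list(feats) for obj, feats in registered.items()}
    for obj, feats in new.items():
        known = merged.setdefault(obj, [])
        for feat in feats:
            if feat not in known:  # skip features already requested for this object
                known.append(feat)
    return merged


first = {"WT01": ["ActivePower_10min.AVG"]}
second = {
    "WT01": ["ActivePower_10min.AVG", "WindSpeed_10min.AVG"],
    "WT02": ["WindSpeed_10min.AVG"],
}
merged = merge_feature_requests(first, second)
```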
_fetch_requirements(period, reindex, round_timestamps, only_missing, cached_data)¶
Calls check() then get_data() on all registered requirements. Pass:
- period: required for any RequiredFeatures or RequiredAlarms.
- reindex="10min": aligns all feature timestamps to a 10-minute grid.
- only_missing=True (default): skips requirements already fetched.
- cached_data: a Polars DataFrame of pre-fetched features from the CalculationHandler in-memory cache, avoiding redundant DB queries when multiple calculators share inputs.
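The check-then-fetch loop and the only_missing behaviour can be pictured with a toy model. RequirementSketch and fetch_requirements below are hypothetical names used only for illustration. The real method also takes reindex, round_timestamps and cached_data, which this sketch leaves out.

```python
class RequirementSketch:
    """Hypothetical requirement exposing check() and get_data()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.data = None  # None means "not fetched yet"
        self.calls: list[str] = []

    def check(self) -> None:
        self.calls.append("check")

    def get_data(self, period=None) -> None:
        self.calls.append("get_data")
        self.data = f"{self.name} data for {period}"


def fetch_requirements(requirements, period=None, only_missing=True):
    for req in requirements:
        if only_missing and req.data is not None:
            continue  # already fetched, do not query again
        req.check()
        req.get_data(period=period)


attrs = RequirementSketch("attributes")
feats = RequirementSketch("features")
fetch_requirements([attrs])  # static fetch, as done in __init__
fetch_requirements([attrs, feats], period="2024-01")  # later fetch, as done in _compute
```

With only_missing=True the second call leaves the static requirement untouched and fetches only the time-series one.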
_requirement_data(requirement_type)¶
Returns the merged data from all requirements of a given type:
| requirement_type | Return type |
|---|---|
| "RequiredFeatures" | pl.DataFrame with columns "timestamp" + "object@feature" |
| "RequiredObjectAttributes" | dict[object_name, dict[attr, value]] |
| "RequiredFeatureAttributes" | dict[feature_name, dict[attr, value]] |
| "RequiredCalcModels" | dict[object_name, dict[model_name, {model, ...}]] |
| "RequiredAlarms" | pl.DataFrame of alarm event rows |
| "RequiredVibrationData" | pl.DataFrame of raw vibration records |
| "RequiredVibrationFrequencies" | pl.DataFrame of frequency definitions |
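For the dict-shaped rows of the table, reading a value is a two-level lookup: object first, then attribute. The payload below is made up to match the documented "RequiredObjectAttributes" shape. The object names and values are assumptions for illustration.

```python
# Hypothetical payload shaped like dict[object_name, dict[attr, value]]
object_attributes = {
    "WT01": {"nominal_power": "2000"},
    "WT02": {"nominal_power": 3450.0},
}

# Convert explicitly, as the examples in this page do with float(...)
nominal_power = {
    obj: float(attrs["nominal_power"]) for obj, attrs in object_attributes.items()
}
```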
_adjust_features(object_name, rename_dict, adjust_wind_speed_std, valid_curtailment_states)¶
Extracts features for one or more objects from _requirement_data("RequiredFeatures") and applies standard quality filters:
- WindSpeed_10min.AVG ≤ 0 → null
- ActivePower_10min.AVG ≤ 0 → null
- ActivePower_10min.AVG when IEC-OperationState_10min.REP ≤ 1 → null
- ActivePower_10min.AVG when CurtailmentState_10min.REP not in valid states → null
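The four rules can be expressed row by row in plain Python. This is only a sketch of the rules as listed. The library applies them to Polars DataFrames, and apply_quality_filters, the choice of {0} as the valid curtailment states, and the handling of missing state columns are assumptions.

```python
def apply_quality_filters(row: dict, valid_curtailment_states: set[int]) -> dict:
    """Return a copy of the row with values that fail the listed rules set to None."""
    out = dict(row)

    wind_speed = out.get("WindSpeed_10min.AVG")
    if wind_speed is not None and wind_speed <= 0:
        out["WindSpeed_10min.AVG"] = None

    power = out.get("ActivePower_10min.AVG")
    state = out.get("IEC-OperationState_10min.REP")
    curtailment = out.get("CurtailmentState_10min.REP")
    if power is not None and (
        power <= 0
        or (state is not None and state <= 1)
        or (curtailment is not None and curtailment not in valid_curtailment_states)
    ):
        out["ActivePower_10min.AVG"] = None
    return out


def make_row(wind_speed, power, state, curtailment):
    return {
        "WindSpeed_10min.AVG": wind_speed,
        "ActivePower_10min.AVG": power,
        "IEC-OperationState_10min.REP": state,
        "CurtailmentState_10min.REP": curtailment,
    }


valid = {0}  # assumption: state 0 means "not curtailed"
bad_wind = apply_quality_filters(make_row(-0.5, 1500.0, 2, 0), valid)
stopped = apply_quality_filters(make_row(8.0, 1500.0, 1, 0), valid)
curtailed = apply_quality_filters(make_row(8.0, 1500.0, 2, 3), valid)
healthy = apply_quality_filters(make_row(8.0, 1500.0, 2, 0), valid)
```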
_create_empty_result(period, result_type, freq, columns)¶
Creates a Polars DataFrame with null-filled columns over the given period:
- result_type="Series": single column named self.feature
- result_type="DataFrame": multiple columns specified by columns
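The idea of a null-filled frame over a period can be shown without Polars as a list of rows on a fixed time grid. The sketch assumes a 10-minute step and an inclusive end. empty_result is a hypothetical helper, and the real method returns a pl.DataFrame.

```python
from datetime import datetime, timedelta


def empty_result(
    start: datetime,
    end: datetime,
    columns: list[str],
    step: timedelta = timedelta(minutes=10),
) -> list[dict]:
    """Build one row per time step, with every value column set to None."""
    rows = []
    current = start
    while current <= end:  # assumption: the period end is inclusive
        rows.append({"timestamp": current, **{col: None for col in columns}})
        current += step
    return rows


grid = empty_result(
    datetime(2024, 1, 1, 0, 0),
    datetime(2024, 1, 1, 0, 30),
    ["ActivePower_10min.AVG"],
)
```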
_get_night_mask(timestamps, latitude, longitude, utc_offset_hours)¶
Returns a boolean Polars Series where True means the sun is below the horizon at that timestamp. Useful for solar calculators to zero out nighttime values.
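The source does not say how the sun position is computed. The sketch below uses a common textbook approximation (cosine declination model, no equation of time, no atmospheric refraction) to show what "below the horizon" means for a single timestamp. is_night is a hypothetical function, and the interpretation of utc_offset_hours as the offset of naive local timestamps from UTC is an assumption.

```python
import math
from datetime import datetime, timedelta


def is_night(
    timestamp: datetime,
    latitude: float,
    longitude: float,
    utc_offset_hours: float = 0.0,
) -> bool:
    """Approximate test for solar elevation below 0 degrees."""
    utc = timestamp - timedelta(hours=utc_offset_hours)  # local time -> UTC
    day_of_year = utc.timetuple().tm_yday

    # Approximate solar declination in radians
    declination = math.radians(-23.44) * math.cos(2 * math.pi / 365 * (day_of_year + 10))

    # Approximate local solar time in hours, then hour angle in radians
    solar_hour = utc.hour + utc.minute / 60 + longitude / 15
    hour_angle = math.radians(15 * (solar_hour - 12))

    lat = math.radians(latitude)
    sin_elevation = (
        math.sin(lat) * math.sin(declination)
        + math.cos(lat) * math.cos(declination) * math.cos(hour_angle)
    )
    return sin_elevation < 0


# Site near 23.55 S, 46.63 W with local time at UTC-3
mask = [
    is_night(datetime(2024, 6, 21, 0, 0), -23.55, -46.63, utc_offset_hours=-3),
    is_night(datetime(2024, 6, 21, 12, 0), -23.55, -46.63, utc_offset_hours=-3),
]
```

Around sunrise and sunset this approximation can be off by several minutes, so it should not be read as the library's exact behaviour.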
Database Requirements¶
- Feature attribute server_calc_type must match the calculator's _name.
- The CalculationHandler uses FEATURE_CALC_CLASS_MAPPING (auto-populated from all registered subclasses) to find the right class at runtime.
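Auto-populating a name-to-class mapping from subclasses is typically done with __init_subclass__. The following is a minimal sketch of that mechanism, not the library's code. BaseSketch, ExampleSketch, HelperMixinSketch and class_for are hypothetical names.

```python
class BaseSketch:
    """Toy base class that registers every subclass defining its own _name."""

    _registry: dict[str, type["BaseSketch"]] = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        name = cls.__dict__.get("_name")  # only a _name defined on this class counts
        if name is not None:
            BaseSketch._registry[name] = cls


class ExampleSketch(BaseSketch):
    _name = "example_calc"


class HelperMixinSketch(BaseSketch):
    pass  # no _name, so it is not registered


def class_for(server_calc_type: str) -> type[BaseSketch]:
    """Look up the calculator class for a server_calc_type value."""
    return BaseSketch._registry[server_calc_type]
```

Because registration happens when the class body is executed, a subclass only appears in the mapping after its module has been imported.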
Class Definition¶
FeatureCalculator(object_name, feature)¶
Abstract base class for all feature calculators.
Each concrete subclass computes one type of server-calculated feature
(identified by the _name class attribute) for a given (object, feature)
pair stored in performance_db.
Subclass contract
- Define _name as a class attribute matching the server_calc_type attribute in performance_db. This also auto-registers the class in _registry via __init_subclass__.
- Override __init__: call super().__init__(object_name, feature), register requirements with _add_requirement, and fetch static (period-independent) requirements with _fetch_requirements.
- Implement _compute: fetch time-series requirements for the given period, run the calculation, and return a Polars DataFrame with columns ["timestamp", ...].
Do not override calculate: it calls _compute, stores the result in the result property, and delegates to save.
Data conventions
- Multi-object intermediate DataFrames use flat "object@feature" column names (e.g. "WT01@WindSpeed_10min.AVG").
- Single-object result DataFrames use bare feature names as columns plus a "timestamp" column.
- Timestamps are pl.Datetime with millisecond precision.
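Moving between the two naming conventions is a string operation on column names. The helpers below are hypothetical and only illustrate the "object@feature" format. They mirror the kind of rename mapping that save builds before inserting into performance_db.

```python
def to_flat_columns(object_name: str, columns: list[str]) -> dict[str, str]:
    """Map bare feature names to "object@feature", leaving "timestamp" out."""
    return {col: f"{object_name}@{col}" for col in columns if col != "timestamp"}


def split_flat_column(column: str) -> tuple[str, str]:
    """Split "object@feature" into (object, feature) at the first "@"."""
    object_name, feature = column.split("@", 1)
    return object_name, feature


mapping = to_flat_columns("WT01", ["timestamp", "WindSpeed_10min.AVG"])
parts = split_flat_column("WT01@WindSpeed_10min.AVG")
```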
Examples:
Minimal subclass::
class FeatureCalcExample(FeatureCalculator):
    _name = "example_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        super().__init__(object_name, feature)
        self._add_requirement(RequiredObjectAttributes({self.object: ["nominal_power"]}))
        self._fetch_requirements()
        self._nominal_power = float(self._requirement_data("RequiredObjectAttributes")[self.object]["nominal_power"])

    def _compute(self, period: DateTimeRange, cached_data=None) -> pl.DataFrame:
        result = self._create_empty_result(period=period, result_type="Series")
        result = result.with_columns(pl.lit(self._nominal_power).alias(self.feature))
        return result
The constructor should be overridden in child classes of FeatureCalculator to define the name, requirements and logic of the feature calculator.
It is essential to define the name of the feature calculator in the child class, as a class attribute "_name". This name must be equal to the "server_calc_type" attribute of the feature in performance_db and is used to check that the feature calculator is the correct one for the feature. Defining _name automatically registers the class in FeatureCalculator._registry via __init_subclass__.
Below is a simple example of how to define a child class of FeatureCalculator:
class FeatureCalculatorExample(FeatureCalculator):
    # name of the feature calculator
    _name = "name"

    def __init__(self, object_name: str, feature: str) -> None:
        # initialize parent class
        super().__init__(object_name, feature)

        # requirements for the feature calculator
        self._add_requirement(...)
Parameters:
- object_name (str) – Name of the object for which the feature is calculated. It must exist in performance_db.
- feature (str) – Feature of the object that is calculated. It must exist in performance_db.
Source code in echo_energycalc/feature_calc_core.py
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    Constructor of the FeatureCalculator class.

    This should be overridden in child classes of FeatureCalculator to define the name, requirements and logic of the feature calculator.

    It's extremely important to define the name of the feature calculator in the child class. This name must be equal to the "server_calc_type" attribute of the feature in performance_db and will be used to check if the feature calculator is the correct one for the feature. Set the name of the feature calculator in the child class as a class attribute "_name". Defining ``_name`` automatically registers the class in ``FeatureCalculator._registry`` via ``__init_subclass__``.

    Below is a simple example of how to define a child class of FeatureCalculator:

    ```python
    class FeatureCalculatorExample(FeatureCalculator):
        # name of the feature calculator
        _name = "name"

        def __init__(self, object_name: str, feature: str) -> None:
            # initialize parent class
            super().__init__(object_name, feature)

            # requirements for the feature calculator
            self._add_requirement(...)
    ```

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # checking arguments
    if not isinstance(object_name, str):
        raise TypeError(f"object_name must be a string, not {type(object_name)}")
    if not isinstance(feature, str):
        raise TypeError(f"feature must be a string, not {type(feature)}")

    # check if self._name is defined in child class
    if not hasattr(self, "_name"):
        raise ValueError(f"FeatureCalculator name is not defined in {self.__class__.__name__}.")

    # empty set of requirements
    self._requirements = None

    # creating structure that will be used to connect to performance_db
    self._perfdb = PerfDB(application_name=self.__class__.__name__)

    # check if object exists in performance_db
    obj_def = self._perfdb.objects.instances.get(object_names=[object_name], output_type="dict")
    if len(obj_def) == 0:
        raise ValueError(f"Object {object_name} does not exist in performance_db.")
    self._object = object_name
    obj_model = obj_def[object_name]["object_model_name"]

    # check if feature exists in performance_db
    obj_features = self._perfdb.features.definitions.get(
        object_names=[object_name],
        feature_names=[feature],
        get_attributes=True,
        attribute_names=["server_calc_type"],
        output_type="dict",
    )
    if len(obj_features) != 1 or len(obj_features[obj_model]) != 1:
        raise ValueError(f"Feature {feature} does not exist in performance_db for object {object_name}.")
    self._feature = feature
    feature_def = obj_features[obj_model][feature]

    # checking if this calculation name is equal to the "server_calc_type" attribute of the feature
    if "server_calc_type" not in feature_def:
        raise ValueError(
            f"Feature {feature} does not have the attribute 'server_calc_type' in performance_db for object {object_name}.",
        )
    if feature_def["server_calc_type"] != self.name:
        raise ValueError(
            f"Feature '{feature}' of object '{object_name}' has the attribute 'server_calc_type' equal to '{feature_def['server_calc_type']}' in performance_db, which is different from the name of the class {self.__class__.__name__}('{self.name}').",
        )

    # results of the calculation. It will be filled by the method "calculate".
    self._result: pl.DataFrame | None = None
feature property¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Name of the feature that is calculated.
name property¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:
- str – Name of the feature calculator.
object property¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str – Object name for which the feature is calculated.
requirements property¶
Requirements of the feature calculator, grouped by requirement class. They are defined in child classes of FeatureCalculator.
Returns:
- dict[str, list[CalculationRequirement]] – Dict of requirements.
The keys are the names of the classes of the requirements and the values are lists of requirements of that class.
For example:
{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}
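A mapping keyed by class name, as in the example above, can be built by grouping instances on type(req).__name__. The sketch below shows that grouping with placeholder classes. The class names ending in Sketch and the group_by_class_name helper are hypothetical, and the library may build its dict differently.

```python
from collections import defaultdict


class RequiredFeaturesSketch:
    """Placeholder for a time-series requirement."""


class RequiredObjectAttributesSketch:
    """Placeholder for a static requirement."""


def group_by_class_name(requirements: list[object]) -> dict[str, list[object]]:
    """Group requirement instances under the name of their class."""
    grouped: dict[str, list[object]] = defaultdict(list)
    for req in requirements:
        grouped[type(req).__name__].append(req)
    return dict(grouped)


reqs = [RequiredFeaturesSketch(), RequiredObjectAttributesSketch(), RequiredFeaturesSketch()]
grouped = group_by_class_name(reqs)
```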
result property¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:
- DataFrame | None – Polars DataFrame with a "timestamp" column and one or more feature value columns. None until calculate is called.
calculate(period, save_into=None, cached_data=None, **kwargs)¶
Run the calculation for the given period and optionally save the result.
Calls _compute to get the result, stores it in the result property, then calls save. Subclasses should implement _compute instead of overriding this method.
Parameters:
- period (DateTimeRange) – Period for which the feature will be calculated.
- save_into (Literal['all', 'performance_db'] | None, default: None) – Where to save the result:
  - "all": save in performance_db and bazefield.
  - "performance_db": save only in performance_db.
  - None: do not save.
- cached_data (DataFrame | None, default: None) – Polars DataFrame with features already fetched/calculated. Passed to _compute to enable chained calculations without re-querying performance_db.
- **kwargs – Forwarded to save.
Returns:
- DataFrame – Polars DataFrame with a "timestamp" column and one or more feature value columns.
Source code in echo_energycalc/feature_calc_core.py
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: pl.DataFrame | None = None,
    **kwargs,
) -> pl.DataFrame:
    """
    Run the calculation for the given period and optionally save the result.

    Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
    then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
    of overriding this method.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        - ``"all"``: save in performance_db and bazefield.
        - ``"performance_db"``: save only in performance_db.
        - ``None``: do not save.

        By default None.
    cached_data : pl.DataFrame | None, optional
        Polars DataFrame with features already fetched/calculated. Passed to
        ``_compute`` to enable chained calculations without re-querying
        performance_db. By default None.
    **kwargs
        Forwarded to :meth:`save`.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
    """
    result = self._compute(period, cached_data=cached_data)
    self._result = result
    self.save(save_into=save_into, **kwargs)
    return result
save(save_into=None, **kwargs)¶
Method to save the calculated feature values in performance_db.
Parameters:
- save_into (Literal['all', 'performance_db'] | None, default: None) – Where to save the result:
  - "all": the feature will be saved in performance_db and bazefield.
  - "performance_db": the feature will be saved only in performance_db.
  - None: the feature will not be saved.
- **kwargs (dict, default: {}) – Not used at the moment. Present only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Where to save the result. The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Please call 'calculate' before calling 'save'.",
        )

    if save_into is None:
        return

    upload_to_bazefield = save_into == "all"

    if not isinstance(self.result, pl.DataFrame):
        raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
    if "timestamp" not in self.result.columns:
        raise ValueError("result DataFrame must contain a 'timestamp' column.")

    # rename feature columns to "object@feature" format expected by perfdb polars insert
    feat_cols = [c for c in self.result.columns if c != "timestamp"]
    result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})

    self._perfdb.features.values.series.insert(
        df=result_pl,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )