# Developer Guide
This guide walks through adding new components to echo-energycalc: feature calculators, calculation requirements, and job instances. Each section explains the pattern, what must be implemented, and provides a complete worked example.
## 1. Adding a New Feature Calculator

### Overview

A `FeatureCalculator` subclass:

- Declares static requirements in `__init__` (which data it needs from the DB).
- Fetches static requirements immediately (object attributes, calc models, feature attributes).
- Implements `_compute(period, cached_data)`, which fetches time-series data and returns a Polars DataFrame.
The `calculate()` method (from the base class) calls `_compute()`, stores the result, and calls `save()`. Do not override `calculate()`; override `_compute()` instead.
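This split between `calculate()` and `_compute()` is the classic template-method pattern. The following is a minimal illustrative sketch, not the actual base class from `feature_calc_core` (which carries requirement handling, result storage, and more); all names besides `calculate`, `_compute`, and `save` are invented for the example:

```python
from abc import ABC, abstractmethod


class SketchCalculator(ABC):
    """Illustrative stand-in for FeatureCalculator's calculate/_compute split."""

    def __init__(self) -> None:
        self.result = None

    def calculate(self, period: str) -> None:
        # Template method: fixed workflow owned by the base class.
        self.result = self._compute(period)  # subclass-specific logic
        self.save()                          # persistence handled by the base

    @abstractmethod
    def _compute(self, period: str):
        """Subclasses implement only this hook."""

    def save(self) -> None:
        print(f"saved: {self.result}")


class ExampleCalculator(SketchCalculator):
    def _compute(self, period: str):
        return f"{period}-result"


calc = ExampleCalculator()
calc.calculate("2024-01")  # prints "saved: 2024-01-result"
```

Overriding `calculate()` would bypass the storing and saving steps, which is why the hook method is the only extension point.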
### Step 1: Create the module

Create `echo_energycalc/feature_calc_<your_name>.py`:
```python
"""Module for the MyNewCalculator feature calculator."""

from __future__ import annotations

__all__ = ["MyNewCalculator"]

import polars as pl
from echo_datetimerange import DateTimeRange

from .calculation_requirement_features import RequiredFeatures
from .calculation_requirement_object_attributes import RequiredObjectAttributes
from .feature_calc_core import FeatureCalculator


class MyNewCalculator(FeatureCalculator):
    """Brief one-line description.

    Longer description of what this calculator does, what methodology it uses,
    what fallback strategies exist, etc.
    """

    # This string must match the "server_calc_type" attribute of the feature
    # in performance_db. It also automatically registers this class in
    # FeatureCalculator._registry.
    _name = "my_new_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        """
        My new calculator.

        Requires the following object attributes:

        - ``nominal_power`` (kW): used to clip results.

        Parameters
        ----------
        object_name : str
            Name of the object for which the feature is calculated.
        feature : str
            Feature to be calculated (must have ``server_calc_type = "my_new_calc"``).
        """
        super().__init__(object_name, feature)

        # --- Static requirements (fetched immediately, no period needed) ---
        self._add_requirement(
            RequiredObjectAttributes({self.object: ["nominal_power"]})
        )
        self._fetch_requirements()

        # Read static data already available after the fetch above
        obj_attrs = self._requirement_data("RequiredObjectAttributes")[self.object]
        self._nominal_power: float = float(obj_attrs["nominal_power"])

        # --- Time-series requirements (fetched later inside _compute) ---
        self._add_requirement(
            RequiredFeatures({self.object: ["ActivePower_10min.AVG"]})
        )

    def _compute(
        self,
        period: DateTimeRange,
        cached_data: pl.DataFrame | None = None,
    ) -> pl.DataFrame:
        """Run the calculation for *period*.

        Parameters
        ----------
        period : DateTimeRange
            Calculation period.
        cached_data : pl.DataFrame | None, optional
            Pre-fetched features from the CalculationHandler cache.

        Returns
        -------
        pl.DataFrame
            DataFrame with columns ``["timestamp", "<feature_name>"]``.
        """
        # Fetch time-series data for the period
        self._fetch_requirements(period=period, reindex="10min", cached_data=cached_data)

        # Get a Polars DataFrame for this object's features
        df = self._adjust_features(self.object)

        # Build empty result and fill it with your logic
        result = self._create_empty_result(period=period, result_type="Series")

        # Example: cap active power at nominal power
        if "ActivePower_10min.AVG" in df.columns:
            clipped = df.with_columns(
                pl.col("ActivePower_10min.AVG")
                .clip(upper_bound=self._nominal_power)
                .alias(self.feature)
            ).select(["timestamp", self.feature])
            result = result.update(clipped, on="timestamp")

        return result
```
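The comment on `_name` says that assigning it registers the class automatically. The exact mechanism lives in `feature_calc_core` and is not shown in this guide; a common way to implement such a registry, sketched here purely for intuition with invented class names, is `__init_subclass__`:

```python
class RegistryBase:
    """Illustrative base class that indexes subclasses by their _name."""

    _registry: dict = {}
    _name = None  # subclasses set this to their server_calc_type string

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Runs once per subclass definition; no explicit registration call needed.
        if cls._name is not None:
            RegistryBase._registry[cls._name] = cls


class MyNewCalculator(RegistryBase):
    _name = "my_new_calc"


# A handler can now resolve a server_calc_type string to a class:
assert RegistryBase._registry["my_new_calc"] is MyNewCalculator
```

This is why merely importing the module that defines your calculator is enough for the handler to find it.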
### Step 2: Configure the database

In performance_db, set the `server_calc_type` attribute of the target feature to `"my_new_calc"`. The CalculationHandler will then use `MyNewCalculator` for that feature automatically.
### Step 3: Add documentation

Create `docs/feature_calc/feature_calculators/my_new_calc.md` with the following structure:

- A heading and Overview section describing what the calculator does.
- A Database Requirements section listing the `server_calc_type` value and any object/feature attributes.
- A Class Definition section with an mkdocstrings autodoc directive pointing to your class.

The directive format is `::: <module_path>` on its own line, for example: `::: echo_energycalc.feature_calc_my_name.MyNewCalculator`

Add the page to mkdocs.yml under the Feature Calculators section.
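The mkdocs.yml entry might look like the following; the section and page titles are assumptions, so match them to the nav structure already in the repository:

```yaml
nav:
  - Feature Calculators:
      - My New Calc: feature_calc/feature_calculators/my_new_calc.md
```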
### Key helper methods on FeatureCalculator

| Method | When to use |
|---|---|
| `_add_requirement(req)` | Register a requirement (call in `__init__`) |
| `_fetch_requirements(period, reindex, cached_data)` | Validate and fetch all pending requirements |
| `_requirement_data("RequiredFeatures")` | Merge all `RequiredFeatures` into one Polars DataFrame |
| `_requirement_data("RequiredObjectAttributes")` | Merge all `RequiredObjectAttributes` into one dict |
| `_requirement_data("RequiredCalcModels")` | Merge all `RequiredCalcModels` into one dict |
| `_adjust_features(object_name)` | Get features for an object with standard filters applied (null out invalid wind speed, active power, etc.) |
| `_create_empty_result(period, result_type)` | Create a null-filled Polars DataFrame with the correct timestamps |
| `_get_night_mask(timestamps, lat, lon)` | Boolean mask for nighttime rows (uses pvlib) |
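For intuition about what `_create_empty_result` produces: for a 10-minute feature, its timestamp column is equivalent to a uniform 10-minute grid over the period, with the value column starting as null. A plain-Python sketch of just the grid (the real helper returns a Polars DataFrame and handles result types):

```python
from datetime import datetime, timedelta


def ten_minute_grid(start: datetime, end: datetime) -> list:
    """All 10-minute timestamps in [start, end)."""
    grid = []
    ts = start
    while ts < end:
        grid.append(ts)
        ts += timedelta(minutes=10)
    return grid


grid = ten_minute_grid(datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 1, 0))
# 6 rows: 00:00, 00:10, ..., 00:50 — values stay null until your logic fills them
```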
### Notes on requirements

- **Static vs. time-series requirements:** Object attributes, feature attributes, and calc models do not depend on a period and can be fetched in `__init__`. Feature data (time series) depends on a period and must be fetched inside `_compute`.
- **Optional requirements:** Pass `optional=True` to allow the requirement to be absent without raising an error.
- **Adding requirements on the fly:** `_add_requirement` can be called inside `_compute` (e.g., after discovering which neighbor turbines are needed). `_fetch_requirements` handles deduplication automatically.
- **`reindex` parameter:** Pass `"10min"` to align all features to a uniform 10-minute timestamp grid. Pass `None` for features at other frequencies.
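The deduplication mentioned above means overlapping requirements can be registered freely without fetching the same feature twice. Conceptually, the merge looks something like this (an illustrative sketch only; the real logic lives in the requirement classes):

```python
def merge_feature_requirements(requirements):
    """Collapse several {object: [features]} mappings into one, dropping duplicates."""
    merged = {}
    for req in requirements:
        for obj, features in req.items():
            seen = merged.setdefault(obj, [])
            for feat in features:
                if feat not in seen:  # duplicate features are requested only once
                    seen.append(feat)
    return merged


merged = merge_feature_requirements([
    {"WTG01": ["ActivePower_10min.AVG"]},
    {"WTG01": ["ActivePower_10min.AVG", "WindSpeed_10min.AVG"]},
])
# Each feature appears once per object, so each is fetched only once.
```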
## 2. Adding a New Calculation Requirement

Create a new requirement only if you need to fetch a data type not covered by the existing requirements. In most cases, `RequiredFeatures`, `RequiredObjectAttributes`, and `RequiredCalcModels` are sufficient.
### Step 1: Create the class

```python
"""Module for RequiredMyData."""

from __future__ import annotations

import copy
from typing import Any

from .calculation_requirements_core import CalculationRequirement


class RequiredMyData(CalculationRequirement):
    """Fetches XYZ data from performance_db for a given object."""

    def __init__(self, object_name: str, optional: bool = False) -> None:
        """
        Parameters
        ----------
        object_name : str
            Object to fetch data for.
        optional : bool, optional
            If True, a missing data set does not raise an error.
        """
        super().__init__(optional=optional)
        self._object_name = object_name

    # ------------------------------------------------------------------ #
    # Caching (optional but strongly recommended for static data)        #
    # ------------------------------------------------------------------ #
    def _check_cache_key(self) -> tuple | None:
        """Return a hashable key so identical requests share cached data."""
        return (self._object_name,)

    def _get_cache_value(self) -> Any:
        """Return the value to store (defaults to self._data)."""
        return self._data

    def _set_from_cache(self, cached: Any) -> None:
        """Restore from cache."""
        self._data = copy.deepcopy(cached)

    # ------------------------------------------------------------------ #
    # Core interface                                                     #
    # ------------------------------------------------------------------ #
    def _do_check(self) -> None:
        """Validate that the required data exists."""
        if self.optional:
            return
        exists = self._perfdb.some_check(self._object_name)
        if not exists:
            raise ValueError(f"Data for '{self._object_name}' not found.")

    def get_data(self, **kwargs) -> Any:
        """Fetch and return the data, setting ``self._data``."""
        if not self._checked:
            self.check()
        self._data = self._perfdb.fetch_my_data(self._object_name)
        return self._data

    def __repr__(self) -> str:
        return f"RequiredMyData(object={self._object_name!r}, optional={self.optional})"
```
### Step 2: Expose it in the calculator

In your `FeatureCalculator.__init__`:

```python
from .calculation_requirement_my_data import RequiredMyData

self._add_requirement(RequiredMyData(self.object))
self._fetch_requirements()
```

Then retrieve with:

```python
my_data = self.requirements["RequiredMyData"][0].data
```

Or add a `"RequiredMyData"` case to `_requirement_data()` in `feature_calc_core.py` if you want the same merge-and-aggregate pattern as the built-in requirement types.
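If you take the `_requirement_data()` route for a dict-valued requirement, the merge step typically reduces to folding the `.data` of each registered instance into one dict, mirroring what the built-in dict-merging types do. A hedged sketch with an invented helper name:

```python
def merge_requirement_dicts(requirement_data):
    """Fold the .data dicts of several requirement instances into one dict."""
    merged = {}
    for data in requirement_data:
        if data:  # skip optional requirements that came back empty
            merged.update(data)
    return merged


merged = merge_requirement_dicts([{"WTG01": 1500.0}, {"WTG02": 2000.0}])
# One lookup table covering every object that had a requirement registered.
```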
### Caching guidance

Override `_check_cache_key()` when the requirement fetches static (period-independent) data (e.g., config, attributes, models). This allows multiple threads calculating different features for the same object to share the result.

Do not add caching for time-series data (features, alarms), since the result depends on the calculation period.
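The effect of `_check_cache_key()` can be pictured as a process-wide dict keyed by requirement type plus cache key: the first fetch populates it, later identical requests are served a copy. A simplified, illustrative sketch (the real cache in the requirement machinery is thread-safe and DB-backed, neither of which is shown here):

```python
import copy

_CACHE = {}  # stand-in for the shared requirement cache


class CachedRequirement:
    def __init__(self, object_name: str) -> None:
        self._object_name = object_name
        self._data = None

    def _check_cache_key(self) -> tuple:
        # Type name + object name: identical requests share one entry.
        return (type(self).__name__, self._object_name)

    def fetch(self) -> None:
        key = self._check_cache_key()
        if key in _CACHE:
            # Later requesters get a copy instead of a second DB round-trip.
            self._data = copy.deepcopy(_CACHE[key])
            return
        self._data = {"nominal_power": 2000.0}  # stand-in for a DB query
        _CACHE[key] = self._data


a = CachedRequirement("WTG01")
b = CachedRequirement("WTG01")
a.fetch()
b.fetch()  # served from the cache
```

The deep copy matters: each requirement gets an independent object, so one calculator mutating its data cannot corrupt another's.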
## 3. Adding a New Job Instance

Jobs are stored in performance_db in the `job_instances` table, along with their parameters (as JSON) and steps. The JobHandler queries for pending jobs and dispatches them.
### Step 1: Create the module

```python
"""Module for MyJob."""

from __future__ import annotations

__all__ = ["MyJob"]

from typing import Any

from .job_instance import JobInstance, JobStepError


class MyJob(JobInstance):
    """Brief description of what this job does."""

    # Must match the name in the job_types table in performance_db.
    _name = "My Job Name"

    def __init__(self, job_id: int, skip_test: bool = True) -> None:
        """
        Parameters
        ----------
        job_id : int
            ID of the job instance from the database.
        skip_test : bool, optional
            Skip jobs whose description contains "test". Default True.
        """
        # Pass delay_steps_creation=True if you need to pre-process parameters
        # before creating the DB steps (e.g., resolving object lists).
        super().__init__(job_id, skip_test, delay_steps_creation=True)

        # Access job parameters from the DB
        my_param = self._parameters["my_param"]

        # Resolve whatever you need here...
        self._items_to_process = self._resolve_items(my_param)

        # Create steps in the DB after parameter setup
        self._create_database_steps()

    def _create_steps(self) -> list[dict[str, Any]]:
        """Return a list of step dicts, one per unit of work."""
        return [
            {
                "job_instance_id": self.job_id,
                "parameters": {"item": item},
            }
            for item in self._items_to_process
        ]

    def _run_step(self, step_id: int) -> None:
        """Execute the work for a single step."""
        step = self._perfdb.jobs.steps.get(
            job_instance_ids=[self.job_id],
            job_step_ids=[step_id],
            output_type="dict",
        )[step_id]
        try:
            item = step["parameters"]["item"]
            # ... do the actual work ...
        except Exception as e:
            raise JobStepError(f"Step {step_id} failed: {e}") from e

    def _resolve_items(self, param: str) -> list[str]:
        """Helper to resolve items from the parameter."""
        return [param]
```
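The fan-out in `_create_steps` is simply one dict per unit of work. Extracted as a standalone function with invented example values, it behaves like this:

```python
def create_steps(job_id: int, items: list) -> list:
    """One step dict per unit of work, mirroring MyJob._create_steps."""
    return [
        {"job_instance_id": job_id, "parameters": {"item": item}}
        for item in items
    ]


steps = create_steps(42, ["WTG01", "WTG02"])
# → two steps; each carries exactly the parameters its _run_step call will need
```

Keeping each step's parameters self-contained is what lets the handler run, retry, or fail steps independently.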
### Step 2: Configure the database

Add a row to the `job_types` table with `name = "My Job Name"`. Create `job_instances` rows with `job_type = "My Job Name"` and the required parameters as JSON.

### Step 3: Document it

Create `docs/jobs/my_job.md` and add it to mkdocs.yml.
### Job step lifecycle

```text
JobInstance.__init__
└─ _create_database_steps()  → inserts rows into the job_steps table
   └─ calls _create_steps() to get the list of step dicts

JobInstance.run()
└─ for each pending step:
     set state = "Running"
     _run_step(step_id)
     set state = "Finished" or "Failed"
```
Jobs automatically transition their own state in the database:

- `"Created"` → `"Running"` when `run()` starts
- `"Running"` → `"Finished"` on success
- `"Running"` → `"Failed"` on an unhandled exception
Raise `JobStepError` (not a generic exception) inside `_run_step` to mark only that step as failed without crashing the entire job.
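Why `JobStepError` matters is easiest to see in a simplified run loop. This sketch only illustrates the per-step error isolation; the real loop in `job_instance.py` also persists states to the database, and the local `JobStepError` class here merely stands in for the one imported from `.job_instance`:

```python
class JobStepError(Exception):
    """Stand-in for echo_energycalc's JobStepError: fails one step, not the job."""


def run_steps(steps: dict) -> dict:
    """Run each step callable, recording per-step state like JobInstance.run()."""
    states = {}
    for step_id, work in steps.items():
        states[step_id] = "Running"
        try:
            work()
            states[step_id] = "Finished"
        except JobStepError:
            # Only this step is marked failed; the loop continues.
            states[step_id] = "Failed"
    return states


def ok() -> None:
    pass


def bad() -> None:
    raise JobStepError("boom")


states = run_steps({1: ok, 2: bad, 3: ok})
# → {1: "Finished", 2: "Failed", 3: "Finished"}
```

An exception type the loop does not catch would instead propagate out of `run()`, taking the whole job to `"Failed"` with its remaining steps unprocessed.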