Developer Guide

This guide walks through adding new components to echo-energycalc: feature calculators, calculation requirements, and job instances. Each section explains the pattern, what must be implemented, and provides a complete worked example.


1. Adding a New Feature Calculator

Overview

A FeatureCalculator subclass:

  1. Declares static requirements in __init__ (which data it needs from the DB).
  2. Fetches static requirements immediately (object attributes, calc models, feature attributes).
  3. Implements _compute(period, cached_data) which fetches time-series data and returns a Polars DataFrame.

The calculate() method (from the base class) calls _compute(), stores the result, and calls save(). Do not override calculate() — override _compute() instead.
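Conceptually, the base-class orchestration looks like the following sketch. The class and attribute names here are illustrative stand-ins, not the actual implementation:

```python
class FeatureCalculatorSketch:
    """Illustrative stand-in for the real base class; names are assumed."""

    def calculate(self, period, cached_data=None):
        # The base class orchestrates: compute, store, persist.
        self._result = self._compute(period, cached_data)
        self.save()
        return self._result

    def _compute(self, period, cached_data=None):
        raise NotImplementedError  # subclasses override this, never calculate()

    def save(self):
        self.saved = True  # the real class writes the result to performance_db


class ConstantCalc(FeatureCalculatorSketch):
    """Toy subclass: only _compute is overridden."""

    def _compute(self, period, cached_data=None):
        return [("2024-01-01T00:00", 1.0), ("2024-01-01T00:10", 1.0)]


calc = ConstantCalc()
result = calc.calculate(period=None)
```

Because the orchestration lives in calculate(), every subclass gets storing and saving for free by implementing only _compute().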

Step 1: Create the module

Create echo_energycalc/feature_calc_<your_name>.py:

Python
"""Module for the MyNewCalculator feature calculator."""

from __future__ import annotations

__all__ = ["MyNewCalculator"]

import polars as pl
from echo_datetimerange import DateTimeRange

from .calculation_requirement_features import RequiredFeatures
from .calculation_requirement_object_attributes import RequiredObjectAttributes
from .feature_calc_core import FeatureCalculator


class MyNewCalculator(FeatureCalculator):
    """Brief one-line description.

    Longer description of what this calculator does, what methodology it uses,
    what fallback strategies exist, etc.
    """

    # This string must match the "server_calc_type" attribute of the feature in performance_db.
    # It also automatically registers this class in FeatureCalculator._registry.
    _name = "my_new_calc"

    def __init__(self, object_name: str, feature: str) -> None:
        """
        My new calculator.

        Requires the following object attributes:
            - ``nominal_power`` (kW): used to clip results.

        Parameters
        ----------
        object_name : str
            Name of the object for which the feature is calculated.
        feature : str
            Feature to be calculated (must have ``server_calc_type = "my_new_calc"``).
        """
        super().__init__(object_name, feature)

        # --- Static requirements (fetched immediately, no period needed) ---
        self._add_requirement(
            RequiredObjectAttributes({self.object: ["nominal_power"]})
        )
        self._fetch_requirements()

        # Read static data already available after the fetch above
        obj_attrs = self._requirement_data("RequiredObjectAttributes")[self.object]
        self._nominal_power: float = float(obj_attrs["nominal_power"])

        # --- Time-series requirements (fetched later inside _compute) ---
        self._add_requirement(
            RequiredFeatures({self.object: ["ActivePower_10min.AVG"]})
        )

    def _compute(
        self,
        period: DateTimeRange,
        cached_data: pl.DataFrame | None = None,
    ) -> pl.DataFrame:
        """Run the calculation for *period*.

        Parameters
        ----------
        period : DateTimeRange
            Calculation period.
        cached_data : pl.DataFrame | None, optional
            Pre-fetched features from the CalculationHandler cache.

        Returns
        -------
        pl.DataFrame
            DataFrame with columns ``["timestamp", "<feature_name>"]``.
        """
        # Fetch time-series data for the period
        self._fetch_requirements(period=period, reindex="10min", cached_data=cached_data)

        # Get a Polars DataFrame for this object's features
        df = self._adjust_features(self.object)

        # Build empty result and fill it with your logic
        result = self._create_empty_result(period=period, result_type="Series")

        # Example: cap active power at nominal power
        if "ActivePower_10min.AVG" in df.columns:
            clipped = df.with_columns(
                pl.col("ActivePower_10min.AVG")
                  .clip(upper_bound=self._nominal_power)
                  .alias(self.feature)
            ).select(["timestamp", self.feature])
            result = result.update(clipped, on="timestamp")

        return result

Step 2: Configure the database

In performance_db, set the server_calc_type attribute of the target feature to "my_new_calc". The CalculationHandler will then use MyNewCalculator for that feature automatically.

Step 3: Add documentation

Create docs/feature_calc/feature_calculators/my_new_calc.md with the following structure:

  • A heading and Overview section describing what the calculator does.
  • A Database Requirements section listing the server_calc_type value and any object/feature attributes.
  • A Class Definition section with an mkdocstrings autodoc directive pointing to your class. The directive format is ::: <module_path> on its own line, for example: ::: echo_energycalc.feature_calc_my_name.MyNewCalculator

Add the page to mkdocs.yml under the Feature Calculators section.
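A minimal skeleton for the page could look like this (the headings are a suggestion; only the directive line is required by mkdocstrings):

```markdown
# My New Calc

## Overview

What the calculator does and which methodology it uses.

## Database Requirements

- `server_calc_type = "my_new_calc"`
- Object attribute `nominal_power` (kW)

## Class Definition

::: echo_energycalc.feature_calc_my_name.MyNewCalculator
```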


Key helper methods on FeatureCalculator

  • _add_requirement(req): register a requirement (call in __init__)
  • _fetch_requirements(period, reindex, cached_data): validate and fetch all pending requirements
  • _requirement_data("RequiredFeatures"): merge all RequiredFeatures into one Polars DataFrame
  • _requirement_data("RequiredObjectAttributes"): merge all RequiredObjectAttributes into one dict
  • _requirement_data("RequiredCalcModels"): merge all RequiredCalcModels into one dict
  • _adjust_features(object_name): get features for an object with standard filters applied (e.g., nulling out invalid wind speed or active power values)
  • _create_empty_result(period, result_type): create a null-filled Polars DataFrame with the correct timestamps
  • _get_night_mask(timestamps, lat, lon): boolean mask for nighttime rows (uses pvlib)

Notes on requirements

  • Static vs time-series requirements: Object attributes, feature attributes, and calc models do not depend on a period and can be fetched in __init__. Feature data (time series) depends on a period and must be fetched inside _compute.
  • Optional requirements: Pass optional=True to allow the requirement to be absent without raising an error.
  • Adding requirements on the fly: _add_requirement can be called inside _compute (e.g., after discovering which neighbor turbines are needed). _fetch_requirements handles deduplication automatically.
  • reindex parameter: Pass "10min" to align all features to a uniform 10-minute timestamp grid. Pass None for features at other frequencies.
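Conceptually, reindex="10min" aligns every feature to a uniform timestamp grid like the one sketched below. This is a plain-Python illustration of the idea, not the library's implementation:

```python
from datetime import datetime, timedelta


def ten_minute_grid(start: datetime, end: datetime) -> list[datetime]:
    """Uniform 10-minute timestamp grid from start to end (inclusive)."""
    grid = []
    ts = start
    while ts <= end:
        grid.append(ts)
        ts += timedelta(minutes=10)
    return grid


grid = ten_minute_grid(datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 1, 0))
# 00:00, 00:10, ..., 01:00 -> 7 timestamps
```

Features fetched at other frequencies would be distorted by forcing them onto this grid, which is why reindex=None is the right choice for them.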

2. Adding a New Calculation Requirement

Create a new requirement only if you need to fetch a data type not covered by the existing requirements. In most cases, RequiredFeatures, RequiredObjectAttributes, and RequiredCalcModels are sufficient.

Step 1: Create the class

Python
"""Module for RequiredMyData."""

from __future__ import annotations

from typing import Any

from .calculation_requirements_core import CalculationRequirement


class RequiredMyData(CalculationRequirement):
    """Fetches XYZ data from performance_db for a given object."""

    def __init__(self, object_name: str, optional: bool = False) -> None:
        """
        Parameters
        ----------
        object_name : str
            Object to fetch data for.
        optional : bool, optional
            If True, a missing data set does not raise an error.
        """
        super().__init__(optional=optional)
        self._object_name = object_name

    # ------------------------------------------------------------------ #
    # Caching (optional but strongly recommended for static data)         #
    # ------------------------------------------------------------------ #

    def _check_cache_key(self) -> tuple | None:
        """Return a hashable key so identical requests share cached data."""
        return (self._object_name,)

    def _get_cache_value(self) -> Any:
        """Return the value to store (defaults to self._data)."""
        return self._data

    def _set_from_cache(self, cached: Any) -> None:
        """Restore from cache."""
        import copy
        self._data = copy.deepcopy(cached)

    # ------------------------------------------------------------------ #
    # Core interface                                                       #
    # ------------------------------------------------------------------ #

    def _do_check(self) -> None:
        """Validate that the required data exists."""
        if self.optional:
            return

        exists = self._perfdb.some_check(self._object_name)
        if not exists:
            raise ValueError(f"Data for '{self._object_name}' not found.")

    def get_data(self, **kwargs) -> Any:
        """Fetch and return the data, setting ``self._data``."""
        if not self._checked:
            self.check()
        self._data = self._perfdb.fetch_my_data(self._object_name)
        return self._data

    def __repr__(self) -> str:
        return f"RequiredMyData(object={self._object_name!r}, optional={self.optional})"

Step 2: Expose it in the calculator

In your FeatureCalculator.__init__:

Python
from .calculation_requirement_my_data import RequiredMyData

self._add_requirement(RequiredMyData(self.object))
self._fetch_requirements()

Then retrieve with:

Python
my_data = self.requirements["RequiredMyData"][0].data

Or add a "RequiredMyData" case to _requirement_data() in feature_calc_core.py if you want the same merge-and-aggregate pattern as the built-in requirement types.
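For the built-in types, the merge-and-aggregate pattern boils down to folding the .data of every instance of a requirement class into one container, roughly like this sketch (names assumed):

```python
def merge_requirement_data(requirements) -> dict:
    """Fold the .data dict of each requirement instance into one dict."""
    merged: dict = {}
    for req in requirements:
        merged.update(req.data)
    return merged


class FakeRequirement:
    """Stand-in exposing only the .data attribute used above."""

    def __init__(self, data):
        self.data = data


reqs = [FakeRequirement({"WTG_01": [1, 2]}), FakeRequirement({"WTG_02": [3]})]
merged = merge_requirement_data(reqs)
# {"WTG_01": [1, 2], "WTG_02": [3]}
```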

Caching guidance

Override _check_cache_key() when the requirement fetches static (period-independent) data (e.g., config, attributes, models). This allows multiple threads calculating different features for the same object to share the result.

Do not add caching for time-series data (features, alarms) since the result depends on the calculation period.
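The sharing mechanism can be sketched as a process-wide dict keyed by _check_cache_key(); the first fetch populates it and later identical requests reuse the entry. This is a simplified, stdlib-only illustration, not the actual cache implementation:

```python
import threading

_cache: dict = {}
_cache_lock = threading.Lock()


class StaticRequirementSketch:
    """Sketch of how _check_cache_key lets identical requests share data."""

    fetch_count = 0  # class-level counter showing the "DB" is hit only once

    def __init__(self, object_name: str):
        self._object_name = object_name
        self._data = None

    def _check_cache_key(self):
        return (type(self).__name__, self._object_name)

    def get_data(self):
        key = self._check_cache_key()
        with _cache_lock:
            if key not in _cache:
                # Stand-in for the actual performance_db query
                type(self).fetch_count += 1
                _cache[key] = {"nominal_power": 2300.0}
            self._data = _cache[key]
        return self._data


a = StaticRequirementSketch("WTG_01").get_data()
b = StaticRequirementSketch("WTG_01").get_data()
# both calls return the same cached dict; fetch_count stays at 1
```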


3. Adding a New Job Instance

Jobs are stored in performance_db in the job_instances table, along with their parameters (as JSON) and steps. The JobHandler queries for pending jobs and dispatches them.

Step 1: Create the module

Python
"""Module for MyJob."""

from __future__ import annotations

__all__ = ["MyJob"]

from typing import Any

from .job_instance import JobInstance, JobStepError


class MyJob(JobInstance):
    """Brief description of what this job does."""

    # Must match the name in the job_types table in performance_db.
    _name = "My Job Name"

    def __init__(self, job_id: int, skip_test: bool = True) -> None:
        """
        Parameters
        ----------
        job_id : int
            ID of the job instance from the database.
        skip_test : bool, optional
            Skip jobs whose description contains "test". Default True.
        """
        # Pass delay_steps_creation=True if you need to pre-process parameters
        # before creating the DB steps (e.g., resolving object lists).
        super().__init__(job_id, skip_test, delay_steps_creation=True)

        # Access job parameters from the DB
        my_param = self._parameters["my_param"]

        # Resolve whatever you need here...
        self._items_to_process = self._resolve_items(my_param)

        # Create steps in the DB after parameter setup
        self._create_database_steps()

    def _create_steps(self) -> list[dict[str, Any]]:
        """Return a list of step dicts — one per unit of work."""
        return [
            {
                "job_instance_id": self.job_id,
                "parameters": {"item": item},
            }
            for item in self._items_to_process
        ]

    def _run_step(self, step_id: int) -> None:
        """Execute the work for a single step."""
        step = self._perfdb.jobs.steps.get(
            job_instance_ids=[self.job_id],
            job_step_ids=[step_id],
            output_type="dict",
        )[step_id]

        try:
            item = step["parameters"]["item"]
            # ... do the actual work ...
        except Exception as e:
            raise JobStepError(f"Step {step_id} failed: {e}") from e

    def _resolve_items(self, param: str) -> list[str]:
        """Helper to resolve items from the parameter."""
        return [param]

Step 2: Configure the database

Add a row to the job_types table with name = "My Job Name". Create job_instances rows with job_type = "My Job Name" and the required parameters as JSON.

Step 3: Document it

Create docs/jobs/my_job.md and add it to mkdocs.yml.


Job step lifecycle

Text Only
JobInstance.__init__
  └─ _create_database_steps()  →  inserts rows into job_steps table
       └─ calls _create_steps() to get the list of step dicts

JobInstance.run()
  └─ for each pending step:
       set state = "Running"
       _run_step(step_id)
       set state = "Finished" or "Failed"

Jobs automatically transition their own state in the database:

  • "Created" → "Running" when run() starts
  • "Running" → "Finished" on success
  • "Running" → "Failed" on an unhandled exception

Raise JobStepError (not a generic exception) inside _run_step to mark only that step as failed without crashing the entire job.
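The distinction can be sketched as follows: a JobStepError marks one step as Failed while the loop continues, whereas any other exception would propagate and fail the whole job. This is a simplified illustration of the lifecycle above, not the real run() implementation:

```python
class JobStepError(Exception):
    """Marks a single step as failed without aborting the job."""


def run_steps(step_ids, run_step):
    """Simplified run() loop with per-step state transitions."""
    states = {}
    for step_id in step_ids:
        states[step_id] = "Running"
        try:
            run_step(step_id)
            states[step_id] = "Finished"
        except JobStepError:
            states[step_id] = "Failed"
        # any other exception propagates and fails the whole job
    return states


def flaky(step_id):
    if step_id == 2:
        raise JobStepError("step 2 had bad input")


states = run_steps([1, 2, 3], flaky)
# {1: "Finished", 2: "Failed", 3: "Finished"}
```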