
Calculation Handler

Overview

CalculationHandler is the main orchestrator for running feature calculations. It discovers which (object, feature) pairs need to be calculated, instantiates the correct FeatureCalculator subclass for each pair, executes the calculations, manages an in-memory result cache to avoid redundant database queries, and persists results back to performance_db.

It is used by the feature-calculator Airflow DAG and by ad-hoc scripts whenever batch calculations need to be triggered programmatically.

Note

Prefer CalculationHandler over using FeatureCalculator classes directly — it handles ordering, caching, error collection, memory management, and batched database writes that individual calculators cannot do on their own.


Internal Architecture

CalculationHandler is built from three mixins plus the main class:

Text Only
CalculationHandler
├── _FeatureEnumeratorMixin   — discovers (object, feature) pairs from performance_db
├── _ResultCacheMixin         — manages in-memory cache of results as flat "obj@feature" columns
└── _ResultPersisterMixin     — batches results and writes to performance_db

Construction and Feature Enumeration

When CalculationHandler is instantiated:

  1. Connects to performance_db.
  2. Queries all objects matching the provided object_filters (e.g., object_types=["wind_turbine"]).
  3. Queries all feature definitions for those objects' models, filtering to data_source_type = "server_calc" and fetching the server_calc_type attribute.
  4. Joins objects × features to produce the full list of (object, feature) pairs to calculate.
  5. Preserves the requested feature order — features are calculated in the exact order they appear in the features argument. This is critical for chained calculations where feature B depends on feature A.
  6. Optionally filters out ignored_features.
  7. Objects with a calculation_disabled attribute set to "all" or listing a specific feature name are skipped for those features at calculation time.
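Steps 4–6 can be sketched in miniature with plain-Python stand-ins (the real implementation joins Polars DataFrames queried from performance_db; the object and model names below are hypothetical):

```python
# Miniature of the objects x features join, preserving request order (step 5)
# and dropping ignored features (step 6).  All names are illustrative.
objects = [("WT01", "V150"), ("WT02", "V150"), ("INV01", "SG3125")]
model_features = {
    "V150": {"ActivePowerTheoretical_10min.AVG", "LostActivePower_10min.AVG"},
    "SG3125": {"ActivePowerTheoretical_10min.AVG"},
}
requested = ["ActivePowerTheoretical_10min.AVG", "LostActivePower_10min.AVG"]
ignored = {"LostActivePower_10min.AVG"}

pairs = [
    (obj, feat)
    for feat in requested                 # request order preserved (step 5)
    if feat not in ignored                # step 6
    for obj, model in objects
    if feat in model_features[model]      # step 4: join on object model
]
```

Because the outer loop runs over `requested`, every pair for feature A is produced before any pair for feature B, which is what makes chained calculations work.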

Execution Flow

Text Only
calculate(period, ...)
│
├── For each feature (in request order):
│   ├── For each object model that has this feature:
│   │   ├── Snapshot current in-memory cache (once per batch)
│   │   │
│   │   ├── [main thread] Pre-create all FeatureCalculator instances sequentially
│   │   │   (avoids concurrent echo-postgres DB calls in __init__)
│   │   │
│   │   ├── [sequential or ThreadPoolExecutor if max_workers > 1]
│   │   │   For each object of this model:
│   │   │     1. Check calculation_disabled attribute — skip if matched
│   │   │     2. Use pre-created FeatureCalculator instance
│   │   │     3. calculator.calculate(period, cached_data=snapshot, ...)
│   │   │     4. Return result dict without touching shared state
│   │   │
│   │   └── [after all workers exit] Merge each result into shared state:
│   │       - Add to in-memory result store
│   │       - Log any exception to ErrorDataSource
│   │       - Update feature completion tracking
│   │       - If save_method="end": save all features of a completed object in one DB write
│   │       - If memory > MAX_MEMORY_USAGE: flush all results and clear cache
│
└── Return ErrorDataSource with all collected errors
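The per-batch pattern above (pre-create on the main thread, compute without shared state, merge only after every worker has exited) can be sketched as follows. The `run_batch` helper and its callback names are illustrative, not the handler's actual API:

```python
from concurrent.futures import ThreadPoolExecutor


def run_batch(objects, make_calc, compute, merge, max_workers=1):
    """Illustrative miniature of one (feature, object_model) batch."""
    # [main thread] pre-create calculators sequentially — no concurrent __init__
    calcs = {obj: make_calc(obj) for obj in objects}

    # [sequential or pooled] compute; workers never touch shared state
    if max_workers > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            results = list(pool.map(lambda obj: compute(obj, calcs[obj]), objects))
    else:
        results = [compute(obj, calcs[obj]) for obj in objects]

    # [after all workers exit] merge into shared state on the main thread only
    for result in results:
        merge(result)
```

Deferring `merge` until after the pool has shut down is what keeps Polars and DB operations out of the concurrent section.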

Parallel Execution

Within each (feature, object_model) batch, multiple objects can be calculated concurrently using ThreadPoolExecutor. The number of concurrent workers is controlled by the max_workers parameter (default: 1 — sequential).

Why the default is sequential

Threading in this codebase runs into two independent deadlock mechanisms that make max_workers > 1 unreliable in production environments:

1. Rayon work-stealing + GIL re-entry

Polars uses a single global Rayon thread pool (POOL). When a Python thread calls any Polars operation, it calls POOL.install(closure) which submits work to all available Rayon threads via work-stealing. With POLARS_MAX_THREADS > 1, a Rayon worker executing on behalf of thread A can steal a Python-callable task from thread B's install() scope and then block on PyGILState_Ensure while thread A still holds the GIL — a circular wait.

Setting POLARS_MAX_THREADS=1 prevents cross-install() work-stealing and eliminates this risk. The calculate() method enforces this and falls back to max_workers=1 with a warning if the condition is not met.

Warning

Increasing POLARS_MAX_THREADS to match or exceed max_workers does not fix this. Rayon's work-stealing is non-deterministic — a larger pool reduces the probability of cross-contamination but does not eliminate it. POLARS_MAX_THREADS=1 is the only safe setting.

2. echo-postgres class-level shared state

Several echo-postgres functions cache DataFrames at the class level and perform Polars/pandas operations on those shared objects when called from multiple threads simultaneously. Operations like join, to_pandas, cast_attributes, and merge on the same DataFrame instance from concurrent threads deadlock regardless of POLARS_MAX_THREADS.

The most common trigger is eval-expression features (server_calc_type = "expression_evaluation"). Their expression strings can call helper functions such as filters.power_curve_offset_filter that query echo-postgres internally — these calls happen inside the restricted exec() at compute time and cannot be pre-fetched on the main thread. With 4 workers all running the same expression type simultaneously, they all hit echo_postgres.object_instances.get() concurrently and deadlock.

Automatic fallbacks

calculate() automatically reduces max_workers to 1 and emits a warnings.warn when either of the following is detected at call time:

  • POLARS_MAX_THREADS > 1: detected via pl.thread_pool_size() > 1.
  • Process was forked (e.g. Airflow Celery): detected via an os.register_at_fork flag set at fork time.

The fork detection uses os.register_at_fork(after_in_child=...) rather than PID comparison, which correctly catches Airflow Celery workers even when echo-energycalc is imported after the fork.

When max_workers > 1 is safe

Outside a forked environment and with POLARS_MAX_THREADS=1, parallelism is safe if and only if none of the features being calculated use eval expressions that call echo-postgres functions at compute time. Features whose server_calc_type is anything other than "expression_evaluation" are generally safe because their _compute methods perform pure Polars operations on thread-local DataFrames.

Bash
# Required environment variable — must be set before Python imports polars
export POLARS_MAX_THREADS=1

Thread-safety design (when max_workers > 1)

  • FeatureCalculator instances are pre-created on the main thread before the executor starts. Their __init__ calls echo-postgres to validate the object/feature — pre-creating them sequentially avoids concurrent access to echo-postgres class-level state.
  • _calculate_single_object() is pure — it does not touch any shared handler state and only reads a pre-snapshotted immutable copy of the result cache.
  • _merge_result() runs after executor.shutdown() — all Polars and DB operations performed during result merging (filter, concat, save) happen after every worker thread has exited, eliminating GIL + Rayon contention between the main thread and workers.
  • Per-thread PerfDB connections — each thread gets its own lazily-created PerfDB instance via threading.local, so DB connections are never shared across threads.
  • Per-thread CalculationRequirement caches — each thread has its own cache dict (via threading.local) so cached DataFrames are never shared or mutated across threads.
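The per-thread connection pattern from the last two bullets can be sketched with threading.local; `HandlerSketch` and a bare `object()` stand in for the real handler and PerfDB instance:

```python
import threading


class HandlerSketch:
    """Hedged sketch of the lazily-created per-thread connection pattern."""

    def __init__(self) -> None:
        self._tls = threading.local()

    @property
    def _perfdb(self):
        # First access on each thread creates that thread's own instance;
        # subsequent accesses on the same thread reuse it.
        if not hasattr(self._tls, "perfdb"):
            self._tls.perfdb = object()  # stand-in for a real PerfDB connection
        return self._tls.perfdb
```

Because `threading.local` attributes are invisible across threads, no connection object is ever shared, which sidesteps driver-level thread-safety issues entirely.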

In-Memory Result Cache

After each feature is calculated, its result (a Polars DataFrame with "object@feature" flat columns) is added to _cached_data. This is passed as cached_data to every subsequent calculator, allowing downstream features to read already-calculated values without a round trip to performance_db.

Example: if ActivePowerTheoretical_10min.AVG is calculated first, the soiling loss calculator that runs later can read it from the cache instead of querying the database.

Cache behavior:

  • The cache snapshot is taken once per (feature, object_model) batch before workers start — all workers see the same consistent snapshot.
  • New results become visible to the next batch after the merge step.
  • If memory usage (checked via psutil) exceeds MAX_MEMORY_USAGE (default 1024 MB, overridable via ECHO_ENERGYCALC_MAX_MEMORY_MB env var), all results are flushed to the database and the cache is cleared.
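A minimal sketch of the flat-column convention and the per-batch snapshot, using a plain dict in place of the real Polars DataFrame cache:

```python
# "obj@feature" flat-column convention as described above; a dict of column
# lists stands in for the real Polars DataFrame cache.
cache: dict[str, list[float]] = {}


def cache_key(object_name: str, feature_name: str) -> str:
    return f"{object_name}@{feature_name}"


cache[cache_key("WT01", "ActivePowerTheoretical_10min.AVG")] = [1480.0, 1502.5]

# One snapshot per (feature, object_model) batch: workers read the frozen
# copy, while the live cache is mutated only during the main-thread merge.
snapshot = dict(cache)
cache[cache_key("WT01", "LostActivePower_10min.AVG")] = [12.0, 9.5]
```

Results added after the snapshot (here the LostActivePower column) become visible only to the next batch, matching the consistency rule above.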

Save Modes

  • "end" (default): batches all features for an object and writes them in a single DB call once all features for that object are done. Use for most cases: more efficient, with fewer, larger DB writes.
  • "each": saves each feature result immediately after calculation. Use when you need live results in the database as quickly as possible.

The save_into parameter controls the target:

  • "all" (default): save to performance_db and upload to Bazefield.
  • "performance_db": save to performance_db only.
  • None: do not save. Useful for testing or dry runs.

Error Handling

All exceptions raised inside a single (object, feature) calculation are caught, logged, and stored in the ErrorDataSource returned by calculate(). A single failure never aborts the remaining calculations.

Errors are organized hierarchically:

Text Only
ErrorDataSource("server_calc")
└── ErrorObject("WT01")
    └── ErrorFeature("ActivePowerTheoretical_10min.AVG")
        └── <Exception>
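The hierarchy can be traversed in a few lines. The `ErrorNode` dataclass below is a stand-in whose `children`/`exceptions` shape is assumed from this page, not the echo_errorsummary API:

```python
from dataclasses import dataclass, field


@dataclass
class ErrorNode:
    """Stand-in for ErrorDataSource/ErrorObject/ErrorFeature (shape assumed)."""

    name: str
    children: dict = field(default_factory=dict)
    exceptions: list = field(default_factory=list)

    def add_child(self, node: "ErrorNode") -> None:
        self.children.setdefault(node.name, node)


root = ErrorNode("server_calc")
root.add_child(ErrorNode("WT01"))
root.children["WT01"].add_child(ErrorNode("ActivePowerTheoretical_10min.AVG"))
root.children["WT01"].children["ActivePowerTheoretical_10min.AVG"].exceptions.append(
    ValueError("sensor gap")
)

# Flatten the tree into (object, feature, exception) triples for reporting.
flat = [
    (obj.name, feat.name, exc)
    for obj in root.children.values()
    for feat in obj.children.values()
    for exc in feat.exceptions
]
```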

Factory Methods

Instead of constructing CalculationHandler directly, prefer the factory class methods for common patterns:

from_type_defaults(object_type, object_filters, ...)

Looks up the default_feature_order setting in performance_db for the given object_type and creates a handler with features in the correct dependency order.

  • Use object_type="infer" to automatically detect the object type(s) from the provided object_filters and return one handler per type.
  • Use one_per_object=True to return one handler per individual object (useful for per-turbine Airflow tasks).
Python
# Calculate all default wind turbine features for an SPE, in correct dependency order
handler = CalculationHandler.from_type_defaults(
    object_type="wind_turbine",
    object_filters={"spe_names": ["SPE-01"]},
)
handler.calculate(period)

from_calc_types(calc_types, object_filters, ...)

Creates a handler for all features that have a specific server_calc_type. Useful for targeted recalculations.

Python
# Recalculate all theoretical power features for two turbines
handler = CalculationHandler.from_calc_types(
    calc_types=["theoretical_active_power"],
    object_filters={"object_names": ["WT01", "WT02"]},
)
handler.calculate(period)

Usage Examples

Standard usage (Airflow / production)

Python
from echo_datetimerange import DateTimeRange
from echo_energycalc import CalculationHandler

period = DateTimeRange("2024-01-01", "2024-01-31")

handler = CalculationHandler(
    features=["ActivePowerTheoretical_10min.AVG", "LostActivePower_10min.AVG"],
    object_filters={"spe_names": ["SPE-01"]},
)

# max_workers=1 is the default — safe in all environments including Airflow
errors = handler.calculate(
    period=period,
    save_into="performance_db",
    save_method="end",
)

if errors.has_errors():
    print(errors)

Parallel usage (outside Airflow, no eval-expression features)

Python
import os
os.environ["POLARS_MAX_THREADS"] = "1"  # must be set before importing polars

from echo_datetimerange import DateTimeRange
from echo_energycalc import CalculationHandler

handler = CalculationHandler(
    # Only non-expression feature types — safe to parallelise
    features=["ActivePowerTheoretical_10min.AVG"],
    object_filters={"spe_names": ["SPE-01"]},
)

errors = handler.calculate(
    period=DateTimeRange("2024-01-01", "2024-01-31"),
    max_workers=4,  # safe: POLARS_MAX_THREADS=1, not forked, no eval expressions
)

Warning

Do not use max_workers > 1 if any of the calculated features have server_calc_type = "expression_evaluation" and their expressions call echo-postgres functions via helpers such as filters.*. This will deadlock reliably. The root fix requires echo-postgres to make its class-level DataFrame caches thread-safe.


Class Definition

CalculationHandler(features, object_filters=None, regex_feature_names=False, ignored_features=None)

Class used to handle the calculation of features.

This will essentially be a wrapper for the FeatureCalculator classes, iterating object by object and feature by feature, performing the calculations and saving the results in performance_db.

An instance of this class should be created in most scripts that import data and then perform calculations to store in the database. Avoid using the FeatureCalculator classes directly.

Parameters:

  • features

    (str | list[str]) –

    List of features to be calculated. Can be a single feature as a string as well.

    If a list is passed, the features will be calculated in the order they are passed.

  • object_filters

    (dict[str, list[str]], default: None ) –

    A dict containing the keyword arguments that will be passed to the function perfdb.objects.instances.get(). These will be used to filter the objects that will be calculated.

    Most common arguments are object_names, object_types, object_models, parent_objects and spe_names.

    By default None, which means that all objects will be calculated.

  • regex_feature_names

    (bool, default: False ) –

    If True, the features will be treated as regular expressions. This is useful when you want to calculate all features that match a certain pattern.

For example, if features=['^active.*', '^reactive.*'], it will return all features that start with 'active' or 'reactive'.

    By default False.

  • ignored_features

    (list[str] | None, default: None ) –

    List of features that should be ignored. These will not be calculated. Keep in mind that they are not regex patterns even if regex_feature_names is True.

    By default None.
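The regex_feature_names matching can be illustrated with re.match against a hypothetical feature list (the real lookup runs against the feature definitions stored in performance_db):

```python
import re

# Illustrative feature names; the real set comes from performance_db.
available = [
    "ActivePowerTheoretical_10min.AVG",
    "ReactivePower_10min.AVG",
    "WindSpeed_10min.AVG",
]
patterns = ["^Active.*", "^Reactive.*"]

# re.match anchors at the start of the string, matching the '^...' patterns.
selected = [f for f in available if any(re.match(p, f) for p in patterns)]
```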

Source code in echo_energycalc/calculation_handler.py
Python
def __init__(
    self,
    features: str | list[str],
    object_filters: dict[str, list[str]] | None = None,
    regex_feature_names: bool = False,
    ignored_features: list[str] | None = None,
) -> None:
    """
    Class used to handle the calculation of features.

    This will essentially be a wrapper for the FeatureCalculator classes, iterating object by object and feature by feature, performing the calculations and saving the results in performance_db.

    An instance of this class should be created in most scripts that import data and then perform calculations to store in the database. Avoid using the FeatureCalculator classes directly.

    Parameters
    ----------
    features : str | list[str]
        List of features to be calculated. Can be a single feature as a string as well.

        If a list is passed, the features will be calculated in the order they are passed.
    object_filters : dict[str, list[str]], optional
        A dict containing the keyword arguments that will be passed to the function perfdb.objects.instances.get(). These will be used to filter the objects that will be calculated.

        Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.

        By default None, which means that all objects will be calculated.
    regex_feature_names : bool, optional
        If True, the features will be treated as regular expressions. This is useful when you want to calculate all features that match a certain pattern.

        For example, if features=['^active.*', '^reactive.*'], it will return all features that start with 'active' or 'reactive'.

        By default False.
    ignored_features : list[str] | None, optional
        List of features that should be ignored. These will not be calculated. Keep in mind that they are not regex patterns even if regex_feature_names is True.

        By default None.
    """
    if object_filters is None:
        object_filters = {}
    self._errors = ErrorDataSource("server_calc")

    # thread-local storage so each thread gets its own PerfDB connection
    self._tls = threading.local()

    # getting objects from performance_db — convert to polars with object_name as a column
    _objects = self._perfdb.objects.instances.get(
        **object_filters,
        output_type="pl.DataFrame",
        get_attributes=True,
        attribute_names=["calculation_disabled"],
    )
    _objects = _objects.rename({"name": "object_name"})
    self._objects: pl.DataFrame = _objects

    # enumerate features (delegated to _FeatureEnumeratorMixin)
    self._enumerate_features(features, regex_feature_names, ignored_features)
    if self._features is None:
        return

    # initialize result cache (delegated to _ResultCacheMixin)
    self._init_cache()

errors property

Errors that occurred during calculations.

Returns:

  • ErrorDataSource

    ErrorDataSource object with all the errors that occurred during calculations.

features property

Features that will be calculated for each object model.

Returns:

  • dict[str, list[str]]

    Dictionary mapping object model name to list of features that will be calculated for that object model.

objects property

List of objects for which the features will be calculated.

Returns:

  • list[str]

    List of objects for which the features will be calculated.

results property

Results of the calculations.

Returns:

  • dict[str, dict[str, FeatureCalculator]]

    Dict containing the results of the calculations for each object and feature.

    First level of keys are the object names and the second level of keys are the feature names.

    The values are FeatureCalculator objects with the results of the calculations.

calculate(period, save_into='all', save_method='end', free_up_memory=True, max_workers=1, timeout=600, **kwargs)

Calculate all features for all objects and optionally save results to the database.

Iterates over each feature in declaration order, then over each object model, then over each object, calling the appropriate FeatureCalculator subclass for every (object, feature) pair. Errors are recorded per-object-per-feature and do not interrupt the remaining calculations.

Parameters:

  • period

    (DateTimeRange) –

    Desired calculation period.

  • save_into

    (Literal['all', 'performance_db'] | None, default: 'all' ) –

    Where to persist results:

    • "all": write to both performance_db and Bazefield.
    • "performance_db": write only to performance_db.
    • None: do not save (useful for testing or dry-runs).

    By default "all".

  • save_method

    (Literal['end', 'each'], default: 'end' ) –

    Controls when results are written to the database:

    • "end": accumulate all results in memory and save each object's features together once every feature for that object is done. More efficient (fewer DB round-trips) but uses more RAM.
    • "each": save after every individual (object, feature) calculation. Results appear in the database sooner but generate more DB traffic.

    By default "end".

  • free_up_memory

    (bool, default: True ) –

    When True (default), discard each object's in-memory results immediately after saving to reduce peak RAM usage. Set to False only if you need to access the results property after the call.

  • max_workers

    (int | None, default: 1 ) –

    Number of objects to calculate in parallel within each (feature, model) batch. 1 (the default) runs sequentially and is the recommended setting for production use.

    Why 1 is the default — threading caveats

    Parallelism here is achieved with concurrent.futures.ThreadPoolExecutor. Threads share the process GIL and the Polars Rayon thread pool, which creates two independent deadlock risks:

    1. Rayon work-stealing + GIL re-entry. Polars uses a single global Rayon thread pool (POOL). With POLARS_MAX_THREADS > 1, a Rayon worker executing on behalf of thread A can steal a Python-callable task submitted by thread B and then block on PyGILState_Ensure while thread A still holds the GIL — a circular wait. Setting POLARS_MAX_THREADS=1 prevents work-stealing across POOL.install() boundaries and eliminates this risk. The guard in this method enforces that and falls back to max_workers=1 if the condition is not met.

    2. echo-postgres class-level shared state. Several echo-postgres functions (feature_definitions.get, object_instances.get, cast_attributes, etc.) cache DataFrames at the class level and perform concurrent Polars/pandas operations on them when called from multiple threads simultaneously. This deadlocks regardless of POLARS_MAX_THREADS. The most common trigger is eval-expression features (server_calc_type = "expression_evaluation") whose expression strings call helper functions such as filters.power_curve_offset_filter that query echo-postgres internally — these calls happen inside _safe_exec at compute time and cannot be pre-fetched on the main thread.

    When max_workers > 1 is safe

    Outside a forked environment (i.e. not inside an Airflow Celery worker) and with POLARS_MAX_THREADS=1, parallelism is safe if and only if none of the features being calculated use eval expressions that call echo-postgres functions at compute time. Features whose server_calc_type is anything other than "expression_evaluation" are generally safe because their _compute methods only perform pure Polars operations on thread-local DataFrames.

    Automatic fallbacks

    The method reduces max_workers to 1 and emits a warnings.warn when either of the following is detected:

    • POLARS_MAX_THREADS > 1 (Rayon work-stealing risk).
    • The process was forked after module import (e.g. Airflow/billiard Celery workers detected via os.register_at_fork).

    None lets ThreadPoolExecutor choose min(32, os.cpu_count() + 4) workers automatically.

    Default is 1.

  • timeout

    (float | None, default: 600 ) –

    Maximum seconds to wait for all objects in a single (feature, model) batch. When the deadline is reached, objects that have not yet produced a result are recorded as ErrorFeature entries and the calculation moves on to the next batch. Running threads are released without blocking (they cannot be interrupted in Python but will no longer delay progress). None disables the timeout. Default is 600.

  • **kwargs

    (dict, default: {} ) –

    Forwarded to _save_obj_results and _save_all_obj_results.

Returns:

  • ErrorDataSource

    Nested error report with one ErrorObject per object and one ErrorFeature per (object, feature) pair that raised an exception. Successful calculations produce empty children with no exceptions attached.

Source code in echo_energycalc/calculation_handler.py
Python
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = "all",
    save_method: Literal["end", "each"] = "end",
    free_up_memory: bool = True,
    max_workers: int | None = 1,
    timeout: float | None = 600,
    **kwargs,
) -> ErrorDataSource:
    """
    Calculate all features for all objects and optionally save results to the database.

    Iterates over each feature in declaration order, then over each object model, then
    over each object, calling the appropriate :class:`FeatureCalculator` subclass for
    every (object, feature) pair.  Errors are recorded per-object-per-feature and do
    not interrupt the remaining calculations.

    Parameters
    ----------
    period : DateTimeRange
        Desired calculation period.
    save_into : Literal["all", "performance_db"] | None, optional
        Where to persist results:

        - ``"all"``: write to both ``performance_db`` and Bazefield.
        - ``"performance_db"``: write only to ``performance_db``.
        - ``None``: do not save (useful for testing or dry-runs).

        By default ``"all"``.
    save_method : Literal["end", "each"], optional
        Controls when results are written to the database:

        - ``"end"``: accumulate all results in memory and save each object's
          features together once every feature for that object is done.  More
          efficient (fewer DB round-trips) but uses more RAM.
        - ``"each"``: save after every individual (object, feature) calculation.
          Results appear in the database sooner but generate more DB traffic.

        By default ``"end"``.
    free_up_memory : bool, optional
        When ``True`` (default), discard each object's in-memory results
        immediately after saving to reduce peak RAM usage.  Set to ``False``
        only if you need to access :attr:`results` after the call.
    max_workers : int | None, optional
        Number of objects to calculate in parallel within each (feature, model)
        batch.  ``1`` (the default) runs sequentially and is the recommended
        setting for production use.

        **Why 1 is the default — threading caveats**

        Parallelism here is achieved with :class:`~concurrent.futures.ThreadPoolExecutor`.
        Threads share the process GIL and the Polars Rayon thread pool, which
        creates two independent deadlock risks:

        1. **Rayon work-stealing + GIL re-entry.**  Polars uses a single global
           Rayon thread pool (``POOL``).  With ``POLARS_MAX_THREADS > 1``, a
           Rayon worker executing on behalf of thread A can steal a Python-callable
           task submitted by thread B and then block on ``PyGILState_Ensure`` while
           thread A still holds the GIL — a circular wait.  Setting
           ``POLARS_MAX_THREADS=1`` prevents work-stealing across
           ``POOL.install()`` boundaries and eliminates this risk.  The guard in
           this method enforces that and falls back to ``max_workers=1`` if the
           condition is not met.

        2. **echo-postgres class-level shared state.**  Several ``echo-postgres``
           functions (``feature_definitions.get``, ``object_instances.get``,
           ``cast_attributes``, etc.) cache DataFrames at the class level and
           perform concurrent Polars/pandas operations on them when called from
           multiple threads simultaneously.  This deadlocks regardless of
           ``POLARS_MAX_THREADS``.  The most common trigger is eval-expression
           features (``server_calc_type = "expression_evaluation"``) whose
           expression strings call helper functions such as
           ``filters.power_curve_offset_filter`` that query ``echo-postgres``
           internally — these calls happen inside ``_safe_exec`` at compute time
           and cannot be pre-fetched on the main thread.

        **When ``max_workers > 1`` is safe**

        Outside a forked environment (i.e. not inside an Airflow Celery worker)
        and with ``POLARS_MAX_THREADS=1``, parallelism is safe *if and only if*
        none of the features being calculated use eval expressions that call
        ``echo-postgres`` functions at compute time.  Features whose
        ``server_calc_type`` is anything other than ``"expression_evaluation"``
        are generally safe because their ``_compute`` methods only perform pure
        Polars operations on thread-local DataFrames.

        **Automatic fallbacks**

        The method reduces ``max_workers`` to ``1`` and emits a
        :func:`warnings.warn` when either of the following is detected:

        - ``POLARS_MAX_THREADS > 1`` (Rayon work-stealing risk).
        - The process was forked after module import (e.g. Airflow/billiard
          Celery workers detected via :func:`os.register_at_fork`).

        ``None`` lets :class:`~concurrent.futures.ThreadPoolExecutor` choose
        ``min(32, os.cpu_count() + 4)`` workers automatically.

        Default is ``1``.
    timeout : float | None, optional
        Maximum seconds to wait for all objects in a single (feature, model)
        batch.  When the deadline is reached, objects that have not yet produced
        a result are recorded as :class:`~echo_errorsummary.ErrorFeature` entries
        and the calculation moves on to the next batch.  Running threads are
        released without blocking (they cannot be interrupted in Python but will
        no longer delay progress).  ``None`` disables the timeout.
        Default is ``600``.
    **kwargs : dict, optional
        Forwarded to :meth:`_save_obj_results` and :meth:`_save_all_obj_results`.

    Returns
    -------
    ErrorDataSource
        Nested error report with one :class:`~echo_errorsummary.ErrorObject` per
        object and one :class:`~echo_errorsummary.ErrorFeature` per
        (object, feature) pair that raised an exception.  Successful calculations
        produce empty children with no exceptions attached.
    """
    if self._features is None:
        return self._errors

    if max_workers is not None and max_workers > 1:
        if pl.thread_pool_size() > 1:
            message = (
                f"POLARS_MAX_THREADS is {pl.thread_pool_size()} — parallel execution requires it to be 1. "
                f"With multiple Rayon threads, work-stealing across concurrent POOL.install() calls can "
                f"cause GIL + Rayon circular waits. "
                f"Set POLARS_MAX_THREADS=1 before importing polars. Falling back to max_workers=1."
            )
            warnings.warn(
                message,
                stacklevel=2,
            )
            logger.warning(message)
            max_workers = 1
        elif _FORKED:
            message = (
                "Process has been forked (e.g. Airflow Celery). echo-postgres uses class-level cached state that is not safe to access from multiple threads after a fork. "
                "Falling back to max_workers=1."
            )
            warnings.warn(
                message,
                stacklevel=2,
            )
            logger.warning(message)
            max_workers = 1

    # resetting calculation history
    self._reset_calc_history()

    # iterating over calculated features
    features_list = self._features["name"].unique(maintain_order=True).to_list()
    for feature in features_list:
        # iterating over object models
        obj_models = self._features.filter(pl.col("name") == feature)["object_model_name"].unique().to_list()
        for obj_model in obj_models:
            selected_objects = self._objects.filter(pl.col("object_model_name") == obj_model)["object_name"].to_list()

            # Snapshot the cache once for the whole (feature, obj_model) batch.
            _snapshot = self._get_cached_data() if save_method == "end" else None
            cached_data_snapshot = _snapshot if (_snapshot is not None and not _snapshot.is_empty()) else None
            calc_save_into = save_into if save_method == "each" else None

            # Pre-create all FeatureCalculator instances sequentially on the main
            # thread.  FeatureCalculator.__init__ calls echo-postgres internally and
            # that library has class-level cached state that deadlocks under concurrent
            # access even when each thread holds its own PerfDB instance.  Doing this
            # here means worker threads only run .calculate() — no DB calls in __init__.
            pre_created: dict[str, tuple[FeatureCalculator | None, Exception | None]] = {}
            for _obj in selected_objects:
                try:
                    pre_created[_obj] = (self._create_feature_calc(_obj, obj_model, feature), None)
                except Exception as _e:
                    pre_created[_obj] = (None, _e)

            _lock = threading.Lock()

            def _merge_result(res: dict, *, _feature: str = feature) -> None:
                """Apply one object's result to shared handler state. Must run under _lock."""
                _obj = res["obj"]
                _fc = res["feature_calc"]
                _exc = res["exception"]

                self._errors.add_child(ErrorObject(name=_obj))
                self._errors.children[_obj].add_child(ErrorFeature(name=_feature))
                if _exc is not None:
                    self._errors.children[_obj].children[_feature].add_exception(_exc)

                if _fc is not None:
                    self._add_result(object_name=_obj, feature_name=_feature, result=_fc)

                for extra_feat in res["extra_features"]:
                    if not self._features.filter(
                        (pl.col("object_name") == _obj) & (pl.col("name") == extra_feat),
                    ).is_empty():
                        continue
                    new_row = pl.DataFrame(
                        {
                            "object_name": [_obj],
                            "name": [extra_feat],
                            "object_model_name": [None],
                            "data_source_type_name": [None],
                            "server_calc_type": [None],
                            "finished_calc": [True],
                            "cached": [False],
                        },
                        schema=self._features.schema,
                    )
                    self._features = pl.concat([self._features, new_row], how="diagonal")

                self._features = self._features.with_columns(
                    pl.when(
                        (pl.col("name") == _feature) & (pl.col("object_name") == _obj),
                    )
                    .then(statement=True)
                    .otherwise(pl.col("finished_calc"))
                    .alias("finished_calc"),
                )

                if save_method == "end":
                    try:
                        if psutil.Process().memory_info().rss / 1024**2 > MAX_MEMORY_USAGE:
                            self._save_all_obj_results(save_into=save_into, free_up_memory=free_up_memory, **kwargs)
                            self._remove_cached_data()
                        elif self._features.filter(
                            (pl.col("object_name") == _obj) & (~pl.col("finished_calc")),
                        ).is_empty():
                            self._save_obj_results(object_name=_obj, save_into=save_into, free_up_memory=free_up_memory, **kwargs)
                    except Exception as e:
                        message = f"'{_obj}' Error while saving results to database."
                        logger.exception(message)
                        self._errors.children[_obj].add_exception(e)

                if _fc is not None and _fc.result is not None:
                    t0, t1, t2, t3 = res["t0"], res["t1"], res["t2"], res["t3"]
                    message = f"{_fc} - {period=} - Pre {t1 - t0:.2f}s, Cache {t2 - t1:.2f}s, Calc {t3 - t2:.2f}s, Post {perf_counter() - t3:.2f}s, Total {perf_counter() - t0:.2f}s."
                    logger.info(message)

            if max_workers == 1 or len(selected_objects) == 1:
                # Sequential fast path — no executor overhead
                for obj in selected_objects:
                    fc, fc_exc = pre_created[obj]
                    res = self._calculate_single_object(
                        obj,
                        obj_model,
                        feature,
                        period,
                        features_list,
                        selected_objects,
                        obj_models,
                        cached_data_snapshot,
                        calc_save_into,
                        fc,
                        fc_exc,
                        **kwargs,
                    )
                    _merge_result(res)
            else:
                executor = ThreadPoolExecutor(max_workers=max_workers)
                futures = {
                    executor.submit(
                        self._calculate_single_object,
                        obj,
                        obj_model,
                        feature,
                        period,
                        features_list,
                        selected_objects,
                        obj_models,
                        cached_data_snapshot,
                        calc_save_into,
                        pre_created[obj][0],
                        pre_created[obj][1],
                        **kwargs,
                    ): obj
                    for obj in selected_objects
                }
                completed_results: list[dict] = []
                try:
                    for fut in as_completed(futures, timeout=timeout):
                        # Only collect here — no _merge_result while workers are
                        # alive.  _merge_result calls Polars and echo-postgres
                        # (to_pandas, filter, concat, DB save), which deadlocks
                        # with concurrent Polars ops in worker threads under
                        # POLARS_MAX_THREADS=1 (GIL + Rayon circular wait).
                        completed_results.append(fut.result())  # noqa: PERF401
                except FutureTimeoutError:
                    # Release the executor without waiting for hung threads, then
                    # record a timeout error for every object that never finished.
                    executor.shutdown(wait=False, cancel_futures=True)
                    for fut, obj in futures.items():
                        if not fut.done():
                            message = f"'{obj}' - '{feature}': Calculation timed out after {timeout:.0f}s."
                            logger.error(message)
                            cprint(message, "red")
                            self._errors.add_child(ErrorObject(name=obj))
                            self._errors.children[obj].add_child(ErrorFeature(name=feature))
                            self._errors.children[obj].children[feature].add_exception(
                                FutureTimeoutError(message),
                            )

                # All workers have exited — safe to call _merge_result now.
                for res in completed_results:
                    _merge_result(res)

    return self._errors
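
The collect-then-merge pattern used above can be sketched in isolation. This is a minimal, self-contained illustration (the `_worker` and `run_batch` names are hypothetical stand-ins for `_calculate_single_object` and the batch loop): worker threads only compute and return plain result dicts, and merging into shared state happens on the main thread after every worker has exited, so no cross-thread Polars or database calls ever occur.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FutureTimeoutError


def _worker(obj: str) -> dict:
    # stand-in for _calculate_single_object: pure computation, no shared state
    return {"obj": obj, "result": len(obj)}


def run_batch(objects: list[str], max_workers: int = 4, timeout: float = 5.0) -> dict[str, int]:
    merged: dict[str, int] = {}
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = {executor.submit(_worker, obj): obj for obj in objects}
    completed: list[dict] = []
    try:
        for fut in as_completed(futures, timeout=timeout):
            completed.append(fut.result())  # collect only; merge later
        executor.shutdown(wait=True)
    except FutureTimeoutError:
        # release the pool without waiting for hung threads
        executor.shutdown(wait=False, cancel_futures=True)
    # all workers have exited -- now it is safe to touch shared state
    for res in completed:
        merged[res["obj"]] = res["result"]
    return merged


print(run_batch(["wtg01", "wtg02", "met_mast"]))
```

The timeout branch mirrors the handler's behavior: the executor is released without waiting, and any object whose future never completed would be recorded as a timeout error rather than merged.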

from_calc_types(calc_types, object_filters=None, one_per_object=False) classmethod

Class method that returns a CalculationHandler for all features of the desired server_calc_types.

Parameters:

  • calc_types

    (list[str]) –

    Desired calculation types, as defined in the server_calc_type feature attribute.

  • object_filters

    (dict[str, list[str]] | None, default: None ) –

    A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.

    Most common arguments are object_names, object_types, object_models, parent_objects and spe_names.

    By default None, which means that all objects will be calculated.

  • one_per_object

    (bool, default: False ) –

    If True, one CalculationHandler per object will be returned. This is useful when you want to calculate only one feature per object.

Returns:

  • CalculationHandler | list[CalculationHandler]

    CalculationHandler object with the features for the desired calculation types.

    If one_per_object is True, a list of CalculationHandler objects will be returned, one for each object found.

Source code in echo_energycalc/calculation_handler.py
Python
@classmethod
def from_calc_types(
    cls,
    calc_types: list[str],
    object_filters: dict[str, list[str]] | None = None,
    one_per_object: bool = False,
) -> Self | list[Self]:
    """
    Class method that returns a CalculationHandler for all features of the desired `server_calc_types`.

    Parameters
    ----------
    calc_types : list[str]
        Desired calculation types, as defined in the `server_calc_type` feature attribute.
    object_filters : dict[str, list[str]] | None, optional
        A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.

        Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.

        By default None, which means that all objects will be calculated.
    one_per_object : bool, optional
        If True, one CalculationHandler per object will be returned. This is useful when you want to calculate only one feature per object.

    Returns
    -------
    CalculationHandler | list[CalculationHandler]
        CalculationHandler object with the features for the desired calculation types.

        If one_per_object is True, a list of CalculationHandler objects will be returned, one for each object found.
    """
    if object_filters is None:
        object_filters = {}
    perfdb = PerfDB(application_name="CalculationHandler")

    _objs = perfdb.objects.instances.get(**object_filters, output_type="pl.DataFrame")
    _objs = _objs.rename({"name": "object_name"})
    models = _objs["object_model_name"].unique().to_list()

    model_features_list = []
    for calc_type in calc_types:
        this_model_features = perfdb.features.definitions.get(
            object_models=models,
            attributes={"server_calc_type": calc_type},
            output_type="pl.DataFrame",
        )
        model_features_list.append(this_model_features)

    model_features = pl.concat(model_features_list, rechunk=True)
    feature_names = model_features["name"].to_list()

    if not one_per_object:
        return CalculationHandler(
            features=feature_names,
            object_filters=object_filters,
        )

    obj_to_model = dict(zip(_objs["object_name"].to_list(), _objs["object_model_name"].to_list(), strict=False))
    model_to_features = {
        model: model_features.filter(pl.col("object_model_name") == model)["name"].to_list()
        for model in _objs["object_model_name"].unique().to_list()
    }
    handlers = []
    for obj_name in _objs["object_name"].to_list():
        obj_model = obj_to_model[obj_name]
        handlers.append(
            CalculationHandler(
                features=model_to_features[obj_model],
                object_filters={"object_names": [obj_name]},
            ),
        )

    return handlers
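
The `one_per_object=True` branch above boils down to a simple split: each object gets its own handler built from the feature list of its object model. The sketch below illustrates that grouping with plain dicts standing in for the Polars frames, using hypothetical object and model names:

```python
# hypothetical objects with their object models, and the features each model defines
objs = [("wtg01", "V150"), ("wtg02", "V150"), ("inv01", "SG3125")]
model_features = {"V150": ["power_theoretical", "energy_lost"], "SG3125": ["dc_power"]}

# map each object to its model, then build one handler spec per object
obj_to_model = dict(objs)
handler_specs = [
    {"features": model_features[obj_to_model[name]], "object_filters": {"object_names": [name]}}
    for name, _ in objs
]

print(handler_specs[0])
```

In the real method each spec becomes a `CalculationHandler(features=..., object_filters=...)` instance, so the features for every handler still come from its object's model.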

from_type_defaults(object_type='infer', object_filters=None, ignored_features=None, one_per_object=False) classmethod

Class method that returns a CalculationHandler object with the features that should be calculated for each object type, in the correct order.

The type defaults are defined in the default_feature_order setting in the performance database.

Parameters:

  • object_type

    (str, default: 'infer' ) –

    Object type that will be used to get the features that should be calculated.

    If "infer" is used, the object type will be inferred from the object_filters. This will also cause the method to return a list of CalculationHandler objects, one for each object type found.

    Default is "infer".

  • object_filters

    (dict[str, list[str]] | None, default: None ) –

    A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.

    Most common arguments are object_names, object_types, object_models, parent_objects and spe_names.

    By default None, which means that all objects will be calculated.

  • ignored_features

    (list[str] | None, default: None ) –

    List of features that should be ignored. These will not be calculated. By default None.

  • one_per_object

    (bool, default: False ) –

    If True, one CalculationHandler per object will be returned. This is useful when you want to calculate only one feature per object.

Returns:

  • CalculationHandler | list[CalculationHandler]

    CalculationHandler object with the features that should be calculated for the desired object type, in the correct order.

    If object_type is "infer" or one_per_object is True, a list of CalculationHandler objects will be returned, one for each object type or object found.

Source code in echo_energycalc/calculation_handler.py
Python
@classmethod
def from_type_defaults(
    cls,
    object_type: str = "infer",
    object_filters: dict[str, list[str]] | None = None,
    ignored_features: list[str] | None = None,
    one_per_object: bool = False,
) -> Self | list[Self]:
    """
    Class method that returns a CalculationHandler object with the features that should be calculated for each object type, in the correct order.

    The type defaults are defined in the default_feature_order setting in the performance database.

    Parameters
    ----------
    object_type : str, optional
        Object type that will be used to get the features that should be calculated.

        If "infer" is used, the object type will be inferred from the object_filters. This will also cause the method to return a list of CalculationHandler objects, one for each object type found.

        Default is "infer".
    object_filters : dict[str, list[str]] | None, optional
        A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.

        Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.

        By default None, which means that all objects will be calculated.
    ignored_features : list[str] | None, optional
        List of features that should be ignored. These will not be calculated.
        By default None.
    one_per_object : bool, optional
        If True, one CalculationHandler per object will be returned. This is useful when you want to calculate only one feature per object.

    Returns
    -------
    CalculationHandler | list[CalculationHandler]
        CalculationHandler object with the features that should be calculated for the desired object type, in the correct order.

        If object_type is "infer" or one_per_object is True, a list of CalculationHandler objects will be returned, one for each object type or object found.
    """
    perfdb = PerfDB(application_name="CalculationHandler")
    if ignored_features is None:
        ignored_features = []
    if object_filters is None:
        object_filters = {}

    if object_type == "infer" or one_per_object:
        _objects_df = perfdb.objects.instances.get(**object_filters, output_type="pl.DataFrame")
        _objects_df = _objects_df.rename({"name": "object_name"})
        obj_list = _objects_df["object_name"].to_list()
        # group objects by type
        objects_by_type: dict[str, list[str]] = {}
        for row in _objects_df.select(["object_name", "object_type_name"]).iter_rows():
            objects_by_type.setdefault(row[1], []).append(row[0])
        object_types = list(objects_by_type.keys())

        logger.info(
            f"Object type inferred as the following: \n'{jsbeautifier.beautify(json.dumps(objects_by_type, sort_keys=True), js_options)}'.",
        )
        obj_to_type = dict(zip(_objects_df["object_name"].to_list(), _objects_df["object_type_name"].to_list(), strict=False))
    else:
        object_types = [object_type]

    # getting feature order from database
    feature_order = perfdb.settings.get()["feature_calculation"]["default_feature_order"]["value"]

    # checking if object_type is defined in feature_order
    missing_object_types = set(object_types) - set(feature_order.keys())
    if missing_object_types:
        raise ValueError(f"Object types '{missing_object_types}' are not defined in default_feature_order in database.")

    if object_type != "infer":
        if all(len(value) == 0 for value in object_filters.values()):
            object_filters["object_types"] = [object_type]
        if not one_per_object:
            handler = CalculationHandler(
                features=feature_order[object_type],
                object_filters=object_filters,
                regex_feature_names=True,
                ignored_features=ignored_features,
            )
        else:
            handler = []
            for obj_name in obj_list:
                obj_type = obj_to_type[obj_name]
                handler.append(
                    CalculationHandler(
                        features=feature_order[obj_type],
                        object_filters={"object_names": [obj_name]},
                        regex_feature_names=True,
                        ignored_features=ignored_features,
                    ),
                )
    else:
        handler = []
        if not one_per_object:
            for obj_type in object_types:
                handler.append(
                    CalculationHandler(
                        features=feature_order[obj_type],
                        object_filters={"object_names": objects_by_type[obj_type]},
                        regex_feature_names=True,
                        ignored_features=ignored_features,
                    ),
                )
        else:
            for obj_name in obj_list:
                obj_type = obj_to_type[obj_name]
                handler.append(
                    CalculationHandler(
                        features=feature_order[obj_type],
                        object_filters={"object_names": [obj_name]},
                        regex_feature_names=True,
                        ignored_features=ignored_features,
                    ),
                )

    return handler
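
The "infer" path above can be sketched on its own. With hypothetical object names and a hypothetical `feature_order` setting, the method groups objects by type, validates every type against `default_feature_order`, and builds one handler spec per type:

```python
# hypothetical (object_name, object_type_name) rows and the default_feature_order setting
rows = [("wtg01", "wind_turbine"), ("wtg02", "wind_turbine"), ("mm01", "met_mast")]
feature_order = {"wind_turbine": ["wind_speed.*", "power"], "met_mast": ["wind_speed.*"]}

# group objects by type, preserving first-seen order
objects_by_type: dict[str, list[str]] = {}
for name, obj_type in rows:
    objects_by_type.setdefault(obj_type, []).append(name)

# every inferred type must have a defined feature order
missing = set(objects_by_type) - set(feature_order)
if missing:
    raise ValueError(f"Object types {missing} are not defined in default_feature_order.")

# one handler spec per object type, covering all objects of that type
specs = [
    {"features": feature_order[t], "object_filters": {"object_names": names}}
    for t, names in objects_by_type.items()
]

print(len(specs))
```

Combining `object_type="infer"` with `one_per_object=True` refines this further: instead of one spec per type, each object gets its own handler with its type's feature order.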