Calculation Handler¶
Overview¶
CalculationHandler is the main orchestrator for running feature calculations. It discovers which (object, feature) pairs need to be calculated, instantiates the correct FeatureCalculator subclass for each pair, executes the calculations, manages an in-memory result cache to avoid redundant database queries, and persists results back to performance_db.
It is used by the Airflow DAG feature-calculator and in ad-hoc scripts whenever batch calculations need to be triggered programmatically.
Note
Prefer CalculationHandler over using FeatureCalculator classes directly — it handles ordering, caching, error collection, memory management, and batched database writes that individual calculators cannot do on their own.
Internal Architecture¶
CalculationHandler is built from three mixins plus the main class:
CalculationHandler
├── _FeatureEnumeratorMixin — discovers (object, feature) pairs from performance_db
├── _ResultCacheMixin — manages in-memory cache of results as flat "obj@feature" columns
└── _ResultPersisterMixin — batches results and writes to performance_db
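The three-mixin composition can be sketched as follows. All method bodies and the `_pairs`/`_pending_keys` names are illustrative assumptions, not the real implementation:

```python
class _FeatureEnumeratorMixin:
    def _enumerate_features(self, features):
        # build (object, feature) pairs, preserving the requested feature order
        self._pairs = [(obj, feat) for feat in features for obj in self._objects]

class _ResultCacheMixin:
    def _init_cache(self):
        # results are cached under flat "obj@feature" keys
        self._cached_data = {}

class _ResultPersisterMixin:
    def _pending_keys(self, object_name):
        # everything that would go into one batched DB write for this object
        return [k for k in self._cached_data if k.startswith(f"{object_name}@")]

class CalculationHandler(_FeatureEnumeratorMixin, _ResultCacheMixin, _ResultPersisterMixin):
    def __init__(self, features, objects):
        self._objects = objects
        self._enumerate_features(features)
        self._init_cache()
```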
Construction and Feature Enumeration¶
When CalculationHandler is instantiated:
- Connects to performance_db.
- Queries all objects matching the provided `object_filters` (e.g., `object_types=["wind_turbine"]`).
- Queries all feature definitions for those objects' models, filtering to `data_source_type = "server_calc"` and fetching the `server_calc_type` attribute.
- Joins objects × features to produce the full list of (object, feature) pairs to calculate.
- Preserves the requested feature order — features are calculated in the exact order they appear in the `features` argument. This is critical for chained calculations where feature B depends on feature A.
- Optionally filters out `ignored_features`.
- Objects with a `calculation_disabled` attribute set to `"all"` or listing a specific feature name are skipped for those features at calculation time.
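The enumeration rules above can be sketched with plain Python structures. The shape of the `calculation_disabled` attribute here (a string or a list) is an assumption; the real handler reads it from performance_db:

```python
# Stand-in data in place of performance_db query results
objects = {
    "WT01": {"calculation_disabled": None},
    "WT02": {"calculation_disabled": "all"},      # skipped for every feature
    "WT03": {"calculation_disabled": ["FeatureB"]},  # skipped for FeatureB only
}
features = ["FeatureA", "FeatureB"]  # request order is preserved
ignored_features = {"FeatureC"}

def is_disabled(obj: str, feature: str) -> bool:
    flag = objects[obj]["calculation_disabled"]
    return flag == "all" or (isinstance(flag, list) and feature in flag)

# FeatureA pairs come first for all objects, then FeatureB,
# so B can read A's results from the in-memory cache
pairs = [
    (obj, feat)
    for feat in features
    if feat not in ignored_features
    for obj in objects
    if not is_disabled(obj, feat)
]
```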
Execution Flow¶
calculate(period, ...)
│
├── For each feature (in request order):
│ ├── For each object model that has this feature:
│ │ ├── Snapshot current in-memory cache (once per batch)
│ │ │
│ │ ├── [main thread] Pre-create all FeatureCalculator instances sequentially
│ │ │ (avoids concurrent echo-postgres DB calls in __init__)
│ │ │
│ │ ├── [sequential or ThreadPoolExecutor if max_workers > 1]
│ │ │ For each object of this model:
│ │ │ 1. Check calculation_disabled attribute — skip if matched
│ │ │ 2. Use pre-created FeatureCalculator instance
│ │ │ 3. calculator.calculate(period, cached_data=snapshot, ...)
│ │ │ 4. Return result dict without touching shared state
│ │ │
│ │ └── [after all workers exit] Merge each result into shared state:
│ │ - Add to in-memory result store
│ │ - Log any exception to ErrorDataSource
│ │ - Update feature completion tracking
│ │ - If save_method="end": save all features of a completed object in one DB write
│ │ - If memory > MAX_MEMORY_USAGE: flush all results and clear cache
│
└── Return ErrorDataSource with all collected errors
Parallel Execution¶
Within each (feature, object_model) batch, multiple objects can be calculated concurrently using ThreadPoolExecutor. The number of concurrent workers is controlled by the max_workers parameter (default: 1 — sequential).
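The batch pattern above (submit pure tasks, collect results, merge into shared state only after every worker has exited) can be sketched as follows; `calculate_one` is a hypothetical stand-in for `_calculate_single_object`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def calculate_one(obj: str) -> dict:
    # pure: reads only its arguments, touches no shared handler state
    return {"obj": obj, "result": len(obj)}

objects = ["WT01", "WT02", "WT03"]
collected = []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(calculate_one, o) for o in objects]
    for fut in as_completed(futures):
        collected.append(fut.result())  # collect only; no merging while workers run

# the with-block has shut the executor down and all workers have exited,
# so merging into shared state is now safe
merged = {r["obj"]: r["result"] for r in collected}
```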
Why the default is sequential¶
Threading in this codebase runs into two independent deadlock mechanisms that make max_workers > 1 unreliable in production environments:
1. Rayon work-stealing + GIL re-entry¶
Polars uses a single global Rayon thread pool (POOL). When a Python thread calls any Polars operation, it calls POOL.install(closure) which submits work to all available Rayon threads via work-stealing. With POLARS_MAX_THREADS > 1, a Rayon worker executing on behalf of thread A can steal a Python-callable task from thread B's install() scope and then block on PyGILState_Ensure while thread A still holds the GIL — a circular wait.
Setting POLARS_MAX_THREADS=1 prevents cross-install() work-stealing and eliminates this risk. The calculate() method enforces this and falls back to max_workers=1 with a warning if the condition is not met.
Warning
Increasing POLARS_MAX_THREADS to match or exceed max_workers does not fix this. Rayon's work-stealing is non-deterministic — a larger pool reduces the probability of cross-contamination but does not eliminate it. POLARS_MAX_THREADS=1 is the only safe setting.
2. echo-postgres class-level shared state¶
Several echo-postgres functions cache DataFrames at the class level and perform Polars/pandas operations on those shared objects when called from multiple threads simultaneously. Operations like join, to_pandas, cast_attributes, and merge on the same DataFrame instance from concurrent threads deadlock regardless of POLARS_MAX_THREADS.
The most common trigger is eval-expression features (server_calc_type = "expression_evaluation"). Their expression strings can call helper functions such as filters.power_curve_offset_filter that query echo-postgres internally — these calls happen inside the restricted exec() at compute time and cannot be pre-fetched on the main thread. With 4 workers all running the same expression type simultaneously, they all hit echo_postgres.object_instances.get() concurrently and deadlock.
Automatic fallbacks¶
calculate() automatically reduces max_workers to 1 and emits a warnings.warn when either of the following is detected at call time:
| Condition | Detection mechanism |
|---|---|
| `POLARS_MAX_THREADS > 1` | `pl.thread_pool_size() > 1` |
| Process was forked (e.g. Airflow Celery) | `os.register_at_fork` flag set at fork time |
The fork detection uses os.register_at_fork(after_in_child=...) rather than PID comparison, which correctly catches Airflow Celery workers even when echo-energycalc is imported after the fork.
When max_workers > 1 is safe¶
Outside a forked environment and with POLARS_MAX_THREADS=1, parallelism is safe if and only if none of the features being calculated use eval expressions that call echo-postgres functions at compute time. Features whose server_calc_type is anything other than "expression_evaluation" are generally safe because their _compute methods perform pure Polars operations on thread-local DataFrames.
# Required environment variable — must be set before Python imports polars
export POLARS_MAX_THREADS=1
Thread-safety design (when max_workers > 1)¶
- `FeatureCalculator` instances are pre-created on the main thread before the executor starts. Their `__init__` calls `echo-postgres` to validate the object/feature — pre-creating them sequentially avoids concurrent access to `echo-postgres` class-level state.
- `_calculate_single_object()` is pure — it does not touch any shared handler state and only reads a pre-snapshotted immutable copy of the result cache.
- `_merge_result()` runs after `executor.shutdown()` — all Polars and DB operations performed during result merging (filter, concat, save) happen after every worker thread has exited, eliminating GIL + Rayon contention between the main thread and workers.
- Per-thread `PerfDB` connections — each thread gets its own lazily-created `PerfDB` instance via `threading.local`, so DB connections are never shared across threads.
- Per-thread `CalculationRequirement` caches — each thread has its own cache dict (via `threading.local`) so cached DataFrames are never shared or mutated across threads.
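The per-thread connection pattern can be sketched with a stand-in class. `FakeConnection` and `get_connection` are hypothetical; the real handler stores `PerfDB` instances on its `threading.local` object:

```python
import threading

class FakeConnection:
    """Stand-in for a real PerfDB connection."""
    def __init__(self) -> None:
        self.owner = threading.get_ident()

_tls = threading.local()

def get_connection() -> FakeConnection:
    # Lazily create one connection per thread; attributes set on a
    # threading.local object are visible only to the thread that set them.
    if not hasattr(_tls, "conn"):
        _tls.conn = FakeConnection()
    return _tls.conn
```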
In-Memory Result Cache¶
After each feature is calculated, its result (a Polars DataFrame with "object@feature" flat columns) is added to _cached_data. This is passed as cached_data to every subsequent calculator, allowing downstream features to read already-calculated values without a round trip to performance_db.
Example: if ActivePowerTheoretical_10min.AVG is calculated first, the soiling loss calculator that runs later can read it from the cache instead of querying the database.
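A minimal sketch of the flat "object@feature" cache layout, using a plain dict in place of the Polars DataFrame the handler actually keeps; `add_result` and `read_cached` are illustrative names:

```python
from __future__ import annotations

cache: dict[str, list[float]] = {}

def add_result(obj: str, feature: str, values: list[float]) -> None:
    # one flat column per (object, feature) pair
    cache[f"{obj}@{feature}"] = values

def read_cached(obj: str, feature: str) -> list[float] | None:
    # downstream calculators consult the cache before querying performance_db
    return cache.get(f"{obj}@{feature}")

add_result("WT01", "ActivePowerTheoretical_10min.AVG", [1.0, 2.0])
```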
Cache behavior:
- The cache snapshot is taken once per `(feature, object_model)` batch before workers start — all workers see the same consistent snapshot.
- New results become visible to the next batch after the merge step.
- If memory usage (checked via `psutil`) exceeds `MAX_MEMORY_USAGE` (default 1024 MB, overridable via the `ECHO_ENERGYCALC_MAX_MEMORY_MB` env var), all results are flushed to the database and the cache is cleared.
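The memory guard can be sketched as follows. `should_flush` is an illustrative name; in the real handler the RSS value comes from `psutil.Process().memory_info().rss`:

```python
import os

# default limit and env override match the documented behaviour
MAX_MEMORY_USAGE = float(os.environ.get("ECHO_ENERGYCALC_MAX_MEMORY_MB", 1024))

def should_flush(rss_bytes: int, limit_mb: float = MAX_MEMORY_USAGE) -> bool:
    # convert resident set size to MB and compare against the limit
    return rss_bytes / 1024**2 > limit_mb
```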
Save Modes¶
| `save_method` | Behavior | When to use |
|---|---|---|
| `"end"` (default) | Batches all features for an object and writes them in a single DB call once all features for that object are done. | Most cases: more efficient, less DB write amplification. |
| `"each"` | Saves each feature result immediately after calculation. | When you need live results in the database as quickly as possible. |
The save_into parameter controls the target:
| `save_into` | Behavior |
|---|---|
| `"all"` (default) | Save to performance_db and upload to Bazefield. |
| `"performance_db"` | Save to performance_db only. |
| `None` | Do not save — useful for testing or dry-runs. |
Error Handling¶
All exceptions raised inside a single (object, feature) calculation are caught, logged, and stored in the ErrorDataSource returned by calculate(). A single failure never aborts the remaining calculations.
Errors are organized hierarchically:
ErrorDataSource("server_calc")
└── ErrorObject("WT01")
└── ErrorFeature("ActivePowerTheoretical_10min.AVG")
└── <Exception>
Factory Methods¶
Instead of constructing CalculationHandler directly, prefer the factory class methods for common patterns:
from_type_defaults(object_type, object_filters, ...)¶
Looks up the default_feature_order setting in performance_db for the given object_type and creates a handler with features in the correct dependency order.
- Use `object_type="infer"` to automatically detect the object type(s) from the provided `object_filters` and return one handler per type.
- Use `one_per_object=True` to return one handler per individual object (useful for per-turbine Airflow tasks).
# Calculate all default wind turbine features for an SPE, in correct dependency order
handler = CalculationHandler.from_type_defaults(
object_type="wind_turbine",
object_filters={"spe_names": ["SPE-01"]},
)
handler.calculate(period)
from_calc_types(calc_types, object_filters, ...)¶
Creates a handler for all features that have a specific server_calc_type. Useful for targeted recalculations.
# Recalculate all theoretical power features for two turbines
handler = CalculationHandler.from_calc_types(
calc_types=["theoretical_active_power"],
object_filters={"object_names": ["WT01", "WT02"]},
)
handler.calculate(period)
Usage Examples¶
Standard usage (Airflow / production)¶
from echo_datetimerange import DateTimeRange
from echo_energycalc import CalculationHandler
period = DateTimeRange("2024-01-01", "2024-01-31")
handler = CalculationHandler(
features=["ActivePowerTheoretical_10min.AVG", "LostActivePower_10min.AVG"],
object_filters={"spe_names": ["SPE-01"]},
)
# max_workers=1 is the default — safe in all environments including Airflow
errors = handler.calculate(
period=period,
save_into="performance_db",
save_method="end",
)
if errors.has_errors():
print(errors)
Parallel usage (outside Airflow, no eval-expression features)¶
import os
os.environ["POLARS_MAX_THREADS"] = "1" # must be set before importing polars
from echo_datetimerange import DateTimeRange
from echo_energycalc import CalculationHandler
handler = CalculationHandler(
# Only non-expression feature types — safe to parallelise
features=["ActivePowerTheoretical_10min.AVG"],
object_filters={"spe_names": ["SPE-01"]},
)
errors = handler.calculate(
period=DateTimeRange("2024-01-01", "2024-01-31"),
max_workers=4, # safe: POLARS_MAX_THREADS=1, not forked, no eval expressions
)
Warning
Do not use max_workers > 1 if any of the calculated features have server_calc_type = "expression_evaluation" and their expressions call echo-postgres functions via helpers such as filters.*. This will deadlock reliably. The root fix requires echo-postgres to make its class-level DataFrame caches thread-safe.
Class Definition¶
CalculationHandler(features, object_filters=None, regex_feature_names=False, ignored_features=None)
¶
Class used to handle the calculation of features.
This will essentially be a wrapper for the FeatureCalculator classes, iterating object by object and feature by feature, performing the calculations and saving the results in performance_db.
An instance of this class should be created in most scripts that import data and then do some calculations to store in the database. Avoid using the FeatureCalculator classes directly.
Parameters:
- `features` (`str | list[str]`) – List of features to be calculated. Can be a single feature as a string as well.
  If a list is passed, the features will be calculated in the order they are passed.
- `object_filters` (`dict[str, list[str]]`, default: `None`) – A dict containing the keyword arguments that will be passed to the function perfdb.objects.instances.get(). These will be used to filter the objects that will be calculated.
  Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.
  By default None, which means that all objects will be calculated.
- `regex_feature_names` (`bool`, default: `False`) – If True, the features will be treated as regular expressions. This is useful when you want to calculate all features that match a certain pattern.
  For example, if `features=['^active.*', '^reactive.*']`, it will return all features that start with 'active' or 'reactive'.
  By default False.
- `ignored_features` (`list[str] | None`, default: `None`) – List of features that should be ignored. These will not be calculated. Keep in mind that they are not regex patterns even if regex_feature_names is True.
  By default None.
Source code in echo_energycalc/calculation_handler.py
def __init__(
self,
features: str | list[str],
object_filters: dict[str, list[str]] | None = None,
regex_feature_names: bool = False,
ignored_features: list[str] | None = None,
) -> None:
"""
Class used to handle the calculation of features.
This will essentially be a wrapper for the FeatureCalculator classes, iterating object by object and feature by feature, performing the calculations and saving the results in performance_db.
An instance of this class should be created in most scripts that import data and then do some calculations to store in the database. Avoid using the FeatureCalculator classes directly.
Parameters
----------
features : str | list[str]
List of features to be calculated. Can be a single feature as a string as well.
If a list is passed, the features will be calculated in the order they are passed.
object_filters : dict[str, list[str]], optional
A dict containing the keyword arguments that will be passed to the function perfdb.objects.instances.get(). These will be used to filter the objects that will be calculated.
Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.
By default None, which means that all objects will be calculated.
regex_feature_names : bool, optional
If True, the features will be treated as regular expressions. This is useful when you want to calculate all features that match a certain pattern.
For example, if features=['^active.*', '^reactive.*'], it will return all features that start with 'active' or 'reactive'.
By default False.
ignored_features : list[str] | None, optional
List of features that should be ignored. These will not be calculated. Keep in mind that they are not regex patterns even if regex_feature_names is True.
By default None.
"""
if object_filters is None:
object_filters = {}
self._errors = ErrorDataSource("server_calc")
# thread-local storage so each thread gets its own PerfDB connection
self._tls = threading.local()
# getting objects from performance_db — convert to polars with object_name as a column
_objects = self._perfdb.objects.instances.get(
**object_filters,
output_type="pl.DataFrame",
get_attributes=True,
attribute_names=["calculation_disabled"],
)
_objects = _objects.rename({"name": "object_name"})
self._objects: pl.DataFrame = _objects
# enumerate features (delegated to _FeatureEnumeratorMixin)
self._enumerate_features(features, regex_feature_names, ignored_features)
if self._features is None:
return
# initialize result cache (delegated to _ResultCacheMixin)
self._init_cache()
errors
property
¶
Errors that occurred during calculations.
Returns:
- `ErrorDataSource` – ErrorDataSource object with all the errors that occurred during calculations.
features
property
¶
Features that will be calculated for each object model.
Returns:
- `dict[str, list[str]]` – Dictionary mapping object model name to list of features that will be calculated for that object model.
objects
property
¶
List of objects for which the features will be calculated.
Returns:
- `list[str]` – List of objects for which the features will be calculated.
results
property
¶
Results of the calculations.
Returns:
- `dict[str, dict[str, FeatureCalculator]]` – Dict containing the results of the calculations for each object and feature. First level of keys are the object names and the second level of keys are the feature names. The values are FeatureCalculator objects with the results of the calculations.
calculate(period, save_into='all', save_method='end', free_up_memory=True, max_workers=1, timeout=600, **kwargs)
¶
Calculate all features for all objects and optionally save results to the database.
Iterates over each feature in declaration order, then over each object model, then
over each object, calling the appropriate :class:FeatureCalculator subclass for
every (object, feature) pair. Errors are recorded per-object-per-feature and do
not interrupt the remaining calculations.
Parameters:
- `period` (`DateTimeRange`) – Desired calculation period.
- `save_into` (`Literal['all', 'performance_db'] | None`, default: `'all'`) – Where to persist results:
  - `"all"`: write to both `performance_db` and Bazefield.
  - `"performance_db"`: write only to `performance_db`.
  - `None`: do not save (useful for testing or dry-runs).

  By default `"all"`.
- `save_method` (`Literal['end', 'each']`, default: `'end'`) – Controls when results are written to the database:
  - `"end"`: accumulate all results in memory and save each object's features together once every feature for that object is done. More efficient (fewer DB round-trips) but uses more RAM.
  - `"each"`: save after every individual (object, feature) calculation. Results appear in the database sooner but generate more DB traffic.

  By default `"end"`.
- `free_up_memory` (`bool`, default: `True`) – When `True` (default), discard each object's in-memory results immediately after saving to reduce peak RAM usage. Set to `False` only if you need to access :attr:`results` after the call.
- `max_workers` (`int | None`, default: `1`) – Number of objects to calculate in parallel within each (feature, model) batch. `1` (the default) runs sequentially and is the recommended setting for production use.

  **Why 1 is the default — threading caveats**

  Parallelism here is achieved with :class:`~concurrent.futures.ThreadPoolExecutor`. Threads share the process GIL and the Polars Rayon thread pool, which creates two independent deadlock risks:

  1. **Rayon work-stealing + GIL re-entry.** Polars uses a single global Rayon thread pool (`POOL`). With `POLARS_MAX_THREADS > 1`, a Rayon worker executing on behalf of thread A can steal a Python-callable task submitted by thread B and then block on `PyGILState_Ensure` while thread A still holds the GIL — a circular wait. Setting `POLARS_MAX_THREADS=1` prevents work-stealing across `POOL.install()` boundaries and eliminates this risk. The guard in this method enforces that and falls back to `max_workers=1` if the condition is not met.
  2. **echo-postgres class-level shared state.** Several `echo-postgres` functions (`feature_definitions.get`, `object_instances.get`, `cast_attributes`, etc.) cache DataFrames at the class level and perform concurrent Polars/pandas operations on them when called from multiple threads simultaneously. This deadlocks regardless of `POLARS_MAX_THREADS`. The most common trigger is eval-expression features (`server_calc_type = "expression_evaluation"`) whose expression strings call helper functions such as `filters.power_curve_offset_filter` that query `echo-postgres` internally — these calls happen inside `_safe_exec` at compute time and cannot be pre-fetched on the main thread.

  **When `max_workers > 1` is safe**

  Outside a forked environment (i.e. not inside an Airflow Celery worker) and with `POLARS_MAX_THREADS=1`, parallelism is safe if and only if none of the features being calculated use eval expressions that call `echo-postgres` functions at compute time. Features whose `server_calc_type` is anything other than `"expression_evaluation"` are generally safe because their `_compute` methods only perform pure Polars operations on thread-local DataFrames.

  **Automatic fallbacks**

  The method reduces `max_workers` to `1` and emits a :func:`warnings.warn` when either of the following is detected:
  - `POLARS_MAX_THREADS > 1` (Rayon work-stealing risk).
  - The process was forked after module import (e.g. Airflow/billiard Celery workers detected via :func:`os.register_at_fork`).

  `None` lets :class:`~concurrent.futures.ThreadPoolExecutor` choose `min(32, os.cpu_count() + 4)` workers automatically. Default is `1`.
- `timeout` (`float | None`, default: `600`) – Maximum seconds to wait for all objects in a single (feature, model) batch. When the deadline is reached, objects that have not yet produced a result are recorded as :class:`~echo_errorsummary.ErrorFeature` entries and the calculation moves on to the next batch. Running threads are released without blocking (they cannot be interrupted in Python but will no longer delay progress). `None` disables the timeout. Default is `600`.
- `**kwargs` (`dict`, default: `{}`) – Forwarded to :meth:`_save_obj_results` and :meth:`_save_all_obj_results`.

Returns:
- `ErrorDataSource` – Nested error report with one :class:`~echo_errorsummary.ErrorObject` per object and one :class:`~echo_errorsummary.ErrorFeature` per (object, feature) pair that raised an exception. Successful calculations produce empty children with no exceptions attached.
Source code in echo_energycalc/calculation_handler.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = "all",
save_method: Literal["end", "each"] = "end",
free_up_memory: bool = True,
max_workers: int | None = 1,
timeout: float | None = 600,
**kwargs,
) -> ErrorDataSource:
"""
Calculate all features for all objects and optionally save results to the database.
Iterates over each feature in declaration order, then over each object model, then
over each object, calling the appropriate :class:`FeatureCalculator` subclass for
every (object, feature) pair. Errors are recorded per-object-per-feature and do
not interrupt the remaining calculations.
Parameters
----------
period : DateTimeRange
Desired calculation period.
save_into : Literal["all", "performance_db"] | None, optional
Where to persist results:
- ``"all"``: write to both ``performance_db`` and Bazefield.
- ``"performance_db"``: write only to ``performance_db``.
- ``None``: do not save (useful for testing or dry-runs).
By default ``"all"``.
save_method : Literal["end", "each"], optional
Controls when results are written to the database:
- ``"end"``: accumulate all results in memory and save each object's
features together once every feature for that object is done. More
efficient (fewer DB round-trips) but uses more RAM.
- ``"each"``: save after every individual (object, feature) calculation.
Results appear in the database sooner but generate more DB traffic.
By default ``"end"``.
free_up_memory : bool, optional
When ``True`` (default), discard each object's in-memory results
immediately after saving to reduce peak RAM usage. Set to ``False``
only if you need to access :attr:`results` after the call.
max_workers : int | None, optional
Number of objects to calculate in parallel within each (feature, model)
batch. ``1`` (the default) runs sequentially and is the recommended
setting for production use.
**Why 1 is the default — threading caveats**
Parallelism here is achieved with :class:`~concurrent.futures.ThreadPoolExecutor`.
Threads share the process GIL and the Polars Rayon thread pool, which
creates two independent deadlock risks:
1. **Rayon work-stealing + GIL re-entry.** Polars uses a single global
Rayon thread pool (``POOL``). With ``POLARS_MAX_THREADS > 1``, a
Rayon worker executing on behalf of thread A can steal a Python-callable
task submitted by thread B and then block on ``PyGILState_Ensure`` while
thread A still holds the GIL — a circular wait. Setting
``POLARS_MAX_THREADS=1`` prevents work-stealing across
``POOL.install()`` boundaries and eliminates this risk. The guard in
this method enforces that and falls back to ``max_workers=1`` if the
condition is not met.
2. **echo-postgres class-level shared state.** Several ``echo-postgres``
functions (``feature_definitions.get``, ``object_instances.get``,
``cast_attributes``, etc.) cache DataFrames at the class level and
perform concurrent Polars/pandas operations on them when called from
multiple threads simultaneously. This deadlocks regardless of
``POLARS_MAX_THREADS``. The most common trigger is eval-expression
features (``server_calc_type = "expression_evaluation"``) whose
expression strings call helper functions such as
``filters.power_curve_offset_filter`` that query ``echo-postgres``
internally — these calls happen inside ``_safe_exec`` at compute time
and cannot be pre-fetched on the main thread.
**When ``max_workers > 1`` is safe**
Outside a forked environment (i.e. not inside an Airflow Celery worker)
and with ``POLARS_MAX_THREADS=1``, parallelism is safe *if and only if*
none of the features being calculated use eval expressions that call
``echo-postgres`` functions at compute time. Features whose
``server_calc_type`` is anything other than ``"expression_evaluation"``
are generally safe because their ``_compute`` methods only perform pure
Polars operations on thread-local DataFrames.
**Automatic fallbacks**
The method reduces ``max_workers`` to ``1`` and emits a
:func:`warnings.warn` when either of the following is detected:
- ``POLARS_MAX_THREADS > 1`` (Rayon work-stealing risk).
- The process was forked after module import (e.g. Airflow/billiard
Celery workers detected via :func:`os.register_at_fork`).
``None`` lets :class:`~concurrent.futures.ThreadPoolExecutor` choose
``min(32, os.cpu_count() + 4)`` workers automatically.
Default is ``1``.
timeout : float | None, optional
Maximum seconds to wait for all objects in a single (feature, model)
batch. When the deadline is reached, objects that have not yet produced
a result are recorded as :class:`~echo_errorsummary.ErrorFeature` entries
and the calculation moves on to the next batch. Running threads are
released without blocking (they cannot be interrupted in Python but will
no longer delay progress). ``None`` disables the timeout.
Default is ``600``.
**kwargs : dict, optional
Forwarded to :meth:`_save_obj_results` and :meth:`_save_all_obj_results`.
Returns
-------
ErrorDataSource
Nested error report with one :class:`~echo_errorsummary.ErrorObject` per
object and one :class:`~echo_errorsummary.ErrorFeature` per
(object, feature) pair that raised an exception. Successful calculations
produce empty children with no exceptions attached.
"""
if self._features is None:
return self._errors
if max_workers is not None and max_workers > 1:
if pl.thread_pool_size() > 1:
message = (
f"POLARS_MAX_THREADS is {pl.thread_pool_size()} — parallel execution requires it to be 1. "
f"With multiple Rayon threads, work-stealing across concurrent POOL.install() calls can "
f"cause GIL + Rayon circular waits. "
f"Set POLARS_MAX_THREADS=1 before importing polars. Falling back to max_workers=1."
)
warnings.warn(
message,
stacklevel=2,
)
logger.warning(message)
max_workers = 1
elif _FORKED:
message = (
"Process has been forked (e.g. Airflow Celery). echo-postgres uses class-level cached state that is not safe to access from multiple threads after a fork. "
"Falling back to max_workers=1."
)
warnings.warn(
message,
stacklevel=2,
)
logger.warning(message)
max_workers = 1
# resetting calculation history
self._reset_calc_history()
# iterating over calculated features
features_list = self._features["name"].unique(maintain_order=True).to_list()
for feature in features_list:
# iterating over object models
obj_models = self._features.filter(pl.col("name") == feature)["object_model_name"].unique().to_list()
for obj_model in obj_models:
selected_objects = self._objects.filter(pl.col("object_model_name") == obj_model)["object_name"].to_list()
# Snapshot the cache once for the whole (feature, obj_model) batch.
_snapshot = self._get_cached_data() if save_method == "end" else None
cached_data_snapshot = _snapshot if (_snapshot is not None and not _snapshot.is_empty()) else None
calc_save_into = save_into if save_method == "each" else None
# Pre-create all FeatureCalculator instances sequentially on the main
# thread. FeatureCalculator.__init__ calls echo-postgres internally and
# that library has class-level cached state that deadlocks under concurrent
# access even when each thread holds its own PerfDB instance. Doing this
# here means worker threads only run .calculate() — no DB calls in __init__.
pre_created: dict[str, tuple[FeatureCalculator | None, Exception | None]] = {}
for _obj in selected_objects:
try:
pre_created[_obj] = (self._create_feature_calc(_obj, obj_model, feature), None)
except Exception as _e:
pre_created[_obj] = (None, _e)
_lock = threading.Lock()
def _merge_result(res: dict, *, _feature: str = feature) -> None:
"""Apply one object's result to shared handler state. Must run under _lock."""
_obj = res["obj"]
_fc = res["feature_calc"]
_exc = res["exception"]
self._errors.add_child(ErrorObject(name=_obj))
self._errors.children[_obj].add_child(ErrorFeature(name=_feature))
if _exc is not None:
self._errors.children[_obj].children[_feature].add_exception(_exc)
if _fc is not None:
self._add_result(object_name=_obj, feature_name=_feature, result=_fc)
for extra_feat in res["extra_features"]:
if not self._features.filter(
(pl.col("object_name") == _obj) & (pl.col("name") == extra_feat),
).is_empty():
continue
new_row = pl.DataFrame(
{
"object_name": [_obj],
"name": [extra_feat],
"object_model_name": [None],
"data_source_type_name": [None],
"server_calc_type": [None],
"finished_calc": [True],
"cached": [False],
},
schema=self._features.schema,
)
self._features = pl.concat([self._features, new_row], how="diagonal")
self._features = self._features.with_columns(
pl.when(
(pl.col("name") == _feature) & (pl.col("object_name") == _obj),
)
.then(statement=True)
.otherwise(pl.col("finished_calc"))
.alias("finished_calc"),
)
if save_method == "end":
try:
if psutil.Process().memory_info().rss / 1024**2 > MAX_MEMORY_USAGE:
self._save_all_obj_results(save_into=save_into, free_up_memory=free_up_memory, **kwargs)
self._remove_cached_data()
elif self._features.filter(
(pl.col("object_name") == _obj) & (~pl.col("finished_calc")),
).is_empty():
self._save_obj_results(object_name=_obj, save_into=save_into, free_up_memory=free_up_memory, **kwargs)
except Exception as e:
message = f"'{_obj}' Error while saving results to database."
logger.exception(message)
self._errors.children[_obj].add_exception(e)
if _fc is not None and _fc.result is not None:
t0, t1, t2, t3 = res["t0"], res["t1"], res["t2"], res["t3"]
message = f"{_fc} - {period=} - Pre {t1 - t0:.2f}s, Cache {t2 - t1:.2f}s, Calc {t3 - t2:.2f}s, Post {perf_counter() - t3:.2f}s, Total {perf_counter() - t0:.2f}s."
logger.info(message)
if max_workers == 1 or len(selected_objects) == 1:
# Sequential fast path — no executor overhead
for obj in selected_objects:
fc, fc_exc = pre_created[obj]
res = self._calculate_single_object(
obj,
obj_model,
feature,
period,
features_list,
selected_objects,
obj_models,
cached_data_snapshot,
calc_save_into,
fc,
fc_exc,
**kwargs,
)
_merge_result(res)
else:
executor = ThreadPoolExecutor(max_workers=max_workers)
futures = {
executor.submit(
self._calculate_single_object,
obj,
obj_model,
feature,
period,
features_list,
selected_objects,
obj_models,
cached_data_snapshot,
calc_save_into,
pre_created[obj][0],
pre_created[obj][1],
**kwargs,
): obj
for obj in selected_objects
}
completed_results: list[dict] = []
try:
for fut in as_completed(futures, timeout=timeout):
# Only collect here — no _merge_result while workers are
# alive. _merge_result calls Polars and echo-postgres
# (to_pandas, filter, concat, DB save), which deadlocks
# with concurrent Polars ops in worker threads under
# POLARS_MAX_THREADS=1 (GIL + Rayon circular wait).
completed_results.append(fut.result()) # noqa: PERF401
except FutureTimeoutError:
# Release the executor without waiting for hung threads, then
# record a timeout error for every object that never finished.
executor.shutdown(wait=False, cancel_futures=True)
for fut, obj in futures.items():
if not fut.done():
message = f"'{obj}' - '{feature}': Calculation timed out after {timeout:.0f}s."
logger.error(message)
cprint(message, "red")
self._errors.add_child(ErrorObject(name=obj))
self._errors.children[obj].add_child(ErrorFeature(name=feature))
self._errors.children[obj].children[feature].add_exception(
FutureTimeoutError(message),
)
# All workers have exited — safe to call _merge_result now.
for res in completed_results:
_merge_result(res)
return self._errors
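The comments above explain why worker results are only collected inside the `as_completed` loop and merged after every worker thread has exited. That collect-then-merge shape can be sketched with the standard library alone (`run_batch`, `worker` and `merge` are illustrative names, not part of CalculationHandler):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FutureTimeoutError


def run_batch(items, worker, merge, max_workers=4, timeout=60.0):
    """Collect worker results first; merge only after all workers exit."""
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = {executor.submit(worker, item): item for item in items}
    completed, timed_out = [], []
    try:
        for fut in as_completed(futures, timeout=timeout):
            completed.append(fut.result())  # collect only -- no merging yet
    except FutureTimeoutError:
        # Release the executor without waiting for hung threads.
        executor.shutdown(wait=False, cancel_futures=True)
        timed_out = [item for fut, item in futures.items() if not fut.done()]
    else:
        executor.shutdown(wait=True)
    # All workers have exited (or been abandoned) -- merging is now safe.
    return [merge(r) for r in completed], timed_out
```

Running the merge step strictly after the workers are done is what keeps the single-threaded Polars/Rayon configuration from deadlocking; the sketch preserves that ordering even on the timeout path.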
from_calc_types(calc_types, object_filters=None, one_per_object=False)
classmethod
¶
Class method that returns a CalculationHandler for all features of the desired `server_calc_type`s.
Parameters:
- calc_types (list[str]) – Desired calculation types, as defined in the `server_calc_type` feature attribute.
- object_filters (dict[str, list[str]] | None, default: None) – A dict of keyword arguments passed to PerfDB.objects.instances.get(), used to filter the objects that will be calculated. The most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`. By default None, meaning all objects will be calculated.
- one_per_object (bool, default: False) – If True, one CalculationHandler per object will be returned. This is useful when you want to run each object's calculations independently.
Returns:
- CalculationHandler | list[CalculationHandler] – CalculationHandler object with the features for the desired calculation types. If one_per_object is True, a list of CalculationHandler objects will be returned, one for each object found.
Source code in echo_energycalc/calculation_handler.py
@classmethod
def from_calc_types(
cls,
calc_types: list[str],
object_filters: dict[str, list[str]] | None = None,
one_per_object: bool = False,
) -> Self | list[Self]:
"""
Class method that returns a CalculationHandler for all features of the desired `server_calc_types`.
Parameters
----------
calc_types : list[str]
Desired calculation types, as defined in `server_calc_type` feature attribute.
object_filters : dict[str, list[str]] | None, optional
A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.
Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.
By default None, which means that all objects will be calculated.
one_per_object : bool, optional
If True, one CalculationHandler per object will be returned. This is useful when you want to run each object's calculations independently.
Returns
-------
CalculationHandler | list[CalculationHandler]
CalculationHandler object with the features for the desired calculation types.
If one_per_object is True, a list of CalculationHandler objects will be returned, one for each object found.
"""
if object_filters is None:
object_filters = {}
perfdb = PerfDB(application_name="CalculationHandler")
_objs = perfdb.objects.instances.get(**object_filters, output_type="pl.DataFrame")
_objs = _objs.rename({"name": "object_name"})
models = _objs["object_model_name"].unique().to_list()
model_features_list = []
for calc_type in calc_types:
this_model_features = perfdb.features.definitions.get(
object_models=models,
attributes={"server_calc_type": calc_type},
output_type="pl.DataFrame",
)
model_features_list.append(this_model_features)
model_features = pl.concat(model_features_list, rechunk=True)
feature_names = model_features["name"].to_list()
if not one_per_object:
return CalculationHandler(
features=feature_names,
object_filters=object_filters,
)
obj_to_model = dict(zip(_objs["object_name"].to_list(), _objs["object_model_name"].to_list(), strict=False))
model_to_features = {
model: model_features.filter(pl.col("object_model_name") == model)["name"].to_list()
for model in _objs["object_model_name"].unique().to_list()
}
handlers = []
for obj_name in _objs["object_name"].to_list():
obj_model = obj_to_model[obj_name]
handlers.append(
CalculationHandler(
features=model_to_features[obj_model],
object_filters={"object_names": [obj_name]},
),
)
return handlers
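Stripped of the database calls, the `one_per_object` branch above is a fan-out from objects to per-object handler configurations via the `obj_to_model` and `model_to_features` lookups. A minimal sketch with invented object and model names (`fan_out` is a hypothetical helper, not part of the library):

```python
def fan_out(obj_to_model, model_to_features):
    """Emit one (features, object_filters) config per object -- the same
    fan-out from_calc_types performs before building each handler."""
    configs = []
    for obj_name, model in obj_to_model.items():
        configs.append(
            {
                "features": model_to_features[model],
                "object_filters": {"object_names": [obj_name]},
            },
        )
    return configs


# Sample data; real names come from performance_db.
obj_to_model = {"WTG01": "V150", "WTG02": "V150", "WTG03": "E92"}
model_to_features = {"V150": ["power_avg", "wind_speed_avg"], "E92": ["power_avg"]}
configs = fan_out(obj_to_model, model_to_features)
```

Each resulting config corresponds to one `CalculationHandler(features=..., object_filters={"object_names": [...]})` call, so every object gets a handler scoped to exactly its model's features.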
from_type_defaults(object_type='infer', object_filters=None, ignored_features=None, one_per_object=False)
classmethod
¶
Class method that returns a CalculationHandler object with the features that should be calculated for each object type, in the correct order.
The type defaults are defined in the `default_feature_order` setting in the performance database.
Parameters:
- object_type (str, default: 'infer') – Object type used to select the features that should be calculated. If "infer" is used, the object type will be inferred from the object_filters, and the method will return a list of CalculationHandler objects, one for each object type found. Default is "infer".
- object_filters (dict[str, list[str]] | None, default: None) – A dict of keyword arguments passed to PerfDB.objects.instances.get(), used to filter the objects that will be calculated. The most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`. By default None, meaning all objects will be calculated.
- ignored_features (list[str] | None, default: None) – List of features to skip; these will not be calculated. By default None.
- one_per_object (bool, default: False) – If True, one CalculationHandler per object will be returned. This is useful when you want to run each object's calculations independently.
Returns:
- CalculationHandler | list[CalculationHandler] – CalculationHandler object with the features that should be calculated for the desired object type, in the correct order. If object_type is "infer" or one_per_object is True, a list of CalculationHandler objects will be returned, one for each object type or object found.
Source code in echo_energycalc/calculation_handler.py
@classmethod
def from_type_defaults(
cls,
object_type: str = "infer",
object_filters: dict[str, list[str]] | None = None,
ignored_features: list[str] | None = None,
one_per_object: bool = False,
) -> Self | list[Self]:
"""
Class method that returns a CalculationHandler object with the features that should be calculated for each object type, in the correct order.
The type defaults are defined in the default_feature_order setting in performance database
Parameters
----------
object_type : str, optional
Object type that will be used to get the features that should be calculated.
If "infer" is used, the object type will be inferred from the object_filters. This will also cause the method to return a list of CalculationHandler objects, one for each object type found.
Default is "infer".
object_filters : dict[str, list[str]] | None, optional
A dict containing the keyword arguments that will be passed to the function PerfDB.objects.instances.get(). These will be used to filter the objects that will be calculated.
Most common arguments are `object_names`, `object_types`, `object_models`, `parent_objects` and `spe_names`.
By default None, which means that all objects will be calculated.
ignored_features : list[str] | None, optional
List of features that should be ignored. These will not be calculated.
By default None.
one_per_object : bool, optional
If True, one CalculationHandler per object will be returned. This is useful when you want to run each object's calculations independently.
Returns
-------
CalculationHandler | list[CalculationHandler]
CalculationHandler object with the features that should be calculated for the desired object type, in the correct order.
If object_type is "infer" or one_per_object is True, a list of CalculationHandler objects will be returned, one for each object type or object found.
"""
perfdb = PerfDB(application_name="CalculationHandler")
if ignored_features is None:
ignored_features = []
if object_filters is None:
object_filters = {}
if object_type == "infer" or one_per_object:
_objects_df = perfdb.objects.instances.get(**object_filters, output_type="pl.DataFrame")
_objects_df = _objects_df.rename({"name": "object_name"})
obj_list = _objects_df["object_name"].to_list()
# group objects by type
objects_by_type: dict[str, list[str]] = {}
for row in _objects_df.select(["object_name", "object_type_name"]).iter_rows():
objects_by_type.setdefault(row[1], []).append(row[0])
object_types = list(objects_by_type.keys())
logger.info(
f"Object type inferred as the following: \n'{jsbeautifier.beautify(json.dumps(objects_by_type, sort_keys=True), js_options)}'.",
)
obj_to_type = dict(zip(_objects_df["object_name"].to_list(), _objects_df["object_type_name"].to_list(), strict=False))
else:
object_types = [object_type]
# getting feature order from database
feature_order = perfdb.settings.get()["feature_calculation"]["default_feature_order"]["value"]
# checking if object_type is defined in feature_order
missing_object_types = set(object_types) - set(feature_order.keys())
if missing_object_types:
raise ValueError(f"Object types '{missing_object_types}' are not defined in default_feature_order in database.")
if object_type != "infer":
if all(len(value) == 0 for value in object_filters.values()):
object_filters["object_types"] = [object_type]
if not one_per_object:
handler = CalculationHandler(
features=feature_order[object_type],
object_filters=object_filters,
regex_feature_names=True,
ignored_features=ignored_features,
)
else:
handler = []
for obj_name in obj_list:
obj_type = obj_to_type[obj_name]
handler.append(
CalculationHandler(
features=feature_order[obj_type],
object_filters={"object_names": [obj_name]},
regex_feature_names=True,
ignored_features=ignored_features,
),
)
else:
handler = []
if not one_per_object:
for obj_type in object_types:
handler.append(
CalculationHandler(
features=feature_order[obj_type],
object_filters={"object_names": objects_by_type[obj_type]},
regex_feature_names=True,
ignored_features=ignored_features,
),
)
else:
for obj_name in obj_list:
obj_type = obj_to_type[obj_name]
handler.append(
CalculationHandler(
features=feature_order[obj_type],
object_filters={"object_names": [obj_name]},
regex_feature_names=True,
ignored_features=ignored_features,
),
)
return handler
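The "infer" path above reduces to two steps: group objects by type, then validate every discovered type against the configured `default_feature_order` before any handler is built. A plain-Python sketch (`split_by_type` and the object/feature names are invented for illustration):

```python
def split_by_type(objects, feature_order):
    """Group (name, type) pairs by type and attach each type's
    configured feature list, failing fast on unknown types."""
    objects_by_type = {}
    for name, obj_type in objects:
        objects_by_type.setdefault(obj_type, []).append(name)
    missing = set(objects_by_type) - set(feature_order)
    if missing:
        raise ValueError(f"Object types '{missing}' are not defined in default_feature_order in database.")
    return {
        obj_type: {"features": feature_order[obj_type], "object_names": names}
        for obj_type, names in objects_by_type.items()
    }


# Sample data; the real feature_order comes from the performance_db settings.
objects = [("WTG01", "wind_turbine"), ("MET01", "met_mast"), ("WTG02", "wind_turbine")]
feature_order = {"wind_turbine": ["power_avg"], "met_mast": ["wind_speed_avg"]}
groups = split_by_type(objects, feature_order)
```

Each entry of the result maps onto one `CalculationHandler(features=feature_order[obj_type], object_filters={"object_names": [...]})` call; raising before any handler is built mirrors the `missing_object_types` check in the source.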