Alarm Active¶
Overview¶
FeatureCalcAlarmActive calculates, for each 10-minute period, how many seconds a specific alarm was active. The result is a time series of alarm duration values in the range [0, 600] seconds, where 600 means the alarm was active for the entire 10-minute period.
Example use case: Track how long turbine over-speed alarm ID 1234 was active in each 10-minute period.
Calculation Logic¶
Alarm Period Fetch¶
The query period is extended 10 minutes before the requested start to capture alarms that started before the window but were still active at the first timestamp. This is necessary because performance_db timestamps mark the end of each 10-minute period.
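The window extension can be sketched as follows; `extended_fetch_start` is a hypothetical helper name, not part of the actual source:

```python
from datetime import datetime, timedelta

def extended_fetch_start(requested_start: datetime) -> datetime:
    # performance_db timestamps mark the END of each 10-minute period,
    # so an alarm that covers the first period may have started up to
    # 10 minutes before the requested start. Widen the query accordingly.
    return requested_start - timedelta(minutes=10)

extended_fetch_start(datetime(2024, 1, 1, 0, 0))
# → datetime(2023, 12, 31, 23, 50)
```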
Vectorized Bucket Overlap¶
Each alarm record has a start and end timestamp. The calculator distributes alarm duration across all 10-minute buckets that the alarm overlaps, using a fully vectorized Polars approach:
1. **Identify overlapping buckets**: For each alarm, compute the first and last 10-minute bucket endpoint it touches:

    ```text
    first_bucket = ceil(alarm.start, "10min")
    last_bucket  = ceil(alarm.end, "10min")
    ```

2. **Generate bucket list**: Use `pl.datetime_ranges(first_bucket, last_bucket, "10min")` to enumerate every bucket endpoint for that alarm in a single vectorized operation, with no Python-level loop over timestamps.

3. **Compute overlap per bucket**: After exploding the bucket lists, for each `(alarm, bucket_end)` pair:

    ```text
    overlap_seconds = min(alarm.end, bucket_end) - max(alarm.start, bucket_end - 10min)
    ```

    Rows with `overlap_seconds ≤ 0` are filtered out.

4. **Aggregate per bucket**: Group by `bucket_end` and sum `overlap_seconds`, clipping the sum to `[0, 600]` seconds (multiple alarms of the same type can overlap).

5. **Fill missing buckets**: Timestamps with no alarm activity get `0` seconds (not null).
Pre-Processing Filters¶
- Alarms without an `end` timestamp (still active / unfinished) are excluded.
- Alarms where `end < start` are excluded as invalid.
Database Requirements¶
Feature Attribute¶
| Attribute | Value |
|---|---|
| `server_calc_type` | `alarm_active_time` |
| `feature_options_json` | JSON object (see below) |
feature_options_json Schema¶
| Key | Type | Required | Description |
|---|---|---|---|
| `reference_alarm` | integer | Yes | `manufacturer_id` of the alarm to track (see view `v_alarms_def`). |
Example:
```json
{
  "reference_alarm": 1234
}
```
Note
The alarm must exist for the object's model in performance_db. The constructor validates this at instantiation — an invalid alarm ID raises ValueError.
Alarm History Data¶
The calculator reads from the alarms history table in performance_db. The relevant columns are start, end, and manufacturer_id. No additional feature data from SCADA is required.
Output¶
| Column | Type | Description |
|---|---|---|
| `timestamp` | Datetime | End of the 10-minute period |
| `<feature_name>` | Float64 | Alarm active time in seconds `[0, 600]`. `0` means not active. |
Class Definition¶
FeatureCalcAlarmActive(object_name, feature)¶
FeatureCalculator class for features that represent the number of seconds an alarm is active in a 10 min period.
The method will get the records from alarms history table and calculate the number of seconds that the wanted alarm was active in a 10 min period.
For this to work the feature must have attribute feature_options_json with the following keys:
`reference_alarm`: The `manufacturer_id` (see view `v_alarms_def`) of the alarm that is used as reference.
Parameters:
- `object_name` (`str`) – Name of the object for which the feature is calculated. It must exist in performance_db.
- `feature` (`str`) – Feature of the object that is calculated. It must exist in performance_db.
Source code in echo_energycalc/feature_calc_alarm_active.py
def __init__(
self,
object_name: str,
feature: str,
) -> None:
"""
FeatureCalculator class for features that represent the number of seconds an alarm is active in a 10 min period.
The method will get the records from alarms history table and calculate the number of seconds that the wanted alarm was active in a 10 min period.
For this to work the feature must have attribute `feature_options_json` with the following keys:
- `reference_alarm`: The `manufacturer_id` (see view `v_alarms_def`) of the alarm that is used as reference.
Parameters
----------
object_name : str
Name of the object for which the feature is calculated. It must exist in performance_db.
feature : str
Feature of the object that is calculated. It must exist in performance_db.
"""
# initialize parent class
super().__init__(object_name, feature)
# requirements for the feature calculator
self._add_requirement(RequiredFeatureAttributes(self.object, self.feature, ["feature_options_json"]))
self._fetch_requirements()
# validating feature options
self._validate_feature_options()
# defining required alarms
self._add_requirement(
RequiredAlarms(
{
self.object: [
self._requirement_data("RequiredFeatureAttributes")[self.feature]["feature_options_json"]["reference_alarm"],
],
},
),
)
feature property¶
Feature that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- `str` – Name of the feature that is calculated.
name property¶
Name of the feature calculator. Is defined in child classes of FeatureCalculator.
This must be equal to the "server_calc_type" attribute of the feature in performance_db.
Returns:
- `str` – Name of the feature calculator.
object property¶
Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- `str` – Object name for which the feature is calculated.
requirements property¶
List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.
Returns:
- `dict[str, list[CalculationRequirement]]` – Dict of requirements. The keys are the names of the requirement classes and the values are lists of requirements of that class. For example: `{"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}`
result property¶
Result of the calculation. This is None until the method "calculate" is called.
Returns:
- `DataFrame | None` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns. None until `calculate` is called.
calculate(period, save_into=None, cached_data=None, **kwargs)¶
Run the calculation for the given period and optionally save the result.
Calls `_compute` to get the result, stores it in `result`, then calls `save`. Subclasses should implement `_compute` instead of overriding this method.
Parameters:
- `period` (`DateTimeRange`) – Period for which the feature will be calculated.
- `save_into` (`Literal['all', 'performance_db'] | None`, default: `None`) – Where to save the result:
    - `"all"`: save in performance_db and bazefield.
    - `"performance_db"`: save only in performance_db.
    - `None`: do not save.
- `cached_data` (`DataFrame | None`, default: `None`) – Polars DataFrame with features already fetched/calculated. Passed to `_compute` to enable chained calculations without re-querying performance_db.
- `**kwargs` – Forwarded to `save`.
Returns:
- `DataFrame` – Polars DataFrame with a `"timestamp"` column and one or more feature value columns.
Source code in echo_energycalc/feature_calc_core.py
def calculate(
self,
period: DateTimeRange,
save_into: Literal["all", "performance_db"] | None = None,
cached_data: pl.DataFrame | None = None,
**kwargs,
) -> pl.DataFrame:
"""
Run the calculation for the given period and optionally save the result.
Calls :meth:`_compute` to get the result, stores it in :attr:`result`,
then calls :meth:`save`. Subclasses should implement :meth:`_compute` instead
of overriding this method.
Parameters
----------
period : DateTimeRange
Period for which the feature will be calculated.
save_into : Literal["all", "performance_db"] | None, optional
- ``"all"``: save in performance_db and bazefield.
- ``"performance_db"``: save only in performance_db.
- ``None``: do not save.
By default None.
cached_data : pl.DataFrame | None, optional
Polars DataFrame with features already fetched/calculated. Passed to
``_compute`` to enable chained calculations without re-querying
performance_db. By default None.
**kwargs
Forwarded to :meth:`save`.
Returns
-------
pl.DataFrame
Polars DataFrame with a ``"timestamp"`` column and one or more feature value columns.
"""
result = self._compute(period, cached_data=cached_data)
self._result = result
self.save(save_into=save_into, **kwargs)
return result
save(save_into=None, **kwargs)¶
Method to save the calculated feature values in performance_db.
Parameters:
- `save_into` (`Literal['all', 'performance_db'] | None`, default: `None`) – Where to save the calculated values:
    - `"all"`: save in performance_db and bazefield.
    - `"performance_db"`: save only in performance_db.
    - `None`: do not save.
- `**kwargs` (`dict`, default: `{}`) – Not used at the moment; present only for compatibility.
Source code in echo_energycalc/feature_calc_core.py
def save(
self,
save_into: Literal["all", "performance_db"] | None = None,
**kwargs, # noqa: ARG002
) -> None:
"""
Method to save the calculated feature values in performance_db.
Parameters
----------
save_into : Literal["all", "performance_db"] | None, optional
Where to save the calculated values. The options are:
- "all": The feature will be saved in performance_db and bazefield.
- "performance_db": the feature will be saved only in performance_db.
- None: The feature will not be saved.
By default None.
**kwargs : dict, optional
Not being used at the moment. Here only for compatibility.
"""
# checking arguments
if not isinstance(save_into, str | type(None)):
raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")
# checking if calculation was done
if self.result is None:
raise ValueError(
"The calculation was not done. Please call 'calculate' before calling 'save'.",
)
if save_into is None:
return
upload_to_bazefield = save_into == "all"
if not isinstance(self.result, pl.DataFrame):
raise TypeError(f"result must be a polars DataFrame, not {type(self.result)}.")
if "timestamp" not in self.result.columns:
raise ValueError("result DataFrame must contain a 'timestamp' column.")
# rename feature columns to "object@feature" format expected by perfdb polars insert
feat_cols = [c for c in self.result.columns if c != "timestamp"]
result_pl = self.result.rename({col: f"{self.object}@{col}" for col in feat_cols})
self._perfdb.features.values.series.insert(
df=result_pl,
on_conflict="update",
bazefield_upload=upload_to_bazefield,
)