Alarm Calc - Threshold
The AlarmCalcThreshold class is a subclass of AlarmCalc and is used to calculate alarms based on feature thresholds. This means that an alarm will be created if the value of a feature is above or below a certain threshold.
This type of alarm calculation requires the following keys in the trigger column of the alarm definition in the database:
- trigger_type: Must be "threshold".
- threshold_type: One of "high" or "low".
  - "high": the alarm is triggered when the value is above the threshold.
  - "low": the alarm is triggered when the value is below the threshold.
- feature_name: Name of the feature in performance_db that will be used to calculate the alarm.
- threshold_value: Value of the threshold that will trigger the alarm.
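As an illustration, a trigger definition with these keys might look like the Python dict below. The feature name and threshold value are hypothetical placeholders, not values taken from performance_db.

```python
# Hypothetical trigger settings for a "high" threshold alarm.
# "gearbox_oil_temperature" and 80.0 are made-up example values.
trigger = {
    "trigger_type": "threshold",
    "threshold_type": "high",  # alarm while the value is above the threshold
    "feature_name": "gearbox_oil_temperature",
    "threshold_value": 80.0,
}
```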
Class Definition
AlarmCalcThreshold(object_name, alarm_id)
Alarm calculator that calculates alarms based on thresholds.
It expects the following settings in the trigger column of the alarm in performance_db:
- trigger_type: "threshold"
- threshold_type: One of "high" or "low":
  - "high": the alarm is triggered when the value is above the threshold.
  - "low": the alarm is triggered when the value is below the threshold.
- feature_name: Name of the feature in performance_db that will be used to calculate the alarm.
- threshold_value: Value of the threshold that will trigger the alarm.
If non_overlapping_alarms is defined in the alarm settings, this alarm will not overlap with the alarms in that list, which are treated as more important.
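One way to picture the effect of non_overlapping_alarms is as interval subtraction: any part of this alarm that coincides with a more important alarm is cut out. The sketch below only illustrates that idea with the standard library. It is not the library's implementation, and `subtract_intervals` is a hypothetical helper name.

```python
from datetime import datetime


def subtract_intervals(alarm, blockers):
    """Return the parts of `alarm` (start, end) not covered by any blocker."""
    pieces = [alarm]
    for b_start, b_end in sorted(blockers):
        next_pieces = []
        for start, end in pieces:
            # No overlap with this blocker: keep the piece untouched.
            if b_end <= start or b_start >= end:
                next_pieces.append((start, end))
                continue
            # Keep whatever sticks out before and after the blocker.
            if start < b_start:
                next_pieces.append((start, b_start))
            if b_end < end:
                next_pieces.append((b_end, end))
        pieces = next_pieces
    return pieces


alarm = (datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 12, 0))
blockers = [(datetime(2024, 1, 1, 10, 30), datetime(2024, 1, 1, 11, 0))]
remaining = subtract_intervals(alarm, blockers)
# remaining covers 10:00-10:30 and 11:00-12:00
```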
Parameters
- object_name (str): Name of the object for which the alarm is calculated. It must exist in performance_db.
- alarm_id (int): ID of the alarm (manufacturer id) for which the alarm calculator is being created. It must exist in performance_db for the model of the object.
Source code in echo_energycalc/alarm_calc_threshold.py
def __init__(
    self,
    object_name: str,
    alarm_id: int,
) -> None:
    """Alarm calculator that calculates alarms based on thresholds.

    It expects the following settings in the trigger columns of the alarm in performance_db:

    - **trigger_type**: "threshold"
    - **threshold_type**: One of "high" or "low":
        - "high": the alarm is triggered when the value is above the threshold.
        - "low": the alarm is triggered when the value is below the threshold.
    - **feature_name**: Name of the feature in the performance_db that will be used to calculate the alarm.
    - **threshold_value**: Value of the threshold that will trigger the alarm.

    If `non_overlapping_alarms` is defined in the alarm settings, this alarm will not overlap with the alarms defined in the list, being the ones in this list considered as more important.

    Parameters
    ----------
    object_name : str
        Name of the object for which the alarm is calculated. It must exist in performance_db.
    alarm_id : int
        ID of the alarm (manufacturer id) for which the alarm calculator is being created. It must exist in performance_db for the model of the object.
    """
    super().__init__(object_name, alarm_id)
    # validating if all the required columns are present in the alarm settings
    required_settings = {
        "threshold_type": {"type": str, "values": ["high", "low"]},
        "feature_name": {"type": str},
        "threshold_value": {"type": (int, float)},
    }
    for setting, setting_info in required_settings.items():
        if setting not in self.alarm_settings["trigger"]:
            raise ValueError(
                f"Setting '{setting}' is missing in the alarm settings of alarm {alarm_id} and object {object_name}.",
            )
        if not isinstance(
            self.alarm_settings["trigger"][setting],
            setting_info["type"],
        ):
            raise TypeError(
                f"Setting '{setting}' must be of type {setting_info['type']}, not {type(self.alarm_settings['trigger'][setting])}.",
            )
        if "values" in setting_info and self.alarm_settings["trigger"][setting] not in setting_info["values"]:
            raise ValueError(
                f"Setting '{setting}' must be one of {setting_info['values']}, not {self.alarm_settings['trigger'][setting]}.",
            )
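The validation loop in the constructor can be tried in isolation. The sketch below copies the same three checks into a standalone function that takes a plain dict, so it runs without performance_db. `validate_trigger` and the example values are hypothetical and exist only for illustration.

```python
def validate_trigger(trigger):
    """Raise ValueError or TypeError if a required trigger setting is missing or invalid."""
    required_settings = {
        "threshold_type": {"type": str, "values": ["high", "low"]},
        "feature_name": {"type": str},
        "threshold_value": {"type": (int, float)},
    }
    for setting, info in required_settings.items():
        if setting not in trigger:
            raise ValueError(f"Setting '{setting}' is missing.")
        if not isinstance(trigger[setting], info["type"]):
            raise TypeError(f"Setting '{setting}' must be of type {info['type']}.")
        if "values" in info and trigger[setting] not in info["values"]:
            raise ValueError(f"Setting '{setting}' must be one of {info['values']}.")


# A valid trigger passes silently.
validate_trigger({"threshold_type": "low", "feature_name": "wind_speed", "threshold_value": 3})

# An unsupported threshold_type is rejected with a ValueError.
try:
    validate_trigger({"threshold_type": "medium", "feature_name": "wind_speed", "threshold_value": 3})
except ValueError as error:
    message = str(error)
```

Note that `bool` is a subclass of `int` in Python, so a threshold_value of `True` would also pass an `isinstance(..., (int, float))` check like this one.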
alarm_db_id (property)
ID of the alarm in the database. This is used to get the alarm settings.
Returns:
- int: ID of the alarm in the database.
alarm_id (property)
ID of the alarm that is calculated (manufacturer_id). This will be defined in the constructor and cannot be changed.
Returns:
- int: ID of the alarm that is calculated (manufacturer_id).
alarm_settings (property)
Settings of the alarm. This is a dictionary with the settings of the alarm that is being calculated.
Returns:
- dict[str, Any]: Settings of the alarm.
alarm_type (property)
Type of the alarm that is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str: Type of the alarm that is calculated.
name (property)
Name of the alarm calculator. It is defined in child classes of AlarmCalculator.
This must be equal to the "server_calc_type" attribute of the alarm in performance_db.
Returns:
- str: Name of the alarm calculator.
object (property)
Object for which the alarm is calculated. This will be defined in the constructor and cannot be changed.
Returns:
- str: Object name for which the alarm is calculated.
result (property)
Result of the calculation. This is None until the method "calculate" is called.
Returns:
- DataFrame | None: Result of the calculation if the method "calculate" was called. None otherwise.
calculate(period, cached_data=None, save=True)
Calculates the alarm for the given period.
Parameters:
- period (DateTimeRange): Period for which the alarm will be calculated.
- cached_data (dict[str, DataFrame] | None, default: None): Dict with DataFrame containing the cached data for each object. This is useful to avoid querying the database multiple times for the same data.
- save (bool, default: True): If True, the result of the alarms will be saved in the database.
Returns:
- DataFrame: Pandas DataFrame with the calculated alarm.
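The core idea of the calculation, comparing each sample with the threshold and pairing every switch into the alarmed state with the next switch out of it, can be sketched with the standard library alone. This is a simplified illustration rather than the method itself: it skips the database look-ups at the period boundaries, the non-overlapping alarms and the truncation step, and `threshold_intervals` is a hypothetical name.

```python
from datetime import datetime, timedelta


def threshold_intervals(samples, threshold_type, threshold_value):
    """Return (start, end) pairs; end is None when the alarm is still active at the last sample."""
    intervals = []
    start = None
    for timestamp, value in samples:
        if threshold_type == "high":
            alarmed = value > threshold_value
        else:
            alarmed = value < threshold_value
        if alarmed and start is None:
            # State change: not alarmed -> alarmed
            start = timestamp
        elif not alarmed and start is not None:
            # State change: alarmed -> not alarmed
            intervals.append((start, timestamp))
            start = None
    if start is not None:
        intervals.append((start, None))
    return intervals


t0 = datetime(2024, 1, 1)
values = [70, 85, 90, 75, 60, 95]  # made-up samples, one every 10 minutes
samples = [(t0 + timedelta(minutes=10 * i), v) for i, v in enumerate(values)]
intervals = threshold_intervals(samples, "high", 80)
# Two alarms: 00:10 -> 00:30, and one starting at 00:50 that is still open
```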
Source code in echo_energycalc/alarm_calc_threshold.py
def calculate(
    self,
    period: DateTimeRange,
    cached_data: dict[str, DataFrame] | None = None,
    save: bool = True,
) -> DataFrame:
    """Calculates the alarm for the given period.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the alarm will be calculated.
    cached_data : dict[str, DataFrame] | None, optional
        Dict with DataFrame containing the cached data for each object. This is useful to avoid querying the database multiple times for the same data. The default is None.
    save : bool, optional
        If True, the result of the alarms will be saved in the database. The default is True.

    Returns
    -------
    DataFrame
        Pandas DataFrame with the calculated alarm.
    """
    logger.info(
        f"Calculating alarm {self.alarm_id} for object {self.object} for period {period}",
    )
    # getting data for the required period within the desired period
    feature_series = None
    if cached_data is not None and "features" in cached_data and cached_data["features"] is not None:
        logger.debug(
            f"Using cached data for object {self.object} and feature {self.alarm_settings['trigger']['feature_name']}",
        )
        # check if the desired feature is already in the cached data
        if (
            self.object,
            self.alarm_settings["trigger"]["feature_name"],
        ) in cached_data["features"].columns:
            feature_series = cached_data["features"][(self.object, self.alarm_settings["trigger"]["feature_name"])]
    if feature_series is None:
        feature_series = self._perfdb.features.values.series.get(
            features={
                self.object: [self.alarm_settings["trigger"]["feature_name"]],
            },
            period=period,
        )[(self.object, self.alarm_settings["trigger"]["feature_name"])]
        # adding the feature to the cached data
        if cached_data is not None:
            if "features" not in cached_data or cached_data["features"] is None:
                cached_data["features"] = feature_series.to_frame()
            else:
                cached_data["features"] = cached_data["features"].merge(
                    feature_series.to_frame(),
                    left_index=True,
                    right_index=True,
                    how="outer",
                )
    # renaming the feature series
    feature_series.name = "feature"
    feature_series.index.name = "timestamp"
    # dropping rows with NaN values
    feature_series = feature_series.dropna()
    # converting pandas Series to Polars Series for better performance
    df = pl.from_pandas(feature_series, include_index=True).lazy()
    # getting essential settings
    threshould_type: Literal["high", "low"] = self.alarm_settings["trigger"]["threshold_type"]
    threshold_value: float = self.alarm_settings["trigger"]["threshold_value"]
    # finding the periods where the alarm is triggered
    if threshould_type == "high":
        df = df.with_columns(alarmed=pl.col("feature") > threshold_value)
    else:
        df = df.with_columns(alarmed=pl.col("feature") < threshold_value)
    # * dealing with edge cases
    if not df.limit(1).collect().is_empty():
        # start of the period
        first_val: bool = df.select("alarmed").first().collect()[0, 0]
        first_timestamp: datetime = df.select("timestamp").first().collect()[0, 0]
        if first_val:
            # getting timestamp before the first timestamp where the alarm was not triggered
            not_trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                {
                    self.object: {
                        self.alarm_settings["trigger"]["feature_name"]: {
                            "condition": f"value {'<' if threshould_type == 'high' else '>'} {threshold_value} AND timestamp < '{first_timestamp:%Y-%m-%d %H:%M:%S}'",
                            "order_by": "timestamp DESC",
                            "limit": 1,
                        },
                    },
                },
            )[self.object][self.alarm_settings["trigger"]["feature_name"]]
            # if found, let's find the subsequent timestamp when the alarm was triggered
            if not_trig_timestamp:
                not_trig_timestamp = not_trig_timestamp[0]
                trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                    {
                        self.object: {
                            self.alarm_settings["trigger"]["feature_name"]: {
                                "condition": f"value {'>' if threshould_type == 'high' else '<'} {threshold_value} AND timestamp > '{not_trig_timestamp:%Y-%m-%d %H:%M:%S}'",
                                "order_by": "timestamp ASC",
                                "limit": 1,
                            },
                        },
                    },
                )[self.object][self.alarm_settings["trigger"]["feature_name"]]
                if trig_timestamp:
                    trig_timestamp = trig_timestamp[0]
                    # if the alarm was triggered before the first timestamp, we need to add a row with the alarm triggered
                    if not_trig_timestamp > trig_timestamp:
                        new_row = pl.DataFrame(
                            {
                                "timestamp": [first_timestamp],
                                "alarmed": [True],
                            },
                        ).lazy()
                        df = pl.concat([new_row, df], how="diagonal_relaxed")
        # end of the period
        last_val: bool = df.select("alarmed").last().collect()[0, 0]
        last_timestamp: datetime = df.select("timestamp").last().collect()[0, 0]
        if last_val:
            # getting timestamp after the last timestamp where the alarm was not triggered
            not_trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                {
                    self.object: {
                        self.alarm_settings["trigger"]["feature_name"]: {
                            "condition": f"value {'<' if threshould_type == 'high' else '>'} {threshold_value} AND timestamp > '{last_timestamp:%Y-%m-%d %H:%M:%S}'",
                            "order_by": "timestamp ASC",
                            "limit": 1,
                        },
                    },
                },
            )[self.object][self.alarm_settings["trigger"]["feature_name"]]
            # if found, let's add a row with the alarm ended
            if not_trig_timestamp:
                not_trig_timestamp = not_trig_timestamp[0]
                new_row = pl.DataFrame(
                    {
                        "timestamp": [last_timestamp],
                        "alarmed": [False],
                    },
                ).lazy()
                df = pl.concat([df, new_row], how="diagonal_relaxed")
    # keeping original rows to evaluate truncation
    non_filtered_df = df
    self._evaluated_period = period
    if not df.limit(1).collect().is_empty():
        # keeping only rows with state changes
        df = df.filter(
            (pl.col("alarmed") != pl.col("alarmed").shift(1)) | pl.col("alarmed").is_first_distinct(),
        )
        # getting the period that is being evaluated
        non_overlap_start = df.select("timestamp").first().collect()[0, 0]
        non_overlap_end = df.select("timestamp").last().collect()[0, 0]
        self._evaluated_period = DateTimeRange(non_overlap_start, non_overlap_end)
    # * Evaluate non overlapping alarms
    if len(df.filter(pl.col("alarmed")).collect()) > 1 and self.alarm_settings["non_overlapping_alarms"]:
        # defining the period based on first and last timestamp of df
        logger.info(
            f"{self.object} - {self.alarm_id} - Evaluating non overlapping alarms {self.alarm_settings['non_overlapping_alarms']} during {self._evaluated_period}",
        )
        # iterating over the non overlapping alarms
        for non_overlapping_alarm in self.alarm_settings["non_overlapping_alarms"]:
            # getting the non overlapping alarms from the database
            non_overlapping_alarms = self._perfdb.alarms.history.get(
                period=self._evaluated_period,
                object_names=[self.object],
                alarm_ids=[non_overlapping_alarm],
                match_alarm_id_on="manufacturer_id",
            )
            # renaming columns
            rename = {
                "start": "start",
                "end": "end",
            }
            non_overlapping_alarms = non_overlapping_alarms[list(rename.keys())].rename(columns=rename)
            if non_overlapping_alarms.empty:
                continue
            # converting the result to a polars DataFrame
            non_overlapping_alarms = pl.DataFrame(non_overlapping_alarms).lazy()
            # melting the DataFrame into a timestamp and alarmed column
            non_overlapping_starts = (
                non_overlapping_alarms.select("start")
                .with_columns(
                    alarmed=pl.lit(value=True),
                )
                .rename({"start": "timestamp"})
            )
            non_overlapping_ends = (
                non_overlapping_alarms.select("end")
                .with_columns(
                    alarmed=pl.lit(value=False),
                )
                .rename({"end": "timestamp"})
            )
            non_overlapping_alarms = pl.concat(
                [non_overlapping_starts, non_overlapping_ends],
                how="diagonal_relaxed",
            )
            # adding new column to indicate the non overlapping alarms
            non_overlapping_alarms = non_overlapping_alarms.with_columns(
                preceding_alarm=pl.lit(value=True),
            )
            # concatenating the non overlapping alarms with the original DataFrame
            df = pl.concat([df, non_overlapping_alarms], how="diagonal_relaxed")
        # make sure the DataFrame is sorted by timestamp
        df = df.sort("timestamp")
        # evaluating non overlapping alarms
        if "preceding_alarm" in df.collect_schema().names():
            # flip values of alarmed column when preceding_alarm is True
            df: pl.LazyFrame = df.with_columns(
                alarmed=pl.when(pl.col("preceding_alarm")).then(~pl.col("alarmed")).otherwise(pl.col("alarmed")),
            )
            # dropping rows with duplicated timestamp, giving priority to the ones where preceding_alarm is True
            df = df.sort(["alarmed", "preceding_alarm"], nulls_last=True)
            df = df.unique(subset=["timestamp"], keep="first")
            # sorting again
            df = df.sort("timestamp")
            # dropping the preceding_alarm column
            df = df.drop("preceding_alarm")
    # Add an index for pairing start and end rows
    df = df.with_columns(
        pl.when(pl.col("alarmed"))
        .then(pl.cum_count("alarmed").over("alarmed"))
        .otherwise(None)
        .fill_null(strategy="forward")
        .alias("group_id"),
    )
    # dropping rows without group_id
    # needed, as without this there is a bug when running in Airflow
    df = df.filter(pl.col("group_id").is_not_null())
    # Group by `group_id` and extract `start` and `end`
    result = df.group_by("group_id").agg(
        [
            pl.col("timestamp").filter(pl.col("alarmed")).first().alias("start"),
            pl.col("timestamp").filter(~pl.col("alarmed")).first().alias("end"),
        ],
    )
    # dropping the group_id column
    result = result.drop("group_id")
    # adding object_name, alarm_id, and alarm_type to the result
    result = result.with_columns(
        object_name=pl.lit(self.object),
        alarm_id=pl.lit(self.alarm_id),
        alarm_type=pl.lit(self.alarm_type),
    )
    # dropping rows without start value
    result = result.filter(pl.col("start").is_not_null())
    # sorting the result by start
    result = result.sort("start")
    # * evaluating truncation
    # first creating a column defining the rows with state changes
    non_filtered_df = non_filtered_df.with_columns(
        state_change=pl.when(pl.col("alarmed") != pl.col("alarmed").shift(1)).then(statement=True).otherwise(statement=False),
    )
    # then creating a column with the amount of seconds from the previous row
    non_filtered_df = non_filtered_df.with_columns(
        seconds_from_previous=(pl.col("timestamp") - pl.col("timestamp").shift(1)).dt.total_seconds(),
    )
    # creating a column with the timestamp of the previous row
    non_filtered_df = non_filtered_df.with_columns(
        previous_timestamp=pl.col("timestamp").shift(1),
    )
    # then creating a column with the truncate timestamp. It will be None if seconds_from_previous is less than self._max_duration_seconds and will have the timestamp to truncate otherwise
    non_filtered_df = non_filtered_df.with_columns(
        truncate_timestamp=pl.when(
            pl.col("seconds_from_previous") > self._max_duration_seconds,
        )
        .then(
            pl.col("previous_timestamp") + pl.duration(seconds=self._max_duration_seconds),
        )  # .shift(1)
        .otherwise(None),
    )
    # defining the groups to group by using the same criteria as the result
    non_filtered_df = non_filtered_df.with_columns(
        group_id=pl.when(pl.col("state_change"))
        .then(pl.cum_count("state_change").over("state_change"))
        .otherwise(None)
        .fill_null(strategy="forward"),
    )
    # removing empty group ids
    # needed, as without this there is a bug when running in Airflow
    non_filtered_df = non_filtered_df.filter(pl.col("group_id").is_not_null())
    # grouping by truncate_timestamp, where the truncate_timestamp column will be the first non null value
    truncate_alarms_timestamp = non_filtered_df.group_by("group_id").agg(
        [
            pl.col("timestamp").first().alias("start"),
            pl.col("truncate_timestamp").filter(pl.col("truncate_timestamp").is_not_null()).first().alias("truncate_timestamp"),
        ],
    )
    # dropping the group_id column
    truncate_alarms_timestamp = truncate_alarms_timestamp.drop("group_id")
    # merging the truncate_alarms_timestamp with the result
    result = result.join(truncate_alarms_timestamp, on="start", how="left")
    # truncating the alarms
    result = result.with_columns(
        end=pl.when(pl.col("end") > pl.col("truncate_timestamp")).then(pl.col("truncate_timestamp")).otherwise(pl.col("end")),
    )
    # dropping the truncate_timestamp column
    result = result.drop("truncate_timestamp")
    # * Final steps
    # converting the result to pandas DataFrame
    result = result.collect().to_pandas(use_pyarrow_extension_array=True)
    # setting the result to the instance
    self._result = result
    # saving the result in the database
    if save:
        self.save()
    return result
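The truncation step near the end of the method shortens an alarm when consecutive samples are further apart than `self._max_duration_seconds`: the alarm then ends at the last sample before the gap plus that duration. The standard-library sketch below shows the rule for a single alarm. It is an illustration under simplifying assumptions, not the library's code: `truncate_end` and `max_gap_seconds` are hypothetical names, the value of `_max_duration_seconds` is not shown on this page, and the alarm is assumed to have a known end.

```python
from datetime import datetime, timedelta


def truncate_end(timestamps, end, max_gap_seconds):
    """Return the alarm end, cut short at the first gap between samples longer than max_gap_seconds.

    `timestamps` are the sorted sample timestamps observed while the alarm was active.
    """
    max_gap = timedelta(seconds=max_gap_seconds)
    for previous, current in zip(timestamps, timestamps[1:]):
        if current - previous > max_gap:
            # Data went missing: stop the alarm max_gap after the last sample seen.
            return min(end, previous + max_gap)
    return end


t0 = datetime(2024, 1, 1)
timestamps = [t0, t0 + timedelta(minutes=10), t0 + timedelta(hours=3)]
end = t0 + timedelta(hours=3, minutes=20)
truncated = truncate_end(timestamps, end, max_gap_seconds=3600)
# The 2 h 50 min gap after 00:10 exceeds one hour, so the alarm ends at 01:10
```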
save()
Save the result of the calculation in the database.
If the method "calculate" was not called before, this method will raise an error.
Source code in echo_energycalc/alarm_calc_core.py
def save(self) -> None:
    """
    Save the result of the calculation in the database.

    If the method "calculate" was not called before, this method will raise an error.
    """
    # checking if the result was calculated
    if self._result is None:
        raise ValueError("The method 'calculate' must be called before saving the result.")
    # checking if the period was evaluated
    if self._evaluated_period is None:
        raise ValueError("Evaluated period was not set during calculation.")
    # getting result
    result = self._result
    # checking if there are rows where start and end are equal
    if not result.empty:
        equal_count = result[result["start"] == result["end"]].shape[0]
        if equal_count > 0:
            logger.warning(f"Found {equal_count} rows with start equal to end in the result. These rows will be removed.")
            result = result[result["start"] != result["end"]]
        end_before_start_count = result[result["end"] < result["start"]].shape[0]
        if end_before_start_count > 0:
            logger.warning(f"Found {end_before_start_count} rows with end before start in the result. These rows will be removed.")
            result = result[result["end"] >= result["start"]]
    # renaming columns
    result = result.rename(columns={"alarm_id": "manufacturer_id"})
    # dropping existing alarms within the period
    logger.info(
        f"Deleting existing alarms for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.",
    )
    self._perfdb.alarms.history.delete(
        object_names=[self.object],
        period=self._evaluated_period,
        alarm_ids=[self.alarm_db_id],
    )
    if not result.empty:
        # saving the result in the database
        logger.info(f"Saving alarms for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.")
        self._perfdb.alarms.history.insert(
            df=result,
            on_conflict="update",
        )
    else:
        logger.info(f"No alarms to save for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.")
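Before writing to the database, save() discards rows whose start equals their end and rows whose end precedes their start. For rows that have both timestamps, the combined effect is to keep only alarms with a strictly positive duration, as in the sketch below. It uses plain dicts instead of a pandas DataFrame, assumes every row has an end, and `drop_invalid_rows` is a hypothetical helper name.

```python
from datetime import datetime


def drop_invalid_rows(rows):
    """Keep only rows whose end is strictly after their start."""
    return [row for row in rows if row["end"] > row["start"]]


rows = [
    {"start": datetime(2024, 1, 1, 10, 0), "end": datetime(2024, 1, 1, 11, 0)},  # valid
    {"start": datetime(2024, 1, 1, 12, 0), "end": datetime(2024, 1, 1, 12, 0)},  # zero duration
    {"start": datetime(2024, 1, 1, 14, 0), "end": datetime(2024, 1, 1, 13, 0)},  # end before start
]
valid_rows = drop_invalid_rows(rows)
```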