Skip to content

Alarm Calc - Threshold

The AlarmCalcThreshold class is a subclass of AlarmCalc and is used to calculate alarms based on feature thresholds. This means that an alarm will be created if the value of a feature is above or below a certain threshold.

This type of alarm calculation requires the following keys in the trigger column of the alarm definition in the database:

  • trigger_type: Must be threshold.
  • threshold_type: One of "high" or "low".
    • "high": the alarm is triggered when the value is above the threshold.
    • "low": the alarm is triggered when the value is below the threshold.
  • feature_name: Name of the feature in the performance_db that will be used to calculate the alarm.
  • threshold_value: Value of the threshold that will trigger the alarm.

Class Definition

AlarmCalcThreshold(object_name, alarm_id)

Alarm calculator that calculates alarms based on thresholds.

It expects the following settings in the trigger columns of the alarm in performance_db:

  • trigger_type: "threshold"
  • threshold_type: One of "high" or "low":
    • "high": the alarm is triggered when the value is above the threshold.
    • "low": the alarm is triggered when the value is below the threshold.
  • feature_name: Name of the feature in the performance_db that will be used to calculate the alarm.
  • threshold_value: Value of the threshold that will trigger the alarm.

If non_overlapping_alarms is defined in the alarm settings, this alarm will not overlap with the alarms defined in the list, being the ones in this list considered as more important. ```

Parameters

object_name : str Name of the object for which the alarm is calculated. It must exist in performance_db. alarm_id : int ID of the alarm (manufacturer id) for which the alarm calculator is being created. It must exist in performance_db for the model of the object.

Source code in echo_energycalc/alarm_calc_threshold.py
def __init__(
    self,
    object_name: str,
    alarm_id: int,
) -> None:
    """Alarm calculator that calculates alarms based on thresholds.

    It expects the following settings in the trigger columns of the alarm in performance_db:

    - **trigger_type**: "threshold"
    - **threshold_type**: One of "high" or "low":
        - "high": the alarm is triggered when the value is above the threshold.
        - "low": the alarm is triggered when the value is below the threshold.
    - **feature_name**: Name of the feature in the performance_db that will be used to calculate the alarm.
    - **threshold_value**: Value of the threshold that will trigger the alarm.

    If `non_overlapping_alarms` is defined in the alarm settings, this alarm will not overlap with the alarms defined in the list, being the ones in this list considered as more important.
    ```

    Parameters
    ----------
    object_name : str
        Name of the object for which the alarm is calculated. It must exist in performance_db.
    alarm_id : int
        ID of the alarm (manufacturer id) for which the alarm calculator is being created. It must exist in performance_db for the model of the object.

    """
    super().__init__(object_name, alarm_id)

    # validating if all the required columns are present in the alarm settings

    required_settings = {
        "threshold_type": {"type": str, "values": ["high", "low"]},
        "feature_name": {"type": str},
        "threshold_value": {"type": (int, float)},
    }
    for setting, setting_info in required_settings.items():
        if setting not in self.alarm_settings["trigger"]:
            raise ValueError(
                f"Setting '{setting}' is missing in the alarm settings of alarm {alarm_id} and object {object_name}.",
            )
        if not isinstance(
            self.alarm_settings["trigger"][setting],
            setting_info["type"],
        ):
            raise TypeError(
                f"Setting '{setting}' must be of type {setting_info['type']}, not {type(self.alarm_settings['trigger'][setting])}.",
            )
        if "values" in setting_info and self.alarm_settings["trigger"][setting] not in setting_info["values"]:
            raise ValueError(
                f"Setting '{setting}' must be one of {setting_info['values']}, not {self.alarm_settings['trigger'][setting]}.",
            )

alarm_db_id property

ID of the alarm in the database. This is used to get the alarm settings.

Returns:

  • int

    ID of the alarm in the database.

alarm_id property

ID of the alarm that is calculated (manufacturer_id). This will be defined in the constructor and cannot be changed.

Returns:

  • int

    ID of the alarm that is calculated (manufacturer_id).

alarm_settings property

Settings of the alarm. This is a dictionary with the settings of the alarm that is being calculated.

Returns:

  • dict[str, Any]

    Settings of the alarm.

alarm_type property

Type of the alarm that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Type of the alarm that is calculated.

name property

Name of the alarm calculator. Is defined in child classes of AlarmCalculator.

This must be equal to the "server_calc_type" attribute of the alarm in performance_db.

Returns:

  • str

    Name of the alarm calculator.

object property

Object for which the alarm is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the alarm is calculated.

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • DataFrame | None:

    Result of the calculation if the method "calculate" was called. None otherwise.

calculate(period, cached_data=None, save=True)

Calculates the alarm for the given period.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the alarm will be calculated.

  • cached_data

    (dict[str, DataFrame] | None, default: None ) –

    Dict with DataFrame containing the cached data for each object. This is useful to avoid querying the database multiple times for the same data. The default is None.

  • save

    (bool, default: True ) –

    If True, the result of the alarms will be saved in the database. The default is True.

Returns:

  • DataFrame

    Pandas DataFrame with the calculated alarm.

Source code in echo_energycalc/alarm_calc_threshold.py
def calculate(
    self,
    period: DateTimeRange,
    cached_data: dict[str, DataFrame] | None = None,
    save: bool = True,
) -> DataFrame:
    """Calculates the alarm for the given period.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the alarm will be calculated.
    cached_data : dict[str, DataFrame] | None, optional
        Dict with DataFrame containing the cached data for each object. This is useful to avoid querying the database multiple times for the same data. The default is None.
    save : bool, optional
        If True, the result of the alarms will be saved in the database. The default is True.

    Returns
    -------
    DataFrame
        Pandas DataFrame with the calculated alarm.

    """
    logger.info(
        f"Calculating alarm {self.alarm_id} for object {self.object} for period {period}",
    )

    # getting data for the required period within the desired period
    feature_series = None
    if cached_data is not None and "features" in cached_data and cached_data["features"] is not None:
        logger.debug(
            f"Using cached data for object {self.object} and feature {self.alarm_settings['trigger']['feature_name']}",
        )
        # check if the desire feature is already in the cached data
        if (
            self.object,
            self.alarm_settings["trigger"]["feature_name"],
        ) in cached_data["features"].columns:
            feature_series = cached_data["features"][(self.object, self.alarm_settings["trigger"]["feature_name"])]

    if feature_series is None:
        feature_series = self._perfdb.features.values.series.get(
            features={
                self.object: [self.alarm_settings["trigger"]["feature_name"]],
            },
            period=period,
        )[(self.object, self.alarm_settings["trigger"]["feature_name"])]

        # adding the feature to the cached data
        if cached_data is not None:
            if "features" not in cached_data or cached_data["features"] is None:
                cached_data["features"] = feature_series.to_frame()
            else:
                cached_data["features"] = cached_data["features"].merge(
                    feature_series.to_frame(),
                    left_index=True,
                    right_index=True,
                    how="outer",
                )

    # renaming the feature series
    feature_series.name = "feature"
    feature_series.index.name = "timestamp"

    # dropping rows with NaN values
    feature_series = feature_series.dropna()

    # converting pandas Series to Polars Series for better performance
    df = pl.from_pandas(feature_series, include_index=True).lazy()

    # getting essential settings
    threshould_type: Literal["high", "low"] = self.alarm_settings["trigger"]["threshold_type"]
    threshold_value: float = self.alarm_settings["trigger"]["threshold_value"]

    # finding the periods where the alarm is triggered
    if threshould_type == "high":
        df = df.with_columns(alarmed=pl.col("feature") > threshold_value)
    else:
        df = df.with_columns(alarmed=pl.col("feature") < threshold_value)

    # * dealing with edge cases

    if not df.limit(1).collect().is_empty():
        # start of the period
        first_val: bool = df.select("alarmed").first().collect()[0, 0]
        first_timestamp: datetime = df.select("timestamp").first().collect()[0, 0]
        if first_val:
            # getting timestamp before the first timestamp where the alarm was not triggered
            not_trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                {
                    self.object: {
                        self.alarm_settings["trigger"]["feature_name"]: {
                            "condition": f"value {'<' if threshould_type == 'high' else '>'} {threshold_value} AND timestamp < '{first_timestamp:%Y-%m-%d %H:%M:%S}'",
                            "order_by": "timestamp DESC",
                            "limit": 1,
                        },
                    },
                },
            )[self.object][self.alarm_settings["trigger"]["feature_name"]]
            # if found, lets find the subsequent timestamp when the alarm was triggered
            if not_trig_timestamp:
                not_trig_timestamp = not_trig_timestamp[0]
                trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                    {
                        self.object: {
                            self.alarm_settings["trigger"]["feature_name"]: {
                                "condition": f"value {'>' if threshould_type == 'high' else '<'} {threshold_value} AND timestamp > '{not_trig_timestamp:%Y-%m-%d %H:%M:%S}'",
                                "order_by": "timestamp ASC",
                                "limit": 1,
                            },
                        },
                    },
                )[self.object][self.alarm_settings["trigger"]["feature_name"]]
                if trig_timestamp:
                    trig_timestamp = trig_timestamp[0]
                    # if the alarm was triggered before the first timestamp, we need to add a row with the alarm triggered
                    if not_trig_timestamp > trig_timestamp:
                        new_row = pl.DataFrame(
                            {
                                "timestamp": [first_timestamp],
                                "alarmed": [True],
                            },
                        ).lazy()
                        df = pl.concat([new_row, df], how="diagonal_relaxed")

        # end of the period
        last_val: bool = df.select("alarmed").last().collect()[0, 0]
        last_timestamp: datetime = df.select("timestamp").last().collect()[0, 0]
        if last_val:
            # getting timestamp after the last timestamp where the alarm was not triggered
            not_trig_timestamp = self._perfdb.features.values.whenatcondition.get(
                {
                    self.object: {
                        self.alarm_settings["trigger"]["feature_name"]: {
                            "condition": f"value {'<' if threshould_type == 'high' else '>'} {threshold_value} AND timestamp > '{last_timestamp:%Y-%m-%d %H:%M:%S}'",
                            "order_by": "timestamp ASC",
                            "limit": 1,
                        },
                    },
                },
            )[self.object][self.alarm_settings["trigger"]["feature_name"]]
            # if found, lets add a row with the alarm ended
            if not_trig_timestamp:
                not_trig_timestamp = not_trig_timestamp[0]
                new_row = pl.DataFrame(
                    {
                        "timestamp": [last_timestamp],
                        "alarmed": [False],
                    },
                ).lazy()
                df = pl.concat([df, new_row], how="diagonal_relaxed")

    # keeping original rows to evaluate truncation
    non_filtered_df = df
    self._evaluated_period = period

    if not df.limit(1).collect().is_empty():
        # keeping only rows with state changes
        df = df.filter(
            (pl.col("alarmed") != pl.col("alarmed").shift(1)) | pl.col("alarmed").is_first_distinct(),
        )

        # getting the period that is being evaluated
        non_overlap_start = df.select("timestamp").first().collect()[0, 0]
        non_overlap_end = df.select("timestamp").last().collect()[0, 0]
        self._evaluated_period = DateTimeRange(non_overlap_start, non_overlap_end)

        # * Evaluate non overlapping alarms
        if len(df.filter(pl.col("alarmed")).collect()) > 1 and self.alarm_settings["non_overlapping_alarms"]:
            # defining the period based on first and last timestamp of df

            logger.info(
                f"{self.object} - {self.alarm_id} - Evaluating non overlapping alarms {self.alarm_settings['non_overlapping_alarms']} during {self._evaluated_period}",
            )

            # iterating over the non overlapping alarms
            for non_overlapping_alarm in self.alarm_settings["non_overlapping_alarms"]:
                # getting the non overlapping alarms from the database
                non_overlapping_alarms = self._perfdb.alarms.history.get(
                    period=self._evaluated_period,
                    object_names=[self.object],
                    alarm_ids=[non_overlapping_alarm],
                    match_alarm_id_on="manufacturer_id",
                )
                # renaming columns
                rename = {
                    "start": "start",
                    "end": "end",
                }
                non_overlapping_alarms = non_overlapping_alarms[list(rename.keys())].rename(rename)

                if non_overlapping_alarms.empty:
                    continue

                # converting the result to a polars DataFrame
                non_overlapping_alarms = pl.DataFrame(non_overlapping_alarms).lazy()

                # melting the DataFrame into a timestamp and alarmed column
                non_overlapping_starts = (
                    non_overlapping_alarms.select("start")
                    .with_columns(
                        alarmed=pl.lit(value=True),
                    )
                    .rename({"start": "timestamp"})
                )
                non_overlapping_ends = (
                    non_overlapping_alarms.select("end")
                    .with_columns(
                        alarmed=pl.lit(value=False),
                    )
                    .rename({"end": "timestamp"})
                )

                non_overlapping_alarms = pl.concat(
                    [non_overlapping_starts, non_overlapping_ends],
                    how="diagonal_relaxed",
                )

                # adding new column to indicate the non overlapping alarms
                non_overlapping_alarms = non_overlapping_alarms.with_columns(
                    preceding_alarm=pl.lit(value=True),
                )

                # concatenating the non overlapping alarms with the original DataFrame
                df = pl.concat([df, non_overlapping_alarms], how="diagonal_relaxed")

    # make sure the DataFrame is sorted by timestamp
    df = df.sort("timestamp")

    # evaluating non overlapping alarms
    if "preceding_alarm" in df.collect_schema().names():
        # flip values of alarmed column when preceding_alarm is True
        df: pl.LazyFrame = df.with_columns(
            alarmed=pl.when(pl.col("preceding_alarm")).then(~pl.col("alarmed")).otherwise(pl.col("alarmed")),
        )
        # dropping rows with duplicated timestamp, giving priority to the ones where preceding_alarm is True
        df = df.sort(["alarmed", "preceding_alarm"], nulls_last=True)
        df = df.unique(subset=["timestamp"], keep="first")
        # sorting again
        df = df.sort("timestamp")
        # dropping the preceding_alarm column
        df = df.drop("preceding_alarm")

    # Add an index for pairing start and end rows
    df = df.with_columns(
        pl.when(pl.col("alarmed"))
        .then(pl.cum_count("alarmed").over("alarmed"))
        .otherwise(None)
        .fill_null(strategy="forward")
        .alias("group_id"),
    )

    # dropping rows without group_id
    # need as without this there is a bug when running in Airflow
    df = df.filter(pl.col("group_id").is_not_null())

    # Group by `group_id` and extract `start` and `end`
    result = df.group_by("group_id").agg(
        [
            pl.col("timestamp").filter(pl.col("alarmed")).first().alias("start"),
            pl.col("timestamp").filter(~pl.col("alarmed")).first().alias("end"),
        ],
    )

    # dropping the group_id column
    result = result.drop("group_id")

    # adding object_name, alarm_id, and alarm_type to the result
    result = result.with_columns(
        object_name=pl.lit(self.object),
        alarm_id=pl.lit(self.alarm_id),
        alarm_type=pl.lit(self.alarm_type),
    )

    # dropping rows without start value
    result = result.filter(pl.col("start").is_not_null())

    # sorting the result by start
    result = result.sort("start")

    # * evaluating truncation

    # first creating a columns defining the rows with state changes
    non_filtered_df = non_filtered_df.with_columns(
        state_change=pl.when(pl.col("alarmed") != pl.col("alarmed").shift(1)).then(statement=True).otherwise(statement=False),
    )

    # then creating a column with the amount of seconds from the previous row
    non_filtered_df = non_filtered_df.with_columns(
        seconds_from_previous=(pl.col("timestamp") - pl.col("timestamp").shift(1)).dt.total_seconds(),
    )

    # creating a column with the timestamp of the previous row
    non_filtered_df = non_filtered_df.with_columns(
        previous_timestamp=pl.col("timestamp").shift(1),
    )

    # then creating a column with the truncate timestamp. It will be None if seconds_from_previous is less than self._max_duration_seconds and will have the timestamp to truncate otherwise
    non_filtered_df = non_filtered_df.with_columns(
        truncate_timestamp=pl.when(
            pl.col("seconds_from_previous") > self._max_duration_seconds,
        )
        .then(
            pl.col("previous_timestamp") + pl.duration(seconds=self._max_duration_seconds),
        )  #  .shift(1)
        .otherwise(None),
    )

    # defining the groups to group by using the same criteria as the result
    non_filtered_df = non_filtered_df.with_columns(
        group_id=pl.when(pl.col("state_change"))
        .then(pl.cum_count("state_change").over("state_change"))
        .otherwise(None)
        .fill_null(strategy="forward"),
    )
    # removing empty group ids
    # need as without this there is a bug when running in Airflow
    non_filtered_df = non_filtered_df.filter(pl.col("group_id").is_not_null())

    # grouping by truncate_timestamp, where the truncate_timestamp column will be the first non null value
    truncate_alarms_timestamp = non_filtered_df.group_by("group_id").agg(
        [
            pl.col("timestamp").first().alias("start"),
            pl.col("truncate_timestamp").filter(pl.col("truncate_timestamp").is_not_null()).first().alias("truncate_timestamp"),
        ],
    )
    # dropping the group_id column
    truncate_alarms_timestamp = truncate_alarms_timestamp.drop("group_id")

    # merging the truncate_alarms_timestamp with the result
    result = result.join(truncate_alarms_timestamp, on="start", how="left")

    # truncating the alarms
    result = result.with_columns(
        end=pl.when(pl.col("end") > pl.col("truncate_timestamp")).then(pl.col("truncate_timestamp")).otherwise(pl.col("end")),
    )

    # dropping the truncate_timestamp column
    result = result.drop("truncate_timestamp")

    # * Final steps

    # converting the result to pandas DataFrame
    result = result.collect().to_pandas(use_pyarrow_extension_array=True)

    # setting the result to the instance
    self._result = result

    # saving the result in the database
    if save:
        self.save()

    return result

save()

Save the result of the calculation in the database.

If the method "calculate" was not called before, this method will raise an error.

Source code in echo_energycalc/alarm_calc_core.py
def save(self) -> None:
    """
    Save the result of the calculation in the database.

    If the method "calculate" was not called before, this method will raise an error.
    """
    # checking if the result was calculated
    if self._result is None:
        raise ValueError("The method 'calculate' must be called before saving the result.")
    # checking if the period was evaluated
    if self._evaluated_period is None:
        raise ValueError("Evaluated period was not set during calculation.")

    # getting result
    result = self._result

    # checking if there are rows where start and end are equal
    if not result.empty:
        equal_count = result[result["start"] == result["end"]].shape[0]
        if equal_count > 0:
            logger.warning(f"Found {equal_count} rows with start equal to end in the result. These rows will be removed.")
            result = result[result["start"] != result["end"]]
        end_before_start_count = result[result["end"] < result["start"]].shape[0]
        if end_before_start_count > 0:
            logger.warning(f"Found {end_before_start_count} rows with end before start in the result. These rows will be removed.")
            result = result[result["end"] >= result["start"]]

    # renaming columns
    result = result.rename(columns={"alarm_id": "manufacturer_id"})

    # dropping existing alarms within the period
    logger.info(
        f"Deleting existing alarms for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.",
    )
    self._perfdb.alarms.history.delete(
        object_names=[self.object],
        period=self._evaluated_period,
        alarm_ids=[self.alarm_db_id],
    )

    if not result.empty:
        # saving the result in the database
        logger.info(f"Saving alarms for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.")
        self._perfdb.alarms.history.insert(
            df=result,
            on_conflict="update",
        )
    else:
        logger.info(f"No alarms to save for object {self.object} and alarm {self.alarm_id} within the period {self._evaluated_period}.")