
Power at Connection Point

Overview

The FeatureCalcPowerCP class is a subclass of FeatureCalculator that calculates the power at the connection point of an SPE.

Calculation Logic

The calculation logic is described in the constructor of the class, shown below in the Class Definition.

Database Requirements

  • Feature attribute server_calc_type must be set to power_connection_point.
  • The following object attributes for the objects involved in the calculation:
    • Required for the power meter object:
      • power_meter_location: A string that defines the location of the power meter. Only power meters with this attribute set to collecting substation will be considered; other meters are assumed to already be at the connection point and do not need this calculation.
      • parent_meter_name: Name of the connection point meter object in the database.
    • Optional for the SPE object:

      • loss_curve_cs_cp: A dictionary containing the loss curve from CS to CP. Below is an example of the dictionary (two of its keys are shown; note that the calculation itself reads the bin_mean and value_mean entries, as can be seen in the source code):

        {
            "bin_left": [0.0, 500.0, 1000.0, 1500.0, 2000.0, ...],
            "value_min": [-0.02019978614757563, 0.006128181269567942, 0.006545559741312885, 0.00727279963180927, 0.00769050089538309, ...]
        }
        

        If this is not set, the power at the connection point will not be calculated for timestamps where data from at least one of the meters is missing.

        This can be calculated using the script on the performance server in the folder manual_routines\postgres_update_electrical_losses.

      • loss_curve_asset_cp: A dictionary containing the loss curve from asset to CP. It is in the same format as loss_curve_cs_cp.

        If this is not set, the power at the connection point will not be calculated for timestamps where data from a CS meter is missing (the case where the calculation must rely on asset measurements).

        This can be calculated using the script on the performance server in the folder manual_routines\postgres_update_electrical_losses.

      • test_operation_date and commercial_operation_date: Dates that define when a CS power meter starts being used for the calculation. If neither date is set, the meter is used from the beginning of the data. This is useful because data recorded during commissioning is sometimes unreliable.

  • The following features for all meters involved in the calculation:
    • active_power: Active power in kW.
  • The following features for the SPE, in case we need to get asset data:
    • active_power_turbine_sum: Active power in kW from all turbines in the SPE.
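
The loss-curve dictionaries above are used as lookup curves: given a power value, the fractional loss is interpolated and subtracted. Below is a minimal sketch of that idea with made-up curve values and plain linear interpolation; the production code uses its own convert_curve_df_to_func helper, so treat this only as an illustration of the math.

```python
import numpy as np

# Hypothetical loss curve in the same shape as loss_curve_cs_cp
# (bin edges in kW and fractional losses; the values are illustrative only).
loss_curve = {
    "bin_left": [0.0, 500.0, 1000.0, 1500.0, 2000.0],
    "value_min": [-0.0202, 0.0061, 0.0065, 0.0073, 0.0077],
}


def loss_fn(power_kw: float) -> float:
    """Linearly interpolate the fractional loss for a given power."""
    return float(np.interp(power_kw, loss_curve["bin_left"], loss_curve["value_min"]))


# Power at the connection point: CS power minus the interpolated fractional loss.
cs_power = 750.0  # kW measured at the collecting substation
cp_power = cs_power * (1 - loss_fn(cs_power))
```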

Class Definition

FeatureCalcPowerCP(object_name, feature)

FeatureCalculator class used to calculate the power at the connection point for an SPE.

This will go through the following steps to try to calculate the power at the connection point. Each step only fills in data left missing by the previous one.

  1. Calculate using the power from all CS meters and the power of the CP meter. This scales the power of each CS meter so that the sum of all scaled CS meters equals the power of the CP meter.

    This step will be skipped for timestamps in the following cases:

    • The sum of all CS meters is lower than the power of the CP meter while the plant is producing (or higher while it is consuming), which would imply negative losses.
    • Data for any of the CS or CP meters is missing.
    • There is a significant difference between the sum of all CS meters and the power of the CP meter. This usually happens if a CS power meter is not configured in the performance_db.
  2. Calculate using power from the specific CS meter and loss curve from CS to CP.

  3. Calculate using power from assets (wind turbine or solar inverter) from the specific CS meter and loss curve from asset to CP.
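
Step 1 above is essentially a proportional allocation: each CS meter receives the CP power in proportion to its own reading, so the metered losses are distributed across the CS meters. A small pandas sketch, with made-up meter names and values:

```python
import pandas as pd

# Hypothetical 5-minute readings in kW (meter names and values are illustrative only).
df = pd.DataFrame(
    {
        "CS-A": [400.0, 600.0],
        "CS-B": [600.0, 400.0],
        "CP": [980.0, 970.0],  # connection point meter, losses already included
    }
)

# Sum of all collecting-substation meters per timestamp.
df["sum_cs"] = df[["CS-A", "CS-B"]].sum(axis=1)

# Step 1: scale each CS meter so that the scaled values sum to the CP power.
df["CS-A_at_cp"] = df["CP"] * df["CS-A"] / df["sum_cs"]
```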

For this calculation to work, the following object attributes must be defined:

  • Power meter:
    • power_meter_location: Must be collecting substation; meters with other locations are skipped.
    • parent_meter_name: Must be the name of a valid power meter at the connection point.
  • SPE:
    • loss_curve_cs_cp: Loss curve from CS to CP; must be defined for the respective SPE to fill gaps using the active power at the CS and the historical curve.
    • loss_curve_asset_cp: Loss curve from asset to CP; must be defined for the respective SPE to fill gaps using the active power at the asset and the historical curve.

Parameters:

  • object_name

    (str) –

    Name of the object for which the feature is calculated. It must exist in performance_db.

  • feature

    (str) –

    Feature of the object that is calculated. It must exist in performance_db.

Source code in echo_energycalc/feature_calc_electrical_loss.py
def __init__(
    self,
    object_name: str,
    feature: str,
) -> None:
    """
    FeatureCalculator class used to calculate the power at the connection point for an SPE.

    This will go through the following steps to try to calculate the power at the connection point. Each step only fills in data left missing by the previous one.

    1. Calculate using the power from all CS meters and the power of the CP meter. This scales the power of each CS meter so that the sum of all scaled CS meters equals the power of the CP meter.

        This step will be skipped for timestamps in the following cases:

        - The sum of all CS meters is lower than the power of the CP meter while the plant is producing (or higher while it is consuming), which would imply negative losses.
        - Data for any of the CS or CP meters is missing.
        - There is a significant difference between the sum of all CS meters and the power of the CP meter. This usually happens if a CS power meter is not configured in the performance_db.

    2. Calculate using power from the specific CS meter and loss curve from CS to CP.
    3. Calculate using power from assets (wind turbine or solar inverter) from the specific CS meter and loss curve from asset to CP.

    For this calculation to work, the following object attributes must be defined:

    - Power meter:
        - `power_meter_location`: Must be `collecting substation`, others will be skipped.
        - `parent_meter_name`: Must be the name of a valid power meter at the connection point.
    - SPE:
        - `loss_curve_cs_cp`: Loss curve from CS to CP; must be defined for the respective SPE to fill gaps using the active power at the CS and the historical curve.
        - `loss_curve_asset_cp`: Loss curve from asset to CP; must be defined for the respective SPE to fill gaps using the active power at the asset and the historical curve.

    Parameters
    ----------
    object_name : str
        Name of the object for which the feature is calculated. It must exist in performance_db.
    feature : str
        Feature of the object that is calculated. It must exist in performance_db.
    """
    # initialize parent class
    super().__init__(object_name, feature)

    # skipping calculation if current object is the secondary SMF
    if self.object.endswith("SMF2"):
        return

    # getting power meter location to be sure this is a power meter that needs to be calculated
    self._add_requirement(RequiredObjectAttributes({self.object: ["power_meter_location"]}))
    self._get_required_data()
    self._meter_location: str = self._get_requirement_data("RequiredObjectAttributes")[self.object]["power_meter_location"]
    if self._meter_location != "collecting substation":
        logger.warning(
            f"'{self.object}' - '{self.feature}': Skipping calculation as this power meter is not a 'collecting substation' meter. Please check 'power_meter_location' object attribute.",
        )
        return

    # defining attributes
    self._add_requirement(RequiredObjectAttributes({self.object: ["parent_meter_name"]}))
    self._get_required_data()
    self._cp_meter: str = self._get_requirement_data("RequiredObjectAttributes")[self.object]["parent_meter_name"]

    # getting collecting substation meters (the ones that share the same connection point meter)
    self._cs_meters = list(
        self._perfdb.objects.instances.get_ids(
            attributes={"parent_meter_name": self._cp_meter},
            object_types=["power_meter"],
        ).keys(),
    )

    # defining spe names (as most attributes are stored in the respective spes)
    # to do that we will remove the -SMF1 or -SMF2 from the end of the object name
    # reverse the string, split once on "-", take the remainder and reverse it back
    self._spe_name = self.object[::-1].split("-", maxsplit=1)[-1][::-1]
    # getting the loss curves
    self._add_requirement(
        RequiredObjectAttributes(
            {self._spe_name: ["loss_curve_cs_cp", "loss_curve_asset_cp"]},
            optional=True,
        ),
    )
    self._get_required_data()

    # adding start of operation dates as requirement
    self._add_requirement(
        RequiredObjectAttributes(
            {
                self._spe_name: ["test_operation_date", "commercial_operation_date"],
                self.object: ["test_operation_date", "commercial_operation_date"],
            },
            optional=True,
        ),
    )
    self._get_required_data()

    # defining the required features
    required_features = {
        self._cp_meter: ["ActivePower_5min.AVG"],
    }
    for meter in self._cs_meters:
        required_features[meter] = ["ActivePower_5min.AVG"]
    self._add_requirement(RequiredFeatures(required_features))
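
The SPE name derivation in the constructor strips the trailing -SMF1/-SMF2 segment by reversing the string; the result is equivalent to a single right-split, shown here with a hypothetical object name:

```python
# Hypothetical meter object name; the SPE name is everything before the last "-".
object_name = "PARK-A-SMF1"

# Reversed-string trick as used in the constructor...
spe_name = object_name[::-1].split("-", maxsplit=1)[-1][::-1]

# ...which is equivalent to a plain right-split.
assert spe_name == object_name.rsplit("-", maxsplit=1)[0]
```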

feature property

Feature that is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Name of the feature that is calculated.

name property

Name of the feature calculator. Is defined in child classes of FeatureCalculator.

This must be equal to the "server_calc_type" attribute of the feature in performance_db.

Returns:

  • str

    Name of the feature calculator.

object property

Object for which the feature is calculated. This will be defined in the constructor and cannot be changed.

Returns:

  • str

    Object name for which the feature is calculated.

requirements property

List of requirements of the feature calculator. Is defined in child classes of FeatureCalculator.

Returns:

  • dict[str, list[CalculationRequirement]]

    Dict of requirements.

    The keys are the names of the classes of the requirements and the values are lists of requirements of that class.

    For example: {"RequiredFeatures": [RequiredFeatures(...), RequiredFeatures(...)], "RequiredObjects": [RequiredObjects(...)]}

result property

Result of the calculation. This is None until the method "calculate" is called.

Returns:

  • Series | DataFrame | None:

    Result of the calculation if the method "calculate" was called. None otherwise.

calculate(period, save_into=None, cached_data=None, **kwargs)

Method that will calculate the feature.

Parameters:

  • period

    (DateTimeRange) –

    Period for which the feature will be calculated.

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:

    • "all": The feature will be saved in performance_db and bazefield.
    • "performance_db": The feature will be saved only in performance_db.
    • None: The feature will not be saved.

    By default None.

  • cached_data

    (DataFrame | None, default: None ) –

    DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient. By default None.

  • **kwargs

    (dict, default: {} ) –

    Additional arguments that will be passed to the "save" method.

Returns:

  • Series

    Series with the calculated feature.

Source code in echo_energycalc/feature_calc_electrical_loss.py
def calculate(
    self,
    period: DateTimeRange,
    save_into: Literal["all", "performance_db"] | None = None,
    cached_data: DataFrame | None = None,
    **kwargs,
) -> Series:
    """
    Method that will calculate the feature.

    Parameters
    ----------
    period : DateTimeRange
        Period for which the feature will be calculated.
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    cached_data : DataFrame | None, optional
        DataFrame with features already queried/calculated. This is useful to avoid needing to query all the data again from performance_db, making chained calculations a lot more efficient.
        By default None.
    **kwargs : dict, optional
        Additional arguments that will be passed to the "save" method.

    Returns
    -------
    Series
        Series with the calculated feature.
    """
    # skipping calculation if current object is the secondary SMF or if the meter is not a collecting substation meter
    if self.object.endswith("SMF2") or self._meter_location != "collecting substation":
        logger.info(f"'{self.object}' - '{self.feature}': Skipping calculation for SMF2 or non-CS meter.")
        return self._create_empty_result(
            period=period,
            result_type="DataFrame",
            columns=MultiIndex.from_product(
                [[self.object], [self.feature]],
                names=["object", "feature"],
            ),
        )
    # creating Series to store results
    result = self._create_empty_result(period=period, result_type="Series", freq="5min")

    # converting to NA enabled dtype
    result = result.astype("Float64")

    # getting feature values
    self._get_required_data(period=period, reindex="infer", cached_data=cached_data)

    df = self._get_requirement_data("RequiredFeatures").loc[:, IndexSlice[:, "ActivePower_5min.AVG"]].droplevel(1, axis=1)

    # defining all values to zero before start of operation
    # this is important so periods with no production for an SPE are not considered "missing data" if the SPE did not exist yet
    start_date = None
    # date from SPE
    if self.object.split("-")[0] in self._get_requirement_data("RequiredObjectAttributes"):
        if "test_operation_date" in self._get_requirement_data("RequiredObjectAttributes")[self.object.split("-")[0]]:
            start_date = self._get_requirement_data("RequiredObjectAttributes")[self.object.split("-")[0]]["test_operation_date"]
        elif "commercial_operation_date" in self._get_requirement_data("RequiredObjectAttributes")[self.object.split("-")[0]]:
            start_date = self._get_requirement_data("RequiredObjectAttributes")[self.object.split("-")[0]]["commercial_operation_date"]
    # date from SMF
    if self.object in self._get_requirement_data("RequiredObjectAttributes") and start_date is None:
        if "test_operation_date" in self._get_requirement_data("RequiredObjectAttributes")[self.object]:
            start_date = self._get_requirement_data("RequiredObjectAttributes")[self.object]["test_operation_date"]
        elif "commercial_operation_date" in self._get_requirement_data("RequiredObjectAttributes")[self.object]:
            start_date = self._get_requirement_data("RequiredObjectAttributes")[self.object]["commercial_operation_date"]
    # setting values to zero
    if start_date is not None:
        df.loc[df.index < start_date, self.object] = 0

    # getting sum of all cs meters when data from all cs meters is available
    all_cs_idx = df.index[df.loc[:, self._cs_meters].notna().all(axis=1)]
    df.loc[all_cs_idx, "sum_cs"] = df.loc[all_cs_idx, self._cs_meters].sum(axis=1)

    # checking if there are timestamps where there is negative loss (some data might be missing)
    # first checking when the power plant is producing (CP meter > 0) -> sum of CS should be higher than CP
    negative_loss_idx = df.index[(df["sum_cs"] < df[self._cp_meter]) & (df[self._cp_meter] > 0.0)]
    # then checking when the power plant is consuming (CP meter < 0) -> sum of CS should be lower than CP
    negative_loss_idx = negative_loss_idx.union(df.index[(df["sum_cs"] > df[self._cp_meter]) & (df[self._cp_meter] <= 0.0)])
    if len(negative_loss_idx) > 0:
        logger.warning(
            f"CP meter '{self._cp_meter}': There are {len(negative_loss_idx)} timestamps where the sum of the CS meters implies negative losses. Please check if all CS meters are correctly defined in performance_db. If logger is set to DEBUG, the timestamps will be printed.",
        )
        logger.debug(f"Timestamps with negative loss: {negative_loss_idx.to_list()}")
    df.loc[negative_loss_idx, "sum_cs"] = pd.NA

    # calculating power @cp
    # * calculating based on CS meter and CP meter
    result.loc[df.index] = df[self._cp_meter] * (df[self.object] / df["sum_cs"].replace(0.0, pd.NA))

    # checking how many timestamps were calculated and how many are left
    last_calculated_idx = result[result.notna()].index
    missing_data_idx = result[result.isna()].index
    logger.info(
        f"'{self.object}' - '{self.feature}': Calculated {len(last_calculated_idx) / len(result):.2%} of timestamps using CS meter and CP meter.",
    )

    # * calculating based on CS meter and loss curve
    if (
        len(missing_data_idx) > 0
        and self._spe_name in self._get_requirement_data("RequiredObjectAttributes")
        and "loss_curve_cs_cp" in self._get_requirement_data("RequiredObjectAttributes")[self._spe_name]
    ):
        loss_curve = self._get_requirement_data("RequiredObjectAttributes")[self._spe_name]["loss_curve_cs_cp"]

        # getting losses curve as a function
        loss_curve_df = DataFrame.from_dict(loss_curve)
        loss_curve_df = loss_curve_df.dropna(subset=["bin_mean", "value_mean"], how="any")
        loss_fn = convert_curve_df_to_func(loss_curve_df, x_col="bin_mean", y_col="value_mean", extrapolate=True)

        # getting not NaN index as loss curve cannot be calculated for NaN values
        not_nan_idx = df[df[self.object].notna()].index
        wanted_idx = not_nan_idx.intersection(missing_data_idx)

        # calculating cp power based on cs power and loss curve
        result.loc[wanted_idx] = df.loc[wanted_idx, self.object].apply(lambda x: x * (1 - loss_fn(x)))

    # checking how many timestamps were calculated and how many are left
    last_calculated_idx = missing_data_idx.difference(result[result.isna()].index)
    missing_data_idx = result[result.isna()].index
    if len(last_calculated_idx) > 0:
        logger.info(
            f"'{self.object}' - '{self.feature}': Calculated {len(last_calculated_idx) / len(result):.2%} of timestamps using CS meter and loss curve.",
        )

    # * calculating based on asset power and loss curve
    if len(missing_data_idx) > 0:
        # adding sum of asset power as a requirement
        needed_period = DateTimeRange(missing_data_idx.min(), missing_data_idx.max())
        self._add_requirement(RequiredFeatures({self._spe_name: ["ActivePower_10min.AVG"]}, optional=True))
        self._get_required_data(period=needed_period, reindex="5min", cached_data=cached_data)

        # checking if found data
        if (self._spe_name, "ActivePower_10min.AVG") in self._get_requirement_data("RequiredFeatures").columns:
            asset_power = self._get_requirement_data("RequiredFeatures").loc[
                missing_data_idx,
                IndexSlice[self._spe_name, "ActivePower_10min.AVG"],
            ]
            # back-filling NaN timestamps from the next value, as asset_power is on a 10 min basis and we need 5 min
            asset_power = asset_power.bfill(limit=1)

            # getting losses curve as a function
            if (
                self._spe_name in self._get_requirement_data("RequiredObjectAttributes")
                and "loss_curve_asset_cp" in self._get_requirement_data("RequiredObjectAttributes")[self._spe_name]
            ):
                loss_curve = self._get_requirement_data("RequiredObjectAttributes")[self._spe_name]["loss_curve_asset_cp"]

                # getting losses curve as a function
                loss_curve_df = DataFrame.from_dict(loss_curve)
                loss_curve_df = loss_curve_df.dropna(subset=["bin_mean", "value_mean"], how="any")
                loss_fn = convert_curve_df_to_func(loss_curve_df, x_col="bin_mean", y_col="value_mean", extrapolate=True)

                # getting not NaN index as loss curve cannot be calculated for NaN values
                not_nan_idx = asset_power[asset_power.notna()].index
                wanted_idx = not_nan_idx.intersection(missing_data_idx)

                # calculating cp power based on asset power and loss curve
                result.loc[wanted_idx] = asset_power.loc[wanted_idx].apply(lambda x: x * (1 - loss_fn(x)))

    # checking how many timestamps were calculated and how many are left
    last_calculated_idx = missing_data_idx.difference(result[result.isna()].index)
    missing_data_idx = result[result.isna()].index
    if len(last_calculated_idx) > 0:
        logger.info(
            f"'{self.object}' - '{self.feature}': Calculated {len(last_calculated_idx) / len(result):.2%} of timestamps using asset power and loss curve.",
        )

    # checking if result has NaNs
    missing_data_idx = result[result.isna()].index
    if len(missing_data_idx) > 0:
        logger.warning(
            f"'{self.object}' - '{self.feature}': There are {len(missing_data_idx)} timestamps ({len(missing_data_idx) / len(result):.2%}) that could not be calculated.",
        )

    # adding calculated feature to class result attribute
    self._result = result.copy()

    # saving results
    self.save(save_into=save_into, **kwargs)

    return result
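
The negative-loss check near the start of calculate can be isolated into a small sketch: timestamps where the CS sum would imply negative losses (below CP while producing, above CP while consuming) are masked out so the scaling step skips them. The values below are made up, and plain NaN stands in for the pd.NA used in the real code:

```python
import pandas as pd

# Hypothetical readings in kW (values made up for illustration).
df = pd.DataFrame(
    {
        "sum_cs": [1000.0, 950.0, -30.0],
        "cp": [980.0, 960.0, -40.0],  # connection point meter
    }
)

# Producing (cp > 0): losses imply sum_cs >= cp, so sum_cs < cp is implausible.
producing_bad = df.index[(df["sum_cs"] < df["cp"]) & (df["cp"] > 0.0)]
# Consuming (cp <= 0): losses imply sum_cs <= cp, so sum_cs > cp is implausible.
consuming_bad = df.index[(df["sum_cs"] > df["cp"]) & (df["cp"] <= 0.0)]

# Mask the implausible timestamps so the scaling step skips them.
df.loc[producing_bad.union(consuming_bad), "sum_cs"] = float("nan")
```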

save(save_into=None, **kwargs)

Method to save the calculated feature values in performance_db.

Parameters:

  • save_into

    (Literal['all', 'performance_db'] | None, default: None ) –

    Argument that will be passed to the method "save". The options are:

    • "all": The feature will be saved in performance_db and bazefield.
    • "performance_db": The feature will be saved only in performance_db.
    • None: The feature will not be saved.

    By default None.

  • **kwargs

    (dict, default: {} ) –

    Not being used at the moment. Here only for compatibility.

Source code in echo_energycalc/feature_calc_core.py
def save(
    self,
    save_into: Literal["all", "performance_db"] | None = None,
    **kwargs,  # noqa: ARG002
) -> None:
    """
    Method to save the calculated feature values in performance_db.

    Parameters
    ----------
    save_into : Literal["all", "performance_db"] | None, optional
        Argument that will be passed to the method "save". The options are:
        - "all": The feature will be saved in performance_db and bazefield.
        - "performance_db": the feature will be saved only in performance_db.
        - None: The feature will not be saved.

        By default None.
    **kwargs : dict, optional
        Not being used at the moment. Here only for compatibility.
    """
    # checking arguments
    if not isinstance(save_into, str | type(None)):
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}")
    if isinstance(save_into, str) and save_into not in ["all", "performance_db"]:
        raise ValueError(f"save_into must be 'all', 'performance_db' or None, not {save_into}")

    # checking if calculation was done
    if self.result is None:
        raise ValueError(
            "The calculation was not done. Cannot save the feature calculation results. Please make sure to do something like 'self._result = df[self.feature].copy()' in the method 'calculate' before calling 'self.save()'.",
        )

    if save_into is None:
        return

    if isinstance(save_into, str):
        if save_into not in ["performance_db", "all"]:
            raise ValueError(f"save_into must be 'performance_db' or 'all', not {save_into}.")
        upload_to_bazefield = save_into == "all"
    elif save_into is None:
        upload_to_bazefield = False
    else:
        raise TypeError(f"save_into must be a string or None, not {type(save_into)}.")

    # converting result series to DataFrame if needed
    if isinstance(self.result, Series):
        result_df = self.result.to_frame()
    elif isinstance(self.result, DataFrame):
        result_df = self.result.droplevel(0, axis=1)
    else:
        raise TypeError(f"result must be a pandas Series or DataFrame, not {type(self.result)}.")

    # adjusting DataFrame to be inserted in the database
    # making the columns a Multindex with levels object_name and feature_name
    result_df.columns = MultiIndex.from_product([[self.object], result_df.columns], names=["object_name", "feature_name"])

    self._perfdb.features.values.series.insert(
        df=result_df,
        on_conflict="update",
        bazefield_upload=upload_to_bazefield,
    )