Required Features

Overview

The RequiredFeatures class is a subclass of CalculationRequirement that is used to get feature values for specific objects and periods. This is one of the most important requirements, as most of the feature calculations will require some features to be present in the database before proceeding with the calculation.

Usage

This requirement can be instantiated with a list of features that need to be present for each object. Below is an example of how to use it:

Python
requirement = RequiredFeatures(features={"SDM1-VRN1-01": ["wind_speed", "active_power"]})

After the check and get_data methods are called, the data attribute of the requirement holds the required features for the desired period. With the default output_type="pl.DataFrame", this is a Polars DataFrame with a "timestamp" column and flat columns in the "object_name@feature_name" format; with output_type="DataFrame", it is a pandas DataFrame with the timestamps as the index and columns as a MultiIndex with the object as the first level and the feature as the second level.
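
As an illustrative sketch of the pandas-format result (sample values and timestamps are made up; no database access is shown):

```python
import pandas as pd

# Hypothetical result shape for output_type="DataFrame":
# index = timestamps, columns = MultiIndex (object, feature)
idx = pd.date_range("2024-01-01", periods=3, freq="10min", name="timestamp")
cols = pd.MultiIndex.from_product(
    [["SDM1-VRN1-01"], ["wind_speed", "active_power"]],
    names=["object", "feature"],
)
data = pd.DataFrame(
    [[7.1, 1500.0], [6.8, 1420.0], [7.4, 1580.0]],
    index=idx,
    columns=cols,
)

# Selecting one feature for one object returns a plain Series
wind = data[("SDM1-VRN1-01", "wind_speed")]
```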

Database Requirements

This requirement expects the features table to contain the necessary features for the model of the wanted object. How to create a feature is described in the FeatureCalculator section.

To check if the features are set correctly, go to the v_features view in the database.

Class Definition

RequiredFeatures(features, optional=False)

Subclass of CalculationRequirement that defines the features that are required for the calculation.

This will check the performance database for the existence of the required features for the wanted objects.

Parameters:

  • features

    (dict[str, list[str]]) –

    Features that are required for the calculation.

    Should be in the format {object_name: [feature_name, ...], ...}.

  • optional

    (bool, default: False ) –

    Set to True if this is an optional requirement.

Source code in echo_energycalc/calculation_requirement_features.py
Python
def __init__(
    self,
    features: dict[str, list[str]],
    optional: bool = False,
) -> None:
    """
    Constructor of the RequiredFeatures class.

    This will check the performance database for the existence of the required features for the wanted objects.

    Parameters
    ----------
    features : dict[str, list[str]]
        Features that are required for the calculation.

        Should be in the format {object_name: [feature_name, ...], ...}.
    optional : bool, optional
        Set to True if this is an optional requirement. by default False
    """
    super().__init__(optional)

    self._validate_dict_of_lists(features, "features", key_type=str, item_type=str)

    self._features = features

checked property

Attribute that indicates whether the requirement has been checked. Its value starts as False and is set to True after the check method is called.

Returns:

  • bool

    True if the requirement has been checked.

data property

Data required for the calculation.

Returns:

  • DataFrame | pl.DataFrame

    DataFrame with the required features for the desired period.

    • pandas: columns are a MultiIndex with object name as first level and feature name as second level.
    • polars: flat columns in the format "object_name@feature_name" plus a "timestamp" column.
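
The two formats carry the same information. A pandas-only sketch of the mapping between them (the library uses internal helpers such as encode_col and polars_to_pandas_mi for this; the "@" separator is taken from the documented "object_name@feature_name" column format):

```python
import pandas as pd

def split_flat_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Turn flat "object@feature" columns into a (object, feature) MultiIndex.

    Assumes "@" separates the object and feature names, matching the
    documented "object_name@feature_name" column format.
    """
    out = df.set_index("timestamp")
    out.columns = pd.MultiIndex.from_tuples(
        [tuple(c.split("@", 1)) for c in out.columns],
        names=["object", "feature"],
    )
    return out

flat = pd.DataFrame({
    "timestamp": pd.date_range("2024-01-01", periods=2, freq="10min"),
    "SDM1-VRN1-01@wind_speed": [7.1, 6.8],
    "SDM1-VRN1-01@active_power": [1500.0, 1420.0],
})
mi = split_flat_columns(flat)
```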

features property

Features that are required for the calculation.

Returns:

  • dict[str, list[str]]

    Features that are required for the calculation in the format {object_name: [feature_name, ...], ...}.

fetched property

Attribute that defines if get_data() has been called on this requirement.

True even when the fetch returned no data (e.g. an optional requirement that found nothing). Use this to distinguish "never fetched" from "fetched but empty/None".

Returns:

  • bool

    True if get_data() has been called at least once.

optional property

Attribute that defines if the requirement is optional.

If optional is True, the requirement is only validated to check if it could exist, not if it is actually present. This is useful for requirements that are not necessary for all calculations, but are useful for some of them.

Returns:

  • bool

    True if the requirement is optional.

check()

Check that the requirement is met.

This concrete implementation handles two concerns automatically so that subclasses only need to implement _do_check():

  1. Already-checked guard — returns True immediately if check() has already succeeded for this instance, avoiding redundant DB round-trips when _fetch_requirements() iterates requirements on every _compute() call.
  2. Per-thread caching — when _check_cache_key() returns a non-None key, the result produced by _do_check() is stored in a thread-local cache and reused by subsequent instances in the same thread with the same key. Because the cache is never shared across threads, no locking is needed and concurrent Polars operations inside _do_check cannot deadlock.

The optional guard is intentionally delegated to _do_check() because different subclasses have different optional semantics (see _do_check docs).

Returns:

  • bool

    True if the requirement is met; raises on unmet non-optional requirements.
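
The per-thread caching strategy can be sketched in isolation (a simplified illustration using threading.local, not the library's code; the actual implementation appears in the source listing below):

```python
import threading

class CachedCheck:
    # One cache dict per thread: no locks needed, nothing shared across threads.
    _cache_local = threading.local()

    def __init__(self, key):
        self._key = key
        self._checked = False

    def _do_check(self):
        # Stand-in for the expensive DB round-trip.
        return f"result-for-{self._key}"

    def check(self):
        if self._checked:  # already-checked guard
            return True
        tl = type(self)._cache_local
        if not hasattr(tl, "cache"):
            tl.cache = {}
        if self._key not in tl.cache:  # per-thread cache miss
            tl.cache[self._key] = self._do_check()
        self._result = tl.cache[self._key]
        self._checked = True
        return True
```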

Source code in echo_energycalc/calculation_requirements_core.py
Python
def check(self) -> bool:
    """
    Check that the requirement is met.

    This concrete implementation handles two concerns automatically so that
    subclasses only need to implement ``_do_check()``:

    1. **Already-checked guard** — returns ``True`` immediately if ``check()`` has
       already succeeded for this instance, avoiding redundant DB round-trips when
       ``_fetch_requirements()`` iterates requirements on every ``_compute()`` call.
    2. **Per-thread caching** — when ``_check_cache_key()`` returns a non-None key,
       the result produced by ``_do_check()`` is stored in a thread-local cache and
       reused by subsequent instances in the same thread with the same key. Because
       the cache is never shared across threads, no locking is needed and concurrent
       Polars operations inside ``_do_check`` cannot deadlock.

    The **optional guard** is intentionally delegated to ``_do_check()`` because
    different subclasses have different optional semantics (see ``_do_check`` docs).

    Returns
    -------
    bool
        True if the requirement is met; raises on unmet non-optional requirements.
    """
    if self._checked:
        return True

    cache_key = self._check_cache_key()

    if cache_key is not None:
        _tl = type(self)._cache_local  # noqa: SLF001
        if not hasattr(_tl, "cache"):
            _tl.cache = {}
        cached = _tl.cache.get(cache_key)
        if cached is None:
            self._do_check()
            _tl.cache[cache_key] = self._get_cache_value()
            cached = _tl.cache[cache_key]
        else:
            logger.debug("Cache hit for %s (key=%s)", type(self).__name__, cache_key)
        self._set_from_cache(cached)
    else:
        self._do_check()

    self._checked = True
    return True

get_data(period, reindex='infer', cached_data=None, round_timestamps=None, output_type='pl.DataFrame', **kwargs)

Method used to get the data for the required features from performance_db.

All internal data handling is done with Polars for performance; the result is converted to pandas at the end when output_type="DataFrame".

This will only get the features that are not present in the cached_data DataFrame.

Parameters:

  • period

    (DateTimeRange) –

    Desired period for the features.

  • reindex

    (str, default: 'infer' ) –

    Frequency to reindex the required data. If set to None, no reindexing is done. By default "infer" to infer the frequency from the data.

  • cached_data

    (DataFrame | None, default: None ) –

    Cached data as a Polars DataFrame with flat "object@feature" columns (the format returned by perfdb.features.values.series.get(output_type="pl.DataFrame")). Used to avoid re-querying performance_db for data already in memory. By default None.

  • round_timestamps

    (RoundTimeStampsDict | None, default: None ) –

    Dictionary used to round timestamps. Only applicable to bazefield features. By default None.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output format. "pl.DataFrame" returns a Polars DataFrame with flat "object@feature" columns. "DataFrame" returns a pandas DataFrame with a MultiIndex. By default "pl.DataFrame".

Returns:

  • DataFrame | pl.DataFrame

    DataFrame with the required features for the desired period.
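
The cache-coverage rule — a feature is served from cached_data only when the whole requested period lies inside its non-null timestamp range — can be sketched without a database (a simplified stand-in for the logic in the source below; column names are examples):

```python
from datetime import datetime

def covered_by_cache(period_start, period_end, cached_ranges):
    """Split requested features into cache-served vs DB-fetched.

    cached_ranges maps "object@feature" -> (min_ts, max_ts) of the non-null
    cached values, or None if the column has no data. A feature is covered
    only if the whole period fits inside that range.
    """
    from_cache, to_fetch = [], []
    for col, bounds in cached_ranges.items():
        if bounds is not None:
            ts_min, ts_max = bounds
            if period_start >= ts_min and period_end <= ts_max:
                from_cache.append(col)
                continue
        to_fetch.append(col)
    return from_cache, to_fetch

start, end = datetime(2024, 1, 2), datetime(2024, 1, 3)
from_cache, to_fetch = covered_by_cache(start, end, {
    "T1@wind_speed": (datetime(2024, 1, 1), datetime(2024, 1, 5)),        # covers the period
    "T1@active_power": (datetime(2024, 1, 2, 12), datetime(2024, 1, 5)),  # starts too late
})
```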

Source code in echo_energycalc/calculation_requirement_features.py
Python
@validate_call
def get_data(
    self,
    period: DateTimeRange,
    reindex: str | None = "infer",
    cached_data: pl.DataFrame | None = None,
    round_timestamps: RoundTimeStampsDict | None = None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "pl.DataFrame",
    **kwargs,  # noqa: ARG002
) -> DataFrame | pl.DataFrame:
    """
    Method used to get the data for the required features from performance_db.

    All internal data handling is done with Polars for performance; the result is
    converted to pandas at the end when ``output_type="DataFrame"``.

    This will only get the features that are not present in the cached_data DataFrame.

    Parameters
    ----------
    period : DateTimeRange
        Desired period for the features.
    reindex : str, optional
        Frequency to reindex the required data. If set to None no reindexing is done. By default "infer" to infer the frequency from the data.
    cached_data : pl.DataFrame | None, optional
        Cached data as a Polars DataFrame with flat ``"object@feature"`` columns
        (the format returned by ``perfdb.features.values.series.get(output_type="pl.DataFrame")``).
        Used to avoid re-querying performance_db for data already in memory.
        By default None.
    round_timestamps : RoundTimeStampsDict | None, optional
        Dictionary used to round timestamps. Only applicable to bazefield features.
        By default None
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output format. ``"pl.DataFrame"`` returns a Polars DataFrame with flat
        ``"object@feature"`` columns. ``"DataFrame"`` returns a pandas DataFrame with
        a MultiIndex. By default ``"pl.DataFrame"``.

    Returns
    -------
    DataFrame | pl.DataFrame
        DataFrame with the required features for the desired period.
    """
    # check if requirement has been checked
    if not self._checked:
        self.check()

    try:
        cached_pl: pl.DataFrame | None = cached_data if (cached_data is not None and not cached_data.is_empty()) else None

        # ------------------------------------------------------------------
        # Determine which features can be served from cache vs must be fetched
        # ------------------------------------------------------------------
        features_to_fetch = copy.deepcopy(self.features)
        features_from_cache: dict[str, list[str]] = {}

        if cached_pl is not None:
            for obj in list(features_to_fetch.keys()):
                for feat in list(features_to_fetch[obj]):
                    col = encode_col(obj, feat)
                    if col in cached_pl.columns:
                        non_null_ts = cached_pl.filter(pl.col(col).is_not_null()).select("timestamp")
                        if len(non_null_ts) > 0:
                            ts_min = non_null_ts["timestamp"].min()
                            ts_max = non_null_ts["timestamp"].max()
                            # period is covered by cache — skip DB fetch for this feature
                            if period.start >= ts_min and period.end <= ts_max:
                                features_to_fetch[obj].remove(feat)
                                features_from_cache.setdefault(obj, []).append(feat)
                if not features_to_fetch[obj]:
                    del features_to_fetch[obj]

        # ------------------------------------------------------------------
        # Fetch uncovered features from databases (polars throughout)
        # ------------------------------------------------------------------
        fetched_pl: pl.DataFrame | None = None

        if features_to_fetch:
            baze_features = {obj: [f for f in feats if f.endswith("_b#")] for obj, feats in features_to_fetch.items()}
            perfdb_features = {obj: [f for f in feats if not f.endswith("_b#")] for obj, feats in features_to_fetch.items()}
            baze_features = {obj: feats for obj, feats in baze_features.items() if feats}
            perfdb_features = {obj: feats for obj, feats in perfdb_features.items() if feats}

            dfs_pl: list[pl.DataFrame] = []

            if baze_features:
                # Baze still returns pandas MultiIndex — convert to polars
                baze_clean = {obj: [f.removesuffix("_b#") for f in feats] for obj, feats in baze_features.items()}
                baze_pd = self._baze.points.values.series.get(
                    points=baze_clean,
                    period=period,
                    reindex=reindex,
                    round_timestamps=round_timestamps,
                )
                baze_pd.columns = MultiIndex.from_tuples([(col[0], col[1] + "_b#") for col in baze_pd.columns])
                baze_pd.columns.names = ["object", "feature"]
                dfs_pl.append(pandas_mi_to_polars(baze_pd))

            if perfdb_features:
                # Native polars output — columns already in "object@feature" format
                perfdb_pl = self._perfdb.features.values.series.get(
                    features=perfdb_features,
                    period=period,
                    reindex=reindex,
                    output_type="pl.DataFrame",
                )
                dfs_pl.append(perfdb_pl)

            if dfs_pl:
                # Normalize timestamp precision to ms before joining — baze returns
                # datetime[μs] while perfdb returns datetime[ms]; mismatched units
                # cause a SchemaError on join.
                dfs_pl = [df.with_columns(pl.col("timestamp").cast(pl.Datetime("ms"))) for df in dfs_pl]
                fetched_pl = dfs_pl[0]
                for extra_df in dfs_pl[1:]:
                    fetched_pl = fetched_pl.join(extra_df, on="timestamp", how="full", coalesce=True)

        # ------------------------------------------------------------------
        # Merge fetched data with cached slices
        # ------------------------------------------------------------------
        if cached_pl is not None and features_from_cache:
            cache_cols = ["timestamp"] + [encode_col(obj, feat) for obj, feats in features_from_cache.items() for feat in feats]
            cache_cols = [c for c in cache_cols if c in cached_pl.columns]
            cached_slice_pl = cached_pl.select(cache_cols)

            if fetched_pl is None:
                main_pl = cached_slice_pl
            else:
                fresh_data_cols = set(fetched_pl.columns) - {"timestamp"}
                cached_data_cols = set(cached_slice_pl.columns) - {"timestamp"}
                intersection_cols = fresh_data_cols & cached_data_cols

                main_pl = fetched_pl.join(cached_slice_pl, on="timestamp", how="full", coalesce=True, suffix="_cached")

                # For columns present in both: prefer cached (non-null) over freshly fetched
                for col in intersection_cols:
                    col_cached = f"{col}_cached"
                    if col_cached in main_pl.columns:
                        main_pl = main_pl.with_columns(
                            pl.coalesce([col_cached, col]).alias(col),
                        ).drop(col_cached)
        else:
            main_pl = fetched_pl

        # Empty result
        if main_pl is None:
            main_pl = pl.DataFrame({"timestamp": pl.Series([], dtype=pl.Datetime("ms"))})

        # Sort by timestamp; sort data columns alphabetically
        main_pl = main_pl.sort("timestamp")
        data_cols = sorted(c for c in main_pl.columns if c != "timestamp")
        main_pl = main_pl.select(["timestamp", *data_cols])

        # Store as polars internally; convert to pandas if caller wants pandas
        if output_type == "pl.DataFrame":
            self._data = main_pl
        else:
            self._data = polars_to_pandas_mi(main_pl)

    except Exception as e:
        if self.optional:
            self._data = None
        else:
            raise e

    finally:
        self._fetched = True

    return self.data