Required Features¶
Overview¶
The RequiredFeatures class is a subclass of CalculationRequirement that is used to get feature values for specific objects and periods. It is one of the most important requirements, as most feature calculations require some features to be present in the database before the calculation can proceed.
Usage¶
This requirement is instantiated with a list of features that need to be present for each object. Below is an example of how to use this requirement:
requirement = RequiredFeatures(features={"SDM1-VRN1-01": ["wind_speed", "active_power"]})
After calling the check and get_data methods, the data attribute of the requirement will hold the required features for the desired period. With pandas output, the DataFrame has timestamps as the index and a MultiIndex on the columns, with the object as the first level and the feature as the second level; with the default polars output, columns are flat "object_name@feature_name" strings plus a "timestamp" column.
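The two column layouts can be sketched in plain Python. The `encode_col`/`decode_col` helpers below are illustrative stand-ins based on the documented "object_name@feature_name" format; the real helpers live in echo_energycalc and may differ.

```python
# Illustrative sketch of the two column layouts: flat "object@feature" strings
# (polars output) vs (object, feature) tuples (pandas MultiIndex output).
# encode_col/decode_col are assumptions based on the documented format.
def encode_col(obj: str, feat: str) -> str:
    """Flat polars column name: 'object@feature'."""
    return f"{obj}@{feat}"

def decode_col(col: str) -> tuple[str, str]:
    """MultiIndex tuple (object, feature) for the pandas output."""
    obj, feat = col.split("@", 1)
    return obj, feat

features = {"SDM1-VRN1-01": ["wind_speed", "active_power"]}
flat_cols = [encode_col(o, f) for o, feats in features.items() for f in feats]
print(flat_cols)                  # ['SDM1-VRN1-01@wind_speed', 'SDM1-VRN1-01@active_power']
print(decode_col(flat_cols[0]))   # ('SDM1-VRN1-01', 'wind_speed')
```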
Database Requirements¶
This requirement expects the features table to be set with the necessary features for the model of the wanted object. Examples of how to create a feature are described in the FeatureCalculator section.
To check if the features are set correctly, go to the v_features view in the database.
Class Definition¶
RequiredFeatures(features, optional=False) ¶
Subclass of CalculationRequirement that defines the features that are required for the calculation.
This will check the performance database for the existence of the required features for the wanted objects.
Parameters:

- features (dict[str, list[str]]) – Features that are required for the calculation. Should be in the format {object_name: [feature_name, ...], ...}.
- optional (bool, default: False) – Set to True if this is an optional requirement.
Source code in echo_energycalc/calculation_requirement_features.py
def __init__(
    self,
    features: dict[str, list[str]],
    optional: bool = False,
) -> None:
    """
    Constructor of the RequiredFeatures class.

    This will check the performance database for the existence of the required
    features for the wanted objects.

    Parameters
    ----------
    features : dict[str, list[str]]
        Features that are required for the calculation.
        Should be in the format {object_name: [feature_name, ...], ...}.
    optional : bool, optional
        Set to True if this is an optional requirement. By default False.
    """
    super().__init__(optional)
    self._validate_dict_of_lists(features, "features", key_type=str, item_type=str)
    self._features = features
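The constructor validates the features mapping via `_validate_dict_of_lists`. A minimal stand-in for that validation, assuming it only enforces the documented key and item types (the real helper lives in the base class and may do more):

```python
# Hypothetical stand-in for _validate_dict_of_lists: the features argument
# must map str object names to lists of str feature names.
def validate_features(features):
    if not isinstance(features, dict):
        raise TypeError("features must be a dict")
    for obj, feats in features.items():
        if not isinstance(obj, str):
            raise TypeError(f"object name {obj!r} must be str")
        if not isinstance(feats, list) or not all(isinstance(f, str) for f in feats):
            raise TypeError(f"features for {obj!r} must be a list of str")

validate_features({"SDM1-VRN1-01": ["wind_speed", "active_power"]})  # passes silently
```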
checked property ¶

Attribute that defines if the requirement has been checked. Its value starts as False and is set to True after the check method is called.

Returns:

- bool – True if the requirement has been checked.
data property ¶

Data required for the calculation.

Returns:

- DataFrame | pl.DataFrame – DataFrame with the required features for the desired period.
  - pandas: columns are a MultiIndex with object name as first level and feature name as second level.
  - polars: flat columns in the format "object_name@feature_name" plus a "timestamp" column.
features property ¶

Features that are required for the calculation.

Returns:

- dict[str, list[str]] – Features that are required for the calculation, in the format {object_name: [feature_name, ...], ...}.
fetched property ¶

Attribute that defines if get_data() has been called on this requirement. True even when the fetch returned no data (e.g. an optional requirement that found nothing). Use this to distinguish "never fetched" from "fetched but empty/None".

Returns:

- bool – True if get_data() has been called at least once.
optional property ¶

Attribute that defines if the requirement is optional.

If optional is True, the requirement is only validated to check if it could exist, not if it is actually present. This is useful for requirements that are not necessary for all calculations, but are useful for some of them.

Returns:

- bool – True if the requirement is optional.
check() ¶
Check that the requirement is met.
This concrete implementation handles two concerns automatically so that
subclasses only need to implement _do_check():
- Already-checked guard — returns True immediately if check() has already succeeded for this instance, avoiding redundant DB round-trips when _fetch_requirements() iterates requirements on every _compute() call.
- Per-thread caching — when _check_cache_key() returns a non-None key, the result produced by _do_check() is stored in a thread-local cache and reused by subsequent instances in the same thread with the same key. Because the cache is never shared across threads, no locking is needed and concurrent Polars operations inside _do_check() cannot deadlock.
The optional guard is intentionally delegated to _do_check() because
different subclasses have different optional semantics (see _do_check docs).
Returns:

- bool – True if the requirement is met; raises on unmet non-optional requirements.
Source code in echo_energycalc/calculation_requirements_core.py
def check(self) -> bool:
    """
    Check that the requirement is met.

    This concrete implementation handles two concerns automatically so that
    subclasses only need to implement ``_do_check()``:

    1. **Already-checked guard** — returns ``True`` immediately if ``check()`` has
       already succeeded for this instance, avoiding redundant DB round-trips when
       ``_fetch_requirements()`` iterates requirements on every ``_compute()`` call.
    2. **Per-thread caching** — when ``_check_cache_key()`` returns a non-None key,
       the result produced by ``_do_check()`` is stored in a thread-local cache and
       reused by subsequent instances in the same thread with the same key. Because
       the cache is never shared across threads, no locking is needed and concurrent
       Polars operations inside ``_do_check`` cannot deadlock.

    The **optional guard** is intentionally delegated to ``_do_check()`` because
    different subclasses have different optional semantics (see ``_do_check`` docs).

    Returns
    -------
    bool
        True if the requirement is met; raises on unmet non-optional requirements.
    """
    if self._checked:
        return True
    cache_key = self._check_cache_key()
    if cache_key is not None:
        _tl = type(self)._cache_local  # noqa: SLF001
        if not hasattr(_tl, "cache"):
            _tl.cache = {}
        cached = _tl.cache.get(cache_key)
        if cached is None:
            self._do_check()
            _tl.cache[cache_key] = self._get_cache_value()
            cached = _tl.cache[cache_key]
        else:
            logger.debug("Cache hit for %s (key=%s)", type(self).__name__, cache_key)
            self._set_from_cache(cached)
    else:
        self._do_check()
    self._checked = True
    return True
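The per-thread caching pattern above can be reduced to a small self-contained sketch. Everything here is illustrative (the class, key, and `_do_check` body are stand-ins, not the real echo_energycalc API); it only demonstrates the `threading.local` mechanism the source uses.

```python
# Sketch of the per-thread check cache: a threading.local holds one cache dict
# per thread, so instances in the same thread sharing a key reuse one result.
import threading

class CachedCheck:
    _cache_local = threading.local()  # one cache dict per thread, no locking needed
    db_hits = 0                       # counts "real" checks, for demonstration only

    def __init__(self, key):
        self._key = key
        self._checked = False

    def _do_check(self):
        type(self).db_hits += 1       # stand-in for the expensive DB check
        return f"checked:{self._key}"

    def check(self):
        if self._checked:             # already-checked guard
            return True
        tl = type(self)._cache_local
        if not hasattr(tl, "cache"):  # lazily create this thread's cache
            tl.cache = {}
        if self._key not in tl.cache:
            tl.cache[self._key] = self._do_check()
        self._checked = True
        return True

a = CachedCheck("features:SDM1")
b = CachedCheck("features:SDM1")
a.check()
b.check()                 # second instance reuses the thread-local entry
print(CachedCheck.db_hits)  # 1
```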
get_data(period, reindex='infer', cached_data=None, round_timestamps=None, output_type='pl.DataFrame', **kwargs) ¶
Method used to get the data for the required features from performance_db.
All internal data handling is done with Polars for performance; the result is
converted to pandas at the end when output_type="DataFrame".
This will only get the features that are not present in the cached_data DataFrame.
Parameters:

- period (DateTimeRange) – Desired period for the features.
- reindex (str | None, default: 'infer') – Frequency to reindex the required data. If set to None, no reindexing is done. By default "infer" to infer the frequency from the data.
- cached_data (pl.DataFrame | None, default: None) – Cached data as a Polars DataFrame with flat "object@feature" columns (the format returned by perfdb.features.values.series.get(output_type="pl.DataFrame")). Used to avoid re-querying performance_db for data already in memory. By default None.
- round_timestamps (RoundTimeStampsDict | None, default: None) – Dictionary used to round timestamps. Only applicable to bazefield features. By default None.
- output_type (Literal['DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output format. "pl.DataFrame" returns a Polars DataFrame with flat "object@feature" columns. "DataFrame" returns a pandas DataFrame with a MultiIndex. By default "pl.DataFrame".
Returns:

- DataFrame | pl.DataFrame – DataFrame with the required features for the desired period.
Source code in echo_energycalc/calculation_requirement_features.py
@validate_call
def get_data(
    self,
    period: DateTimeRange,
    reindex: str | None = "infer",
    cached_data: pl.DataFrame | None = None,
    round_timestamps: RoundTimeStampsDict | None = None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "pl.DataFrame",
    **kwargs,  # noqa: ARG002
) -> DataFrame | pl.DataFrame:
    """
    Method used to get the data for the required features from performance_db.

    All internal data handling is done with Polars for performance; the result is
    converted to pandas at the end when ``output_type="DataFrame"``.

    This will only get the features that are not present in the cached_data DataFrame.

    Parameters
    ----------
    period : DateTimeRange
        Desired period for the features.
    reindex : str, optional
        Frequency to reindex the required data. If set to None no reindexing is done.
        By default "infer" to infer the frequency from the data.
    cached_data : pl.DataFrame | None, optional
        Cached data as a Polars DataFrame with flat ``"object@feature"`` columns
        (the format returned by ``perfdb.features.values.series.get(output_type="pl.DataFrame")``).
        Used to avoid re-querying performance_db for data already in memory.
        By default None.
    round_timestamps : RoundTimeStampsDict | None, optional
        Dictionary used to round timestamps. Only applicable to bazefield features.
        By default None.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output format. ``"pl.DataFrame"`` returns a Polars DataFrame with flat
        ``"object@feature"`` columns. ``"DataFrame"`` returns a pandas DataFrame with
        a MultiIndex. By default ``"pl.DataFrame"``.

    Returns
    -------
    DataFrame | pl.DataFrame
        DataFrame with the required features for the desired period.
    """
    # check if requirement has been checked
    if not self._checked:
        self.check()
    try:
        cached_pl: pl.DataFrame | None = cached_data if (cached_data is not None and not cached_data.is_empty()) else None
        # ------------------------------------------------------------------
        # Determine which features can be served from cache vs must be fetched
        # ------------------------------------------------------------------
        features_to_fetch = copy.deepcopy(self.features)
        features_from_cache: dict[str, list[str]] = {}
        if cached_pl is not None:
            for obj in list(features_to_fetch.keys()):
                for feat in list(features_to_fetch[obj]):
                    col = encode_col(obj, feat)
                    if col in cached_pl.columns:
                        non_null_ts = cached_pl.filter(pl.col(col).is_not_null()).select("timestamp")
                        if len(non_null_ts) > 0:
                            ts_min = non_null_ts["timestamp"].min()
                            ts_max = non_null_ts["timestamp"].max()
                            # period is covered by cache — skip DB fetch for this feature
                            if period.start >= ts_min and period.end <= ts_max:
                                features_to_fetch[obj].remove(feat)
                                features_from_cache.setdefault(obj, []).append(feat)
                if not features_to_fetch[obj]:
                    del features_to_fetch[obj]
        # ------------------------------------------------------------------
        # Fetch uncovered features from databases (polars throughout)
        # ------------------------------------------------------------------
        fetched_pl: pl.DataFrame | None = None
        if features_to_fetch:
            baze_features = {obj: [f for f in feats if f.endswith("_b#")] for obj, feats in features_to_fetch.items()}
            perfdb_features = {obj: [f for f in feats if not f.endswith("_b#")] for obj, feats in features_to_fetch.items()}
            baze_features = {obj: feats for obj, feats in baze_features.items() if feats}
            perfdb_features = {obj: feats for obj, feats in perfdb_features.items() if feats}
            dfs_pl: list[pl.DataFrame] = []
            if baze_features:
                # Baze still returns pandas MultiIndex — convert to polars
                baze_clean = {obj: [f.removesuffix("_b#") for f in feats] for obj, feats in baze_features.items()}
                baze_pd = self._baze.points.values.series.get(
                    points=baze_clean,
                    period=period,
                    reindex=reindex,
                    round_timestamps=round_timestamps,
                )
                baze_pd.columns = MultiIndex.from_tuples([(col[0], col[1] + "_b#") for col in baze_pd.columns])
                baze_pd.columns.names = ["object", "feature"]
                dfs_pl.append(pandas_mi_to_polars(baze_pd))
            if perfdb_features:
                # Native polars output — columns already in "object@feature" format
                perfdb_pl = self._perfdb.features.values.series.get(
                    features=perfdb_features,
                    period=period,
                    reindex=reindex,
                    output_type="pl.DataFrame",
                )
                dfs_pl.append(perfdb_pl)
            if dfs_pl:
                # Normalize timestamp precision to ms before joining — baze returns
                # datetime[μs] while perfdb returns datetime[ms]; mismatched units
                # cause a SchemaError on join.
                dfs_pl = [df.with_columns(pl.col("timestamp").cast(pl.Datetime("ms"))) for df in dfs_pl]
                fetched_pl = dfs_pl[0]
                for extra_df in dfs_pl[1:]:
                    fetched_pl = fetched_pl.join(extra_df, on="timestamp", how="full", coalesce=True)
        # ------------------------------------------------------------------
        # Merge fetched data with cached slices
        # ------------------------------------------------------------------
        if cached_pl is not None and features_from_cache:
            cache_cols = ["timestamp"] + [encode_col(obj, feat) for obj, feats in features_from_cache.items() for feat in feats]
            cache_cols = [c for c in cache_cols if c in cached_pl.columns]
            cached_slice_pl = cached_pl.select(cache_cols)
            if fetched_pl is None:
                main_pl = cached_slice_pl
            else:
                fresh_data_cols = set(fetched_pl.columns) - {"timestamp"}
                cached_data_cols = set(cached_slice_pl.columns) - {"timestamp"}
                intersection_cols = fresh_data_cols & cached_data_cols
                main_pl = fetched_pl.join(cached_slice_pl, on="timestamp", how="full", coalesce=True, suffix="_cached")
                # For columns present in both: prefer cached (non-null) over freshly fetched
                for col in intersection_cols:
                    col_cached = f"{col}_cached"
                    if col_cached in main_pl.columns:
                        main_pl = main_pl.with_columns(
                            pl.coalesce([col_cached, col]).alias(col),
                        ).drop(col_cached)
        else:
            main_pl = fetched_pl
        # Empty result
        if main_pl is None:
            main_pl = pl.DataFrame({"timestamp": pl.Series([], dtype=pl.Datetime("ms"))})
        # Sort by timestamp; sort data columns alphabetically
        main_pl = main_pl.sort("timestamp")
        data_cols = sorted(c for c in main_pl.columns if c != "timestamp")
        main_pl = main_pl.select(["timestamp", *data_cols])
        # Store as polars internally; convert to pandas if caller wants pandas
        if output_type == "pl.DataFrame":
            self._data = main_pl
        else:
            self._data = polars_to_pandas_mi(main_pl)
    except Exception as e:
        if self.optional:
            self._data = None
        else:
            raise e
    finally:
        self._fetched = True
    return self.data
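The cache-coverage rule at the heart of get_data() can be sketched in pure Python: a feature is served from cache only when the cached non-null timestamps span the whole requested period, otherwise it goes to the fetch list. The function name and the plain-number timestamps below are illustrative; the real code works on Polars columns and DateTimeRange objects.

```python
# Pure-Python sketch of the cache-coverage split in get_data(): serve a feature
# from cache only if its cached non-null span covers [start, end] entirely.
# Column names follow the documented "object@feature" format.
def split_by_cache(features, cache_spans, start, end):
    """cache_spans maps 'object@feature' -> (ts_min, ts_max) of non-null cached data."""
    to_fetch, from_cache = {}, {}
    for obj, feats in features.items():
        for feat in feats:
            span = cache_spans.get(f"{obj}@{feat}")
            if span is not None and span[0] <= start and end <= span[1]:
                from_cache.setdefault(obj, []).append(feat)  # fully covered, skip DB
            else:
                to_fetch.setdefault(obj, []).append(feat)    # must query the DB
    return to_fetch, from_cache

features = {"SDM1-VRN1-01": ["wind_speed", "active_power"]}
spans = {"SDM1-VRN1-01@wind_speed": (0, 100)}  # wind_speed cached over 0..100
print(split_by_cache(features, spans, 10, 50))
# ({'SDM1-VRN1-01': ['active_power']}, {'SDM1-VRN1-01': ['wind_speed']})
```

Note the asymmetry this rule creates: a feature whose cache only partially overlaps the period is re-fetched in full, which keeps the merge logic simple at the cost of some redundant querying.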