|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH |
| 3 | +"""Helper functions for microgrid data processing utilities. |
| 4 | +
|
| 5 | +This module provides utility functions for preprocessing and analyzing microgrid |
| 6 | +data represented in pandas DataFrames. It standardizes column names, handles |
| 7 | +timezone conversions, computes grid imports, derives photovoltaic (PV) energy flows, |
| 8 | +and renames component-specific columns based on a MicrogridConfig. |
| 9 | +
|
| 10 | +Key Features |
| 11 | +------------ |
| 12 | +- Timezone Conversion |
| 13 | + Ensures all timestamps are consistently localized |
| 14 | + (default: UTC → Europe/Berlin). |
| 15 | +
|
| 16 | +- Grid Data Processing |
| 17 | + Extracts net grid import by filtering positive values |
| 18 | + from grid connection signals. |
| 19 | +
|
| 20 | +- PV Energy Flow Calculations |
| 21 | + Derives PV production, excess, self-consumption, battery charging, and |
| 22 | + grid feed-in metrics, including PV self-consumption share. |
| 23 | +
|
| 24 | +- Component Renaming |
| 25 | + Maps numeric string component IDs to human-readable labels |
| 26 | + (e.g., "Battery #14", "PV #7") using the provided MicrogridConfig. |
| 27 | +
|
| 28 | +- Reporting Column Assembly |
| 29 | + Builds the column sets required for downstream energy reports |
| 30 | + based on the available component types. |
| 31 | +
|
| 32 | +Usage |
| 33 | +----- |
| 34 | +These functions serve as building blocks for energy reporting, data pipelines, |
| 35 | +and dashboards that analyze microgrid performance, particularly in hybrid systems |
| 36 | +with PV, batteries, and grid interactions. |
| 37 | +""" |
| 38 | + |
| 39 | +from typing import Any, Dict, List, Tuple |
| 40 | + |
| 41 | +import pandas as pd |
| 42 | +import yaml |
| 43 | + |
| 44 | +from frequenz.data.microgrid.config import MicrogridConfig |
| 45 | + |
| 46 | + |
def load_config(path: str) -> Dict[str, Any]:
    """Read a YAML configuration file into a dictionary.

    Args:
        path: Location of the YAML file on disk.

    Returns:
        The parsed configuration values as a dictionary.

    Raises:
        TypeError: If the YAML root element is not a mapping (dict).
    """
    with open(path, "r", encoding="utf-8") as config_file:
        parsed = yaml.safe_load(config_file)

    if isinstance(parsed, dict):
        return parsed

    raise TypeError("YAML root must be a mapping (dict).")
| 67 | + |
| 68 | + |
| 69 | +def _fmt_de(x: float) -> str: |
| 70 | + """Format a number using German-style decimal and thousands separators. |
| 71 | +
|
| 72 | + The function formats the number with two decimal places, using a comma |
| 73 | + as the decimal separator and a dot as the thousands separator. |
| 74 | +
|
| 75 | + Args: |
| 76 | + x: The number to format. |
| 77 | +
|
| 78 | + Returns: |
| 79 | + The formatted string with German number formatting applied. |
| 80 | +
|
| 81 | + Example: |
| 82 | + >>> _fmt_de(12345.6789) |
| 83 | + '12.345,68' |
| 84 | + """ |
| 85 | + return f"{x:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") |
| 86 | + |
| 87 | + |
| 88 | +def _convert_timezone( |
| 89 | + df: pd.DataFrame, |
| 90 | + column_timestamp: str, |
| 91 | + target_tz: str = "Europe/Berlin", |
| 92 | + assume_tz: str = "UTC", |
| 93 | +) -> pd.DataFrame: |
| 94 | + """Convert a datetime column in a DataFrame to a target timezone. |
| 95 | +
|
| 96 | + If the column contains timezone-naive datetimes, they are first localized to |
| 97 | + ``assume_tz`` before being converted to ``target_tz``. |
| 98 | +
|
| 99 | + Args: |
| 100 | + df: Input DataFrame containing the datetime column. |
| 101 | + column_timestamp: Name of the datetime column in ``df`` to convert. |
| 102 | + target_tz: Timezone name to convert the column to. |
| 103 | + Defaults to ``"Europe/Berlin"``. |
| 104 | + assume_tz: Timezone to assume for naive datetimes. |
| 105 | + Defaults to ``"UTC"``. |
| 106 | +
|
| 107 | + Returns: |
| 108 | + pd.DataFrame: A copy of the DataFrame with the converted datetime column. |
| 109 | +
|
| 110 | + Raises: |
| 111 | + ValueError: If ``column_timestamp`` is not present in ``df``. |
| 112 | + """ |
| 113 | + if column_timestamp not in df: |
| 114 | + raise ValueError(f"{column_timestamp} column not in df") |
| 115 | + |
| 116 | + ts = df[column_timestamp] |
| 117 | + |
| 118 | + if ts.dt.tz is None: |
| 119 | + # Assume naïve datetimes are in `assume_tz` |
| 120 | + ts = ts.dt.tz_localize(assume_tz) |
| 121 | + |
| 122 | + df[column_timestamp] = ts.dt.tz_convert(target_tz) |
| 123 | + return df |
| 124 | + |
| 125 | + |
def add_net_grid_import(
    df: pd.DataFrame,
    column_grid: str,
    column_net_import: str,
) -> pd.DataFrame:
    """Calculate grid consumption and add it as ``column_net_import``.

    Grid consumption is defined as the positive part of ``column_grid``.
    Values that are not strictly positive (including NaN) become 0.

    Args:
        df: Input DataFrame containing the grid data.
        column_grid: Name of the column in ``df`` that contains grid values.
        column_net_import: Name of the output column to store the computed
            net import values.

    Returns:
        pd.DataFrame: The DataFrame with a new or updated ``column_net_import`` column.

    Raises:
        ValueError: If ``column_grid`` is not present in ``df``.
    """
    if column_grid not in df:
        raise ValueError(f"{column_grid} column not in df")

    grid = df[column_grid]
    # Vectorized replacement for the previous row-wise `.apply(lambda ...)`:
    # keep values where grid > 0, otherwise write 0. NaN fails the condition
    # and becomes 0, exactly as the lambda did.
    df[column_net_import] = grid.where(grid > 0, 0)
    return df
| 153 | + |
| 154 | + |
# pylint: disable=too-many-arguments, too-many-positional-arguments
def label_component_columns(
    df: pd.DataFrame,
    mcfg: MicrogridConfig,
    column_battery: str = "battery",
    column_pv: str = "pv",
    column_chp: str = "chp",
    column_ev: str = "ev",
) -> Tuple[pd.DataFrame, List[str]]:
    """Rename numeric single-component columns to labeled names.

    Numeric string column names like ``"14"`` become ``"Battery #14"``,
    ``"PV #14"``, ``"CHP #14"`` or ``"EV #14"``, depending on which component
    type the ID belongs to according to ``mcfg.component_type_ids(...)``.

    Args:
        df: Input DataFrame with numeric string column names.
        mcfg: Microgrid configuration used to look up the component IDs that
            belong to each component type.
        column_battery: Key name for the battery component type.
        column_pv: Key name for the PV component type.
        column_chp: Key name for the CHP component type.
        column_ev: Key name for the EV component type.

    Returns:
        Tuple containing the renamed DataFrame and the list of applied labels.
    """
    # Numeric component columns present in df.
    numeric_columns = [str(col) for col in df.columns if str(col).isdigit()]
    known_types = set(mcfg.component_types())

    def _ids_for(component_type: str) -> set[str]:
        # Component types missing from the config contribute no IDs.
        if component_type not in known_types:
            return set()
        return {str(cid) for cid in mcfg.component_type_ids(component_type)}

    # (type key, display label) pairs; the battery label is capitalized while
    # the others are upper-cased acronyms, matching the established labels.
    labeling = [
        (column_battery, column_battery.capitalize()),
        (column_pv, column_pv.upper()),
        (column_ev, column_ev.upper()),
        (column_chp, column_chp.upper()),
    ]

    rename: Dict[str, str] = {}
    for type_key, display in labeling:
        type_ids = _ids_for(type_key)
        for col in numeric_columns:
            if col in type_ids:
                rename[col] = f"{display} #{col}"

    return df.rename(columns=rename), list(rename.values())
| 217 | + |
| 218 | + |
| 219 | +def _add_pv_energy_flows(df: pd.DataFrame) -> pd.DataFrame: |
| 220 | + """Add PV-related energy flow columns to ``df`` if PV data is present. |
| 221 | +
|
| 222 | + Derives photovoltaic (PV) energy-flow metrics from existing columns. If no PV |
| 223 | + signal is present (i.e., the negative PV column is missing or all zeros), the |
| 224 | + DataFrame is returned unchanged. |
| 225 | +
|
| 226 | + Args: |
| 227 | + df: Input DataFrame. If present, uses columns ``pv_neg``, |
| 228 | + ``consumption``, and ``COLUMN_BATTERY_POS``. Missing columns are |
| 229 | + treated as zeros. |
| 230 | +
|
| 231 | + Returns: |
| 232 | + The DataFrame with added PV flow columns (or unchanged if no PV signal). |
| 233 | +
|
| 234 | + Notes: |
| 235 | + Newly created/updated columns: |
| 236 | + - ``COLUMN_PV_PROD``: PV production as a positive series (negated/clipped from |
| 237 | + ``pv_neg``). |
| 238 | + - ``COLUMN_PV_EXCESS``: Excess PV after subtracting household consumption. |
| 239 | + - ``COLUMN_PV_BAT``: Portion of PV excess routed into the battery (bounded by |
| 240 | + battery charge). |
| 241 | + - ``COLUMN_PV_FEEDIN``: PV fed into the grid after battery charging. |
| 242 | + - ``COLUMN_PV_SELF``: Self-consumed PV (production minus excess). |
| 243 | + - ``COLUMN_PV_SHARE``: Share of consumption covered by self-consumed PV (NaN |
| 244 | + when consumption is 0). |
| 245 | + """ |
| 246 | + # Safe inputs (0 if missing) |
| 247 | + df_with_pv_flows = df.copy() |
| 248 | + zeros = pd.Series(0, index=df_with_pv_flows.index) |
| 249 | + pv_neg = df_with_pv_flows.get("pv_neg", zeros) |
| 250 | + consumption = df_with_pv_flows.get("consumption", zeros) |
| 251 | + battery_pos = df_with_pv_flows.get("battery_pos", zeros) |
| 252 | + |
| 253 | + # Only compute PV features if there is any PV signal |
| 254 | + has_pv = isinstance(pv_neg, pd.Series) and (pv_neg != 0).any() |
| 255 | + if not has_pv: |
| 256 | + return df_with_pv_flows |
| 257 | + |
| 258 | + df_with_pv_flows["pv_prod"] = (-pv_neg).clip(lower=0) |
| 259 | + df_with_pv_flows["pv_excess"] = (df_with_pv_flows["pv_prod"] - consumption).clip( |
| 260 | + lower=0 |
| 261 | + ) |
| 262 | + |
| 263 | + # This naturally becomes 0 when there's no battery_pos column |
| 264 | + df_with_pv_flows["pv_bat"] = pd.concat( |
| 265 | + [df_with_pv_flows["pv_excess"], battery_pos], axis=1 |
| 266 | + ).min(axis=1) |
| 267 | + |
| 268 | + df_with_pv_flows["pv_feedin"] = ( |
| 269 | + df_with_pv_flows["pv_excess"] - df_with_pv_flows["pv_bat"] |
| 270 | + ) |
| 271 | + df_with_pv_flows["pv_self"] = ( |
| 272 | + df_with_pv_flows["pv_prod"] - df_with_pv_flows["pv_excess"] |
| 273 | + ).clip(lower=0) |
| 274 | + |
| 275 | + denom = consumption.replace(0, pd.NA) |
| 276 | + df_with_pv_flows["pv_share"] = df_with_pv_flows["pv_self"] / denom |
| 277 | + |
| 278 | + return df_with_pv_flows |
| 279 | + |
| 280 | + |
def get_energy_report_columns(
    component_types: List[str], single_components: List[str]
) -> List[str]:
    """Build the list of dataframe columns for the energy report.

    Which columns are selected depends on the available component types.

    Args:
        component_types: List of component types (e.g. ["pv", "battery"])
        single_components: Extra component columns to always include.

    Returns:
        The full list of dataframe columns.
    """
    # Columns that are always part of the report.
    columns = ["timestamp", "grid", "net_import", "net_consumption"]
    columns.extend(single_components)

    available = set(component_types)

    # Battery-specific columns.
    if "battery" in available:
        columns.append("battery_throughput")

    # PV-specific columns.
    if "pv" in available:
        columns.extend(["pv_throughput", "pv_prod", "pv_self", "pv_feedin"])

    # Columns that require both PV and battery.
    # NOTE(review): "pv_in_bat" does not match the "pv_bat" column created by
    # _add_pv_energy_flows — confirm a rename happens elsewhere in the pipeline.
    if {"pv", "battery"} <= available:
        columns.extend(["pv_in_bat", "pv_share"])

    return columns
0 commit comments