|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH |
| 3 | + |
| 4 | +"""Data processing utilities for microgrid energy reporting. |
| 5 | +
|
| 6 | +Overview: |
| 7 | +--------- |
| 8 | +This module provides reusable data transformation functions for generating |
| 9 | +energy reports from microgrid component data. It supports components such as |
| 10 | +photovoltaic systems (PV), batteries, and grid meters. |
| 11 | +
|
| 12 | +Functions in this module follow a modular, composable pipeline and are typically |
| 13 | +called in a sequence within reporting notebooks. Each function accepts structured |
| 14 | +data and configuration, then returns a modified DataFrame or summary statistics. |
| 15 | +
|
| 16 | +Core responsibilities: |
| 17 | +- Timezone localization to a unified timezone |
| 18 | +- Enrichment of data with derived metrics (e.g., PV self-consumption) |
| 19 | +- Renaming of technical column names based on configuration |
| 20 | +- Preparation of summary and analysis-ready DataFrames |
| 21 | +
|
| 22 | +Assumptions: |
| 23 | +------------ |
| 24 | +- The input DataFrame contains at minimum: |
| 25 | + - A timestamp column (`"timestamp"`) |
| 26 | + - Grid and consumption data columns (e.g., `"grid"`, `"consumption"`) |
| 27 | +- PV and battery metrics require additional fields |
| 28 | + (e.g., `"pv_neg"`, `"battery_pos"`) |
| 29 | +- The timestamp column must be timezone-aware or UTC-naive |
| 30 | +- A configuration object (`mcfg`) must provide `component_type_ids(...)` |
| 31 | +- `component_types` must be a list containing any of: |
| 32 | + `"grid"`, `"consumption"`, `"pv"`, `"battery"`, `"chp"`, `"ev"` |
| 33 | +
|
| 34 | +Outputs: |
| 35 | +-------- |
| 36 | +Each function returns one of: |
| 37 | +- A mutated or extended `pd.DataFrame` |
| 38 | +- A tuple of DataFrames for staged reporting |
| 39 | +- A dictionary of summary statistics |
| 40 | +- A long-format DataFrame for visualization or further analysis |
| 41 | +""" |
| 42 | + |
| 43 | + |
| 44 | +from typing import Any, Dict, List, Tuple |
| 45 | + |
| 46 | +import pandas as pd |
| 47 | + |
# Target timezone that `convert_timezone` converts the timestamp column to.
TZ_NAME = "Europe/Berlin"

# Column names. Lower-case values are raw input column names; the German
# values are the display names used in the renamed/reporting DataFrames.
COLUMN_TIMESTAMP = "timestamp"
COLUMN_GRID = "grid"
COLUMN_GRID_NAMED = "Netzanschluss"
# Derived column added by `process_grid_data` (grid values clipped at zero).
COLUMN_NET_IMPORT = "Netzbezug"
COLUMN_CONSUMPTION = "consumption"
COLUMN_CONSUMPTION_NAMED = "Brutto Gesamtverbrauch"
COLUMN_BATTERY = "battery"
COLUMN_BATTERY_NAMED = "Batterie Durchsatz"
# Display names for the PV metric columns (rename targets for "pv_prod",
# "pv_feedin", "pv_self", "pv_bat" and "pv_self_consumption_share").
COLUMN_PV_PROD = "PV Produktion"
COLUMN_PV_FEEDIN = "PV Einspeisung"
COLUMN_PV_SELF = "PV Eigenverbrauch"
COLUMN_PV_BAT = "PV in Batterie"
COLUMN_PV_SHARE = "PV Eigenverbrauchsanteil"
| 63 | + |
| 64 | + |
def _get_rename_map(component_types: List[str]) -> Dict[str, str]:
    """Build the mapping from raw column names to German display names.

    The timestamp, grid and consumption columns are always mapped. Battery
    and PV columns are only included when the respective component type is
    listed in `component_types`.

    Args:
        component_types: Component types that are present, e.g. 'pv', 'battery'.

    Returns:
        A dictionary mapping internal column names to localized names.
    """
    battery_names: Dict[str, str] = (
        {COLUMN_BATTERY: COLUMN_BATTERY_NAMED} if "battery" in component_types else {}
    )
    pv_names: Dict[str, str] = (
        {
            "pv": "PV Durchsatz",
            "pv_prod": COLUMN_PV_PROD,
            "pv_self": COLUMN_PV_SELF,
            "pv_bat": COLUMN_PV_BAT,
            "pv_feedin": COLUMN_PV_FEEDIN,
            "pv_self_consumption_share": COLUMN_PV_SHARE,
        }
        if "pv" in component_types
        else {}
    )

    # Insertion order: always-present columns, then battery, then PV.
    return {
        COLUMN_TIMESTAMP: "Zeitpunkt",
        COLUMN_GRID: COLUMN_GRID_NAMED,
        COLUMN_CONSUMPTION: COLUMN_CONSUMPTION_NAMED,
        **battery_names,
        **pv_names,
    }
| 96 | + |
| 97 | + |
def convert_timezone(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the 'timestamp' column to the Europe/Berlin timezone.

    Timezone-naive timestamps are interpreted as UTC before the conversion;
    timezone-aware timestamps are converted directly. The input DataFrame is
    left untouched.

    Args:
        df: DataFrame with a datetime 'timestamp' column.

    Returns:
        A copy of the DataFrame whose 'timestamp' column is expressed in
        the `TZ_NAME` timezone.

    Raises:
        KeyError: If the DataFrame has no 'timestamp' column.
    """
    # Raise explicitly rather than `assert`, so the check also runs under
    # `python -O` and the message lists column names instead of the full frame.
    if COLUMN_TIMESTAMP not in df.columns:
        raise KeyError(
            f"Missing required column {COLUMN_TIMESTAMP!r}; "
            f"available columns: {list(df.columns)}"
        )

    df = df.copy()
    timestamps = df[COLUMN_TIMESTAMP]
    if timestamps.dt.tz is None:
        # Naive timestamps are taken to be UTC.
        timestamps = timestamps.dt.tz_localize("UTC")
    df[COLUMN_TIMESTAMP] = timestamps.dt.tz_convert(TZ_NAME)
    return df
| 113 | + |
| 114 | + |
def process_grid_data(df: pd.DataFrame) -> pd.DataFrame:
    """Add the 'Netzbezug' column holding only the non-negative grid values.

    The column is derived from the already renamed grid column
    ('Netzanschluss') by replacing every negative value with zero.

    Args:
        df: DataFrame containing the renamed grid column.

    Returns:
        A new DataFrame with the additional clipped grid import column.
    """
    grid_import = df[COLUMN_GRID_NAMED].clip(lower=0)
    # `assign` works on a copy, so the caller's DataFrame is not modified.
    return df.assign(**{COLUMN_NET_IMPORT: grid_import})
| 127 | + |
| 128 | + |
def compute_pv_metrics(df: pd.DataFrame, component_types: List[str]) -> pd.DataFrame:
    """Compute PV-related metrics and add them to the DataFrame.

    The following columns are added to a copy of the input:

    - ``pv_prod``: negated ``pv_neg``.
    - ``pv_excess``: ``pv_prod`` minus consumption, floored at zero.
    - ``pv_bat``: row-wise minimum of ``pv_excess`` and ``battery_pos`` when
      'battery' is in `component_types`, otherwise 0.
    - ``pv_feedin``: ``pv_excess`` minus ``pv_bat``.
    - ``pv_self``: ``pv_prod`` minus ``pv_excess``, floored at zero.
    - ``pv_self_consumption_share``: ``pv_self`` divided by consumption;
      NaN where consumption is zero.

    Args:
        df: Input DataFrame with 'pv_neg' and 'consumption' columns, plus
            'battery_pos' when 'battery' is in `component_types`.
        component_types: List of present component types.

    Returns:
        DataFrame with additional columns for PV metrics.
    """
    df = df.copy()
    consumption = df[COLUMN_CONSUMPTION]

    # NOTE(review): presumably `pv_neg` carries production as non-positive
    # values, hence the sign flip -- confirm against the data source.
    df["pv_prod"] = -df["pv_neg"]
    df["pv_excess"] = (df["pv_prod"] - consumption).clip(lower=0)

    if "battery" in component_types:
        df["pv_bat"] = df[["pv_excess", "battery_pos"]].min(axis=1)
    else:
        df["pv_bat"] = 0

    df["pv_feedin"] = df["pv_excess"] - df["pv_bat"]
    df["pv_self"] = (df["pv_prod"] - df["pv_excess"]).clip(lower=0)

    # Mask zero consumption with NaN (via `where`) instead of replacing it
    # with `pd.NA`: the latter can turn a numeric column into object dtype,
    # which would make the resulting share column non-numeric.
    # NOTE(review): the denominator is consumption, not PV production --
    # confirm this matches the intended definition of the share.
    df["pv_self_consumption_share"] = df["pv_self"] / consumption.where(
        consumption != 0
    )
    return df
| 152 | + |
| 153 | + |
def apply_renaming(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> pd.DataFrame:
    """Apply full renaming: static columns and dynamic component columns.

    Static columns are renamed according to `_get_rename_map`. Columns whose
    label consists only of digits are treated as per-component columns: if the
    label matches a battery or PV meter ID reported by `mcfg`, the column is
    renamed to 'Batterie #<id>' or 'PV #<id>' respectively. If an ID is
    reported for both types, the PV name wins because it is applied last.

    Args:
        df: Input DataFrame.
        component_types: List of present component types.
        mcfg: Configuration object providing
            `component_type_ids(component_type=..., component_category=...)`.

    Returns:
        A new DataFrame with renamed columns; the input is not modified.
    """
    rename_map: Dict[Any, str] = dict(_get_rename_map(component_types))

    # Compare via `str(...)` so that non-string labels (e.g. integer component
    # IDs) are handled instead of failing on a missing `.isdigit()` method.
    id_columns = [col for col in df.columns if str(col).isdigit()]

    # Order matters: battery first, PV second (later entries override).
    for component_type, label in (("battery", "Batterie"), ("pv", "PV")):
        if component_type not in component_types:
            continue
        type_ids = {
            str(i)
            for i in mcfg.component_type_ids(
                component_type=component_type, component_category="meter"
            )
        }
        rename_map.update(
            {col: f"{label} #{col}" for col in id_columns if str(col) in type_ids}
        )

    # `rename` returns a new DataFrame, so no explicit copy is needed.
    return df.rename(columns=rename_map)
| 192 | + |
| 193 | + |
def prepare_reporting_dfs(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the master and the fully renamed reporting DataFrames.

    The pipeline resets the index, converts the timestamps via
    `convert_timezone`, renames the columns via `apply_renaming` and adds the
    grid import column via `process_grid_data`. The master DataFrame is the
    column subset chosen by `_get_master_columns`.

    PV metric columns are not computed here; when 'pv' is in
    `component_types` the selected master columns include them, so they have
    to be present in `df` already.

    Args:
        df: Input data frame with raw microgrid data.
        component_types: List of component types present.
        mcfg: Configuration object with component metadata.

    Returns:
        Tuple of (master DataFrame, fully renamed DataFrame).
    """
    localized = convert_timezone(df.reset_index(drop=True))
    renamed = process_grid_data(apply_renaming(localized, component_types, mcfg))

    master_columns = _get_master_columns(renamed.columns, component_types)
    return renamed[master_columns], renamed
| 214 | + |
| 215 | + |
# The `columns` annotation is a string so it is not evaluated when the function
# is defined: subscripting `pd.Index` is only guaranteed to work for type
# checkers (via the stubs), not at runtime.
def _get_master_columns(
    columns: "pd.Index[str]", component_types: List[str]
) -> List[str]:
    """Determine relevant columns for the master DataFrame based on component types.

    The result always starts with the timestamp, grid, grid import and
    consumption display names, followed by battery and PV display names
    depending on `component_types`, and finally every label in `columns`
    that contains a '#' (per-component columns such as 'PV #1').

    Args:
        columns: Column labels of the renamed DataFrame.
        component_types: List of present component types.

    Returns:
        List of relevant column names for the master DataFrame.
    """
    cols = [
        "Zeitpunkt",
        COLUMN_GRID_NAMED,
        COLUMN_NET_IMPORT,
        COLUMN_CONSUMPTION_NAMED,
    ]

    if "battery" in component_types:
        cols.append(COLUMN_BATTERY_NAMED)

    if "pv" in component_types:
        cols += [
            "PV Durchsatz",
            COLUMN_PV_PROD,
            COLUMN_PV_SELF,
            COLUMN_PV_FEEDIN,
        ]
        # NOTE(review): the PV share column is only included together with a
        # battery -- confirm this is intended and not meant for all PV sites.
        if "battery" in component_types:
            cols += [COLUMN_PV_BAT, COLUMN_PV_SHARE]

    # Add individual component columns like "PV #1", "Batterie #3", etc.
    # Non-string labels cannot contain '#', so skip them rather than fail.
    cols += [col for col in columns if isinstance(col, str) and "#" in col]

    return cols
0 commit comments