|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH |
| 3 | + |
| 4 | +"""Data processing functions for the reporting module.""" |
| 5 | + |
| 6 | +from datetime import datetime |
| 7 | +from typing import Any, Dict, List, Tuple, Union |
| 8 | +from zoneinfo import ZoneInfo |
| 9 | + |
| 10 | +import numpy as np |
| 11 | +import pandas as pd |
| 12 | + |
| 13 | + |
def convert_timezone(df: pd.DataFrame) -> pd.DataFrame:
    """Express the 'timestamp' column in the Europe/Berlin timezone.

    Timezone-naive values are localized to UTC before the conversion. The
    frame is modified in place and the same object is returned.

    Args:
        df: Frame containing a datetime-like 'timestamp' column.

    Returns:
        The input frame with a timezone-aware 'timestamp' column.
    """
    assert "timestamp" in df.columns, df
    stamps = df["timestamp"]
    if stamps.dt.tz is None:
        # Naive timestamps are treated as UTC.
        stamps = stamps.dt.tz_localize("UTC")
    df["timestamp"] = stamps.dt.tz_convert("Europe/Berlin")
    return df
| 21 | + |
| 22 | + |
def process_grid_data(df: pd.DataFrame) -> pd.DataFrame:
    """Add a 'Netzbezug' column holding only the positive grid values.

    Values of 'Netzanschluss' that are not strictly positive (including NaN)
    become 0 in 'Netzbezug'. The frame is modified in place and returned.

    Args:
        df: Frame containing a numeric 'Netzanschluss' column.

    Returns:
        The input frame with the additional 'Netzbezug' column.
    """
    grid = df["Netzanschluss"]
    # Vectorised form of `x if x > 0 else 0`: NaN compares False against 0,
    # so missing values still map to 0 exactly as the row-wise version did.
    df["Netzbezug"] = grid.where(grid > 0, 0)
    return df
| 27 | + |
| 28 | + |
def compute_pv_metrics(df: pd.DataFrame, component_types: List[str]) -> pd.DataFrame:
    """Compute PV-related metrics and add them to the DataFrame.

    The following columns are derived from 'pv_neg', 'consumption' and, when
    a battery is configured, 'battery_pos':

    * 'pv_prod': negated 'pv_neg'.
    * 'pv_excess': production exceeding consumption, floored at 0.
    * 'pv_bat': row-wise minimum of 'pv_excess' and 'battery_pos', or 0
      when "battery" is not in `component_types`.
    * 'pv_feedin': excess that is not assigned to the battery.
    * 'pv_self': production minus excess, floored at 0.
    * 'pv_self_consumption_share': 'pv_self' divided by 'consumption';
      NaN where consumption is 0.

    The frame is modified in place and returned.

    Args:
        df: Frame with 'pv_neg' and 'consumption' columns (plus
            'battery_pos' when a battery is configured).
        component_types: Names of the component types present.

    Returns:
        The input frame with the additional metric columns.
    """
    df["pv_prod"] = -df["pv_neg"]
    df["pv_excess"] = (df["pv_prod"] - df["consumption"]).clip(lower=0)
    if "battery" in component_types:
        df["pv_bat"] = df[["pv_excess", "battery_pos"]].min(axis=1)
    else:
        df["pv_bat"] = 0
    df["pv_feedin"] = df["pv_excess"] - df["pv_bat"]
    df["pv_self"] = (df["pv_prod"] - df["pv_excess"]).clip(lower=0)
    # Mask zero consumption with NaN (not pd.NA): replacing with pd.NA turns a
    # float column into object dtype, which would leak into the share column.
    df["pv_self_consumption_share"] = df["pv_self"] / df["consumption"].replace(
        0, np.nan
    )
    return df
| 43 | + |
| 44 | + |
def rename_component_columns(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> pd.DataFrame:
    """Give per-component columns a readable, type-prefixed name.

    Columns whose name consists only of digits are matched against the meter
    IDs configured for the "battery" and "pv" component types and renamed to
    'Batterie #<id>' or 'PV #<id>'. Other columns are left untouched.

    Args:
        df: Frame whose column names are strings.
        component_types: Names of the component types present.
        mcfg: Configuration object; its `_component_types_cfg[<type>].meter`
            is iterated to obtain the meter IDs.

    Returns:
        A renamed copy of the frame.
    """
    numeric_columns = [name for name in df.columns if name.isdigit()]
    mapping: Dict[str, str] = {}
    # Battery first, PV second: an ID listed for both ends up labelled as PV.
    for comp_type, label in (("battery", "Batterie"), ("pv", "PV")):
        if comp_type not in component_types:
            continue
        # pylint: disable=protected-access
        meters = mcfg._component_types_cfg[comp_type].meter
        # pylint: enable=protected-access
        meter_ids = {str(meter) for meter in meters}
        for name in numeric_columns:
            if name in meter_ids:
                mapping[name] = f"{label} #{name}"
    return df.rename(columns=mapping)
| 64 | + |
| 65 | + |
def create_master_dfs(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the master frame and the fully renamed frame from raw data.

    The index is reset, timestamps are converted to local time, columns get
    their German display names, the 'Netzbezug' column is added and
    per-component columns are labelled.

    Args:
        df: Raw data; after `reset_index` it must provide 'timestamp',
            'grid' and 'consumption' columns.
        component_types: Names of the component types present.
        mcfg: Configuration object passed on to `rename_component_columns`.

    Returns:
        A tuple `(master_df, df_renamed)`: the renamed frame restricted to
        the report columns, and the complete renamed frame.
    """
    has_battery = "battery" in component_types
    has_pv = "pv" in component_types

    localized = convert_timezone(df.reset_index())

    german_names: Dict[str, str] = {
        "timestamp": "Zeitpunkt",
        "grid": "Netzanschluss",
        "consumption": "Netto Gesamtverbrauch",
    }
    if has_battery:
        german_names["battery"] = "Batterie Durchsatz"
    if has_pv:
        german_names["pv"] = "PV Durchsatz"
        german_names["pv_prod"] = "PV Produktion"
        german_names["pv_self"] = "PV Eigenverbrauch"
        german_names["pv_bat"] = "PV in Batterie"
        german_names["pv_feedin"] = "PV Einspeisung"
        german_names["pv_self_consumption_share"] = "PV Eigenverbrauchsanteil"

    with_grid = process_grid_data(localized.rename(columns=german_names))
    df_renamed = rename_component_columns(with_grid, component_types, mcfg)

    selected = ["Zeitpunkt", "Netzanschluss", "Netzbezug", "Netto Gesamtverbrauch"]
    # Per-component columns are recognisable by the '#' in their label.
    selected.extend(name for name in df_renamed.columns if "#" in name)

    if has_battery:
        selected.append("Batterie Durchsatz")
        if has_pv:
            # Battery-related PV columns are only listed when both exist.
            selected.extend(
                [
                    "PV Durchsatz",
                    "PV Produktion",
                    "PV Eigenverbrauch",
                    "PV Einspeisung",
                    "PV in Batterie",
                    "PV Eigenverbrauchsanteil",
                ]
            )
    elif has_pv:
        selected.extend(
            ["PV Durchsatz", "PV Produktion", "PV Eigenverbrauch", "PV Einspeisung"]
        )

    return df_renamed[selected], df_renamed
| 115 | + |
| 116 | + |
def create_overview_df(
    master_df: pd.DataFrame, component_types: List[str]
) -> pd.DataFrame:
    """Select the overview columns that match the configured components.

    The time, grid-draw and net-consumption columns are always included; PV
    production/feed-in and battery throughput follow when the respective
    component type is present.

    Args:
        master_df: Master frame with German column names.
        component_types: Names of the component types present.

    Returns:
        The master frame restricted to the overview columns.
    """
    wanted = ["Zeitpunkt", "Netzbezug", "Netto Gesamtverbrauch"]
    if "pv" in component_types:
        wanted += ["PV Produktion", "PV Einspeisung"]
    if "battery" in component_types:
        wanted.append("Batterie Durchsatz")
    return master_df[wanted]
| 147 | + |
| 148 | + |
def compute_power_df(
    master_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
) -> pd.DataFrame:
    """Compute the energy mix (PV vs grid) and return it as a small table.

    Column sums are multiplied by the sample duration in hours to obtain
    energy. When the frame has no 'PV Eigenverbrauch' column, only a single
    'Netz' row with a 100 % share is returned.

    Args:
        master_df: Frame with a 'Netzbezug' column and optionally a
            'PV Eigenverbrauch' column.
        resolution: Sample duration, as a Timedelta or anything accepted by
            `pd.to_timedelta`.

    Returns:
        A frame with the columns 'Energiebezug', 'Energie [kWh]',
        'Energie %' and 'Energie [kW]'.
    """
    seconds = pd.to_timedelta(resolution).total_seconds()
    hours = seconds / 3600

    def _per_hour(kwh: float) -> float:
        return round(kwh * 3600 / seconds, 2)

    grid_kwh = round(master_df["Netzbezug"].sum() * hours, 2)

    if "PV Eigenverbrauch" not in master_df.columns:
        return pd.DataFrame(
            {
                "Energiebezug": ["Netz"],
                "Energie [kWh]": [grid_kwh],
                "Energie %": [100.0],
                "Energie [kW]": [_per_hour(grid_kwh)],
            }
        )

    pv_self_kwh = round(master_df["PV Eigenverbrauch"].sum() * hours, 2)
    total = pv_self_kwh + grid_kwh
    energy = [pv_self_kwh, grid_kwh]
    # With no energy at all the shares would be 0/0; report 0 % instead of
    # NaN. A NaN total still propagates, as `NaN != 0` is true.
    shares = [round(e / total * 100, 2) if total != 0 else 0.0 for e in energy]
    return pd.DataFrame(
        {
            "Energiebezug": ["PV", "Netz"],
            "Energie [kWh]": energy,
            "Energie %": shares,
            "Energie [kW]": [_per_hour(e) for e in energy],
        }
    )
| 178 | + |
| 179 | + |
def compute_pv_statistics(
    master_df: pd.DataFrame, component_types: List[str], resolution: pd.Timedelta
) -> Dict[str, Union[int, float]]:
    """Summarise PV production, feed-in and self-consumption.

    Energy sums are the column values multiplied by the sample duration in
    hours, rounded to two decimals; shares are rounded to four decimals. All
    entries are 0.0 when "pv" is not in `component_types` or the summed
    'PV Produktion' is not positive.

    Args:
        master_df: Master frame with German column names.
        component_types: Names of the component types present.
        resolution: Duration of one sample.

    Returns:
        A dict with the keys 'pv_feed_in_sum', 'pv_production_sum',
        'pv_self_consumption_sum', 'pv_bat_sum',
        'pv_self_consumption_share' and 'pv_total_consumption_share'.
    """
    hours = resolution.total_seconds() / 3600
    stats: Dict[str, float] = dict.fromkeys(
        (
            "pv_feed_in_sum",
            "pv_production_sum",
            "pv_self_consumption_sum",
            "pv_bat_sum",
            "pv_self_consumption_share",
            "pv_total_consumption_share",
        ),
        0.0,
    )
    if "pv" not in component_types:
        return stats

    production = master_df.get("PV Produktion", pd.Series(dtype=float))
    if production.sum() <= 0:
        return stats

    def _energy(values: pd.Series) -> float:
        return round((values * hours).sum(), 2)

    produced = _energy(production)
    self_used = _energy(master_df["PV Eigenverbrauch"])
    stats["pv_feed_in_sum"] = _energy(master_df["PV Einspeisung"])
    stats["pv_production_sum"] = produced
    stats["pv_self_consumption_sum"] = self_used
    if "battery" in component_types:
        stats["pv_bat_sum"] = _energy(master_df["PV in Batterie"])

    if produced > 0:
        stats["pv_self_consumption_share"] = round(self_used / produced, 4)

    # Total consumption = self-consumed PV energy plus energy drawn from grid.
    grid_drawn = round(master_df["Netzbezug"].sum() * hours, 2)
    total_consumed = self_used + grid_drawn
    if total_consumed > 0:
        stats["pv_total_consumption_share"] = round(self_used / total_consumed, 4)
    return stats
| 217 | + |
| 218 | + |
def compute_peak_usage(
    master_df: pd.DataFrame, resolution: pd.Timedelta
) -> Dict[str, Union[str, float]]:
    """Report the peak grid draw, its date and the consumption sums.

    Args:
        master_df: Frame with 'Zeitpunkt', 'Netzbezug' and
            'Netto Gesamtverbrauch' columns.
        resolution: Duration of one sample.

    Returns:
        A dict with 'peak' (maximum of 'Netzbezug'), 'peak_date' (day of
        the peak formatted as DD.MM.YYYY), 'net_site_consumption_sum' and
        'grid_consumption_sum' (column sums times the sample duration in
        hours). Numeric values are rounded to two decimals.
    """
    grid_draw = master_df["Netzbezug"]
    hours = resolution.total_seconds() / 3600

    when = master_df.loc[grid_draw.idxmax()]["Zeitpunkt"]
    is_aware = isinstance(when, datetime) and when.tzinfo is not None
    # Aware timestamps are moved to CET before taking the calendar day;
    # anything else is formatted as it is.
    day = when.astimezone(ZoneInfo("CET")).date() if is_aware else when

    return {
        "peak": round(grid_draw.max(), 2),
        "peak_date": day.strftime("%d.%m.%Y"),
        "net_site_consumption_sum": round(
            master_df["Netto Gesamtverbrauch"].sum() * hours, 2
        ),
        "grid_consumption_sum": round(grid_draw.sum() * hours, 2),
    }
| 241 | + |
| 242 | + |
def filter_overview_df(
    overview_df: pd.DataFrame, overview_filter: pd.DataFrame
) -> pd.DataFrame:
    """Blank out the overview columns that are not selected.

    A column stays visible when its display name is contained in
    `overview_filter`; 'Netzbezug' is looked up under the display name
    'Gesamtverbrauch', and 'Zeitpunkt' is always kept. Deselected columns
    are filled with NaN rather than dropped. If the filter contains 'Alle',
    or nothing has to be hidden, the input frame itself is returned.

    Args:
        overview_df: Overview frame to filter; it is never modified.
        overview_filter: Container of selected display names. Only
            membership tests (`in`) are performed on it.

    Returns:
        The input frame, or a copy with the deselected columns set to NaN.
    """
    if "Alle" in overview_filter:
        return overview_df

    hidden = [
        column
        for column in overview_df.columns
        if column != "Zeitpunkt"
        and ("Gesamtverbrauch" if column == "Netzbezug" else column)
        not in overview_filter
    ]
    if not hidden:
        return overview_df

    # Copy once, instead of once per hidden column.
    filtered = overview_df.copy()
    for column in hidden:
        filtered[column] = np.nan
    return filtered
| 254 | + |
| 255 | + |
def print_pv_sums(
    master_df: pd.DataFrame, resolution: pd.Timedelta, pv_columns: List[str]
) -> None:
    """Print the energy sum of each PV column in German number format.

    Each column sum is multiplied by the sample duration in hours, negated,
    rounded to two decimals and printed as '<column>: <sum> kWh' with '.' as
    thousands separator and ',' as decimal separator.

    Args:
        master_df: Frame containing the given PV columns.
        resolution: Duration of one sample.
        pv_columns: Names of the columns to summarise.
    """
    # total_seconds() covers the whole duration; `.seconds` is only the
    # sub-day component and would be 0 for a resolution of one day.
    hours = resolution.total_seconds() / 3600
    # Swap ',' and '.' in a single pass: 1,234.56 -> 1.234,56.
    to_german = str.maketrans(",.", ".,")
    for pv in pv_columns:
        pv_sum = round(master_df[pv].sum() * hours * -1, 2)
        formatted_sum = f"{pv_sum:,.2f}".translate(to_german)
        print(f"{pv:<7}: {formatted_sum} kWh")
| 266 | + |
| 267 | + |
def create_pv_analyse_df(
    master_df: pd.DataFrame,
    pv_filter: List[str],
    pvgrid_filter: str,
    pv_grid_filter_options: List[str],
) -> pd.DataFrame:
    """Build the long-format frame used for the PV analysis.

    The shape depends on which option `pvgrid_filter` equals:

    * `pv_grid_filter_options[1]`: PV columns only, melted into 'PV' /
      'PV Einspeisung'.
    * `pv_grid_filter_options[2]`: 'Zeitpunkt' and 'Netzanschluss' only,
      with a constant 'PV' label of '#'.
    * anything else: melted PV columns plus 'Netzanschluss', the latter
      divided by the number of selected PV columns.

    Args:
        master_df: Master frame with 'Zeitpunkt', 'Netzanschluss' and
            'PV #<id>' columns.
        pv_filter: Selected PV labels; 'Alle' selects every 'PV #' column,
            other entries are prefixed with 'PV ' to form column names.
        pvgrid_filter: The chosen display option.
        pv_grid_filter_options: The available display options.

    Returns:
        A new frame for plotting; PV values are sign-flipped and the 'PV'
        column has the leading 'PV ' removed.
    """
    pv_only = pvgrid_filter == pv_grid_filter_options[1]

    if not pv_only and pvgrid_filter == pv_grid_filter_options[2]:
        grid_only = master_df[["Zeitpunkt", "Netzanschluss"]].copy()
        grid_only["PV"] = "#"
        return grid_only

    if "Alle" in pv_filter:
        selected = [name for name in master_df.columns if "PV #" in name]
    else:
        selected = [f"PV {label}" for label in pv_filter]

    id_columns = ["Zeitpunkt"]
    source_columns = ["Zeitpunkt"] + selected
    if not pv_only:
        id_columns = id_columns + ["Netzanschluss"]
        source_columns = source_columns + ["Netzanschluss"]

    long_df = pd.melt(
        master_df[source_columns].copy(),
        id_vars=id_columns,
        value_vars=selected,
        var_name="PV",
        value_name="PV Einspeisung",
    )
    if not pv_only:
        # The grid value is repeated once per PV column by the melt.
        long_df["Netzanschluss"] /= len(selected)
    long_df["PV Einspeisung"] *= -1
    # Drop the leading "PV " so only the "#<id>" part remains.
    long_df["PV"] = long_df["PV"].str[3:]
    return long_df
| 315 | + |
| 316 | + |
def create_battery_analyse_df(
    master_df: pd.DataFrame, bat_filter: Union[str, List[str]]
) -> pd.DataFrame:
    """Build the long-format frame used for the battery analysis.

    Args:
        master_df: Master frame with 'Batterie #<id>' columns. Time is read
            from its 'Zeitpunkt' column when present, otherwise from the
            index.
        bat_filter: Selected battery labels; 'Alle' selects every
            'Batterie #' column, other entries are prefixed with
            'Batterie ' to form column names.

    Returns:
        A new frame with the columns 'Zeitpunkt', 'Batterie' (the '#<id>'
        part of the column name) and 'Batterie Durchsatz'.
    """
    if "Alle" in bat_filter:
        bat_columns = [col for col in master_df.columns if "Batterie #" in col]
    else:
        bat_columns = [f"Batterie {i}" for i in bat_filter]

    df = master_df[bat_columns].copy()
    # Prefer the real time column: a frame whose index was reset only has a
    # running integer index, which would otherwise end up as 'Zeitpunkt'.
    if "Zeitpunkt" in master_df.columns:
        df["Zeitpunkt"] = master_df["Zeitpunkt"]
    else:
        df["Zeitpunkt"] = df.index

    df = pd.melt(
        df,
        id_vars=["Zeitpunkt"],
        value_vars=bat_columns,
        var_name="Batterie",
        value_name="Batterie Durchsatz",
    )
    # Drop the leading "Batterie " so only the "#<id>" part remains.
    df["Batterie"] = df["Batterie"].str[9:]

    return df
0 commit comments