|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH |
| 3 | + |
| 4 | +"""Data processing functions for the reporting module.""" |
| 5 | + |
| 6 | +from datetime import datetime |
| 7 | +from typing import Any, Dict, List, Tuple, Union |
| 8 | +from zoneinfo import ZoneInfo |
| 9 | + |
| 10 | +import numpy as np |
| 11 | +import pandas as pd |
| 12 | + |
| 13 | + |
def convert_timezone(df: pd.DataFrame, tz: str = "Europe/Berlin") -> pd.DataFrame:
    """Convert the 'timestamp' column to the given timezone.

    Timezone-naive timestamps are localized to UTC before conversion.
    The frame is modified in place and also returned.

    Args:
        df: Frame with a datetime-like 'timestamp' column.
        tz: Target timezone name. Defaults to "Europe/Berlin".

    Returns:
        The same DataFrame object with a converted 'timestamp' column.

    Raises:
        KeyError: If `df` has no 'timestamp' column.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if "timestamp" not in df.columns:
        raise KeyError(
            f"'timestamp' column is required; got columns {list(df.columns)}"
        )
    timestamps = df["timestamp"]
    if timestamps.dt.tz is None:
        timestamps = timestamps.dt.tz_localize("UTC")
    df["timestamp"] = timestamps.dt.tz_convert(tz)
    return df
| 21 | + |
| 22 | + |
def process_grid_data(df: pd.DataFrame) -> pd.DataFrame:
    """Derive the 'Netzbezug' column from 'Netzanschluss'.

    'Netzbezug' holds the 'Netzanschluss' values with everything below
    zero raised to zero. The frame is modified in place and returned.
    """
    grid_connection = df["Netzanschluss"]
    grid_import = grid_connection.clip(lower=0)
    df["Netzbezug"] = grid_import
    return df
| 27 | + |
| 28 | + |
def compute_pv_metrics(df: pd.DataFrame, component_types: List[str]) -> pd.DataFrame:
    """Compute PV-related metrics and add them to the DataFrame.

    Reads the 'pv_neg' and 'consumption' columns (plus 'battery_pos' when
    "battery" is in `component_types`) and adds, in place:

    - 'pv_prod': negated 'pv_neg'.
    - 'pv_excess': production above consumption, never below zero.
    - 'pv_bat': row-wise minimum of 'pv_excess' and 'battery_pos', or 0
      when there is no battery.
    - 'pv_feedin': 'pv_excess' minus 'pv_bat'.
    - 'pv_self': production minus excess, never below zero.
    - 'pv_self_consumption_share': 'pv_self' divided by 'consumption';
      NaN where consumption is zero.

    Args:
        df: Frame holding the input columns listed above.
        component_types: Component types present at the site.

    Returns:
        The same DataFrame object with the metric columns added.
    """
    df["pv_prod"] = -df["pv_neg"]
    df["pv_excess"] = (df["pv_prod"] - df["consumption"]).clip(lower=0)
    if "battery" in component_types:
        df["pv_bat"] = df[["pv_excess", "battery_pos"]].min(axis=1)
    else:
        df["pv_bat"] = 0
    df["pv_feedin"] = df["pv_excess"] - df["pv_bat"]
    df["pv_self"] = (df["pv_prod"] - df["pv_excess"]).clip(lower=0)
    # np.nan (not pd.NA) keeps the divisor, and therefore the resulting
    # share column, a float dtype instead of degrading it to object dtype.
    nonzero_consumption = df["consumption"].replace(0, np.nan)
    df["pv_self_consumption_share"] = df["pv_self"] / nonzero_consumption
    return df
| 43 | + |
| 44 | + |
def rename_component_columns(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> pd.DataFrame:
    """Rename per-component columns based on configuration.

    Columns whose label consists only of digits are treated as component
    IDs. Those listed by `mcfg.component_type_ids(...)` for the "battery"
    or "pv" type (category "meter") are renamed to "Batterie #<id>" or
    "PV #<id>". The configuration is only queried for types present in
    `component_types`.

    Args:
        df: Frame whose columns may include component-ID labels.
        component_types: Component types present at the site.
        mcfg: Configuration object exposing
            `component_type_ids(component_type=..., component_category=...)`.

    Returns:
        A renamed DataFrame, as produced by `DataFrame.rename`.
    """
    # `str(col)` tolerates non-string labels (e.g. integer IDs), which
    # would otherwise fail on `.isdigit()`.
    id_columns = [col for col in df.columns if str(col).isdigit()]
    # Order matters: for an ID listed under both types the later entry wins.
    display_prefixes = (("battery", "Batterie"), ("pv", "PV"))
    rename_comp: Dict[Any, str] = {}
    for component_type, prefix in display_prefixes:
        if component_type not in component_types:
            continue
        type_ids = {
            str(i)
            for i in mcfg.component_type_ids(
                component_type=component_type, component_category="meter"
            )
        }
        rename_comp.update(
            {col: f"{prefix} #{col}" for col in id_columns if str(col) in type_ids}
        )
    return df.rename(columns=rename_comp)
| 70 | + |
| 71 | + |
def create_master_dfs(
    df: pd.DataFrame, component_types: List[str], mcfg: Any
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Create the master DataFrame and the fully renamed DataFrame.

    The index is reset, 'timestamp' is converted via `convert_timezone`,
    columns are renamed to their German display names, 'Netzbezug' is
    added via `process_grid_data`, and per-component ID columns are
    renamed via `rename_component_columns`.

    Args:
        df: Raw data. When "pv" is in `component_types` the PV metric
            columns ('pv_prod', 'pv_self', 'pv_bat', 'pv_feedin',
            'pv_self_consumption_share') must already be present
            (presumably added by `compute_pv_metrics`; confirm against
            the caller).
        component_types: Component types present at the site.
        mcfg: Configuration object passed to `rename_component_columns`.

    Returns:
        A tuple `(master_df, df_renamed)`: `master_df` holds the selected
        report columns, `df_renamed` holds all renamed columns.
    """
    has_battery = "battery" in component_types
    has_pv = "pv" in component_types

    df = convert_timezone(df.reset_index())

    rename_map: Dict[str, str] = {
        "timestamp": "Zeitpunkt",
        "grid": "Netzanschluss",
        "consumption": "Netto Gesamtverbrauch",
    }
    if has_battery:
        rename_map["battery"] = "Batterie Durchsatz"
    if has_pv:
        rename_map.update(
            {
                "pv": "PV Durchsatz",
                "pv_prod": "PV Produktion",
                "pv_self": "PV Eigenverbrauch",
                "pv_bat": "PV in Batterie",
                "pv_feedin": "PV Einspeisung",
                "pv_self_consumption_share": "PV Eigenverbrauchsanteil",
            }
        )

    df_renamed = df.rename(columns=rename_map)
    df_renamed = process_grid_data(df_renamed)
    df_renamed = rename_component_columns(df_renamed, component_types, mcfg)

    # Per-component columns are recognised by the '#' in their display name.
    single_components = [
        c for c in df_renamed.columns if isinstance(c, str) and "#" in c
    ]
    cols = [
        "Zeitpunkt",
        "Netzanschluss",
        "Netzbezug",
        "Netto Gesamtverbrauch",
    ] + single_components
    if has_battery:
        cols.append("Batterie Durchsatz")
    # NOTE: the original had a second, unreachable `elif "pv" in
    # component_types` branch with a shorter column list; the full list was
    # always used whenever PV is present, and that behaviour is kept here.
    if has_pv:
        cols += [
            "PV Durchsatz",
            "PV Produktion",
            "PV Eigenverbrauch",
            "PV Einspeisung",
            "PV in Batterie",
            "PV Eigenverbrauchsanteil",
        ]
    # Explicit copy so later assignments on master_df do not trigger
    # pandas' SettingWithCopyWarning.
    master_df = df_renamed[cols].copy()
    return master_df, df_renamed
| 121 | + |
| 122 | + |
def create_overview_df(
    master_df: pd.DataFrame, component_types: List[str]
) -> pd.DataFrame:
    """Select the overview columns that match the site's component types.

    The time, grid import and net consumption columns are always
    included. PV production and feed-in are added when "pv" is present,
    and battery throughput when "battery" is present.
    """
    selected = ["Zeitpunkt", "Netzbezug", "Netto Gesamtverbrauch"]
    if "pv" in component_types:
        selected.extend(["PV Produktion", "PV Einspeisung"])
    if "battery" in component_types:
        selected.append("Batterie Durchsatz")
    return master_df[selected]
| 153 | + |
| 154 | + |
def compute_power_df(
    master_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
) -> pd.DataFrame:
    """Compute the energy mix (PV vs. grid) as a small summary frame.

    Column sums are multiplied by the sample length in hours. When
    `master_df` has a 'PV Eigenverbrauch' column the result has a "PV"
    and a "Netz" row, otherwise only a "Netz" row.

    Args:
        master_df: Frame with 'Netzbezug' and optionally 'PV Eigenverbrauch'.
        resolution: Sample length, as a Timedelta or anything accepted by
            `pd.to_timedelta`.

    Returns:
        A DataFrame with the columns 'Energiebezug', 'Energie [kWh]',
        'Energie %' and 'Energie [kW]'.
    """
    resolution = pd.to_timedelta(resolution)
    seconds = resolution.total_seconds()
    hours = seconds / 3600
    grid_kwh = round(master_df["Netzbezug"].sum() * hours, 2)

    if "PV Eigenverbrauch" not in master_df.columns:
        return pd.DataFrame(
            {
                "Energiebezug": ["Netz"],
                "Energie [kWh]": [grid_kwh],
                "Energie %": [100.0],
                "Energie [kW]": [round(grid_kwh * 3600 / seconds, 2)],
            }
        )

    pv_self_kwh = round(master_df["PV Eigenverbrauch"].sum() * hours, 2)
    total = pv_self_kwh + grid_kwh
    energy = [pv_self_kwh, grid_kwh]
    # With no energy at all the shares are undefined (0/0); report 0.0
    # instead of NaN so downstream tables and charts stay numeric.
    shares = [round(e / total * 100, 2) if total != 0 else 0.0 for e in energy]
    return pd.DataFrame(
        {
            "Energiebezug": ["PV", "Netz"],
            "Energie [kWh]": energy,
            "Energie %": shares,
            "Energie [kW]": [round(e * 3600 / seconds, 2) for e in energy],
        }
    )
| 184 | + |
| 185 | + |
def compute_pv_statistics(
    master_df: pd.DataFrame,
    component_types: List[str],
    resolution: Union[str, pd.Timedelta],
) -> Dict[str, Union[int, float]]:
    """Compute PV-related summary statistics.

    All statistics are 0.0 when "pv" is not in `component_types` or when
    the 'PV Produktion' column is missing or sums to a non-positive value.
    Sums are column sums multiplied by the sample length in hours and
    rounded to two decimals; shares are rounded to four decimals.

    Args:
        master_df: Frame with the 'PV ...' and 'Netzbezug' columns.
        component_types: Component types present at the site.
        resolution: Sample length, as a Timedelta or anything accepted by
            `pd.to_timedelta` (same spellings as `compute_power_df`).

    Returns:
        A dict with the keys 'pv_feed_in_sum', 'pv_production_sum',
        'pv_self_consumption_sum', 'pv_bat_sum',
        'pv_self_consumption_share' and 'pv_total_consumption_share'.
    """
    hours = pd.to_timedelta(resolution).total_seconds() / 3600
    stats: Dict[str, float] = {
        "pv_feed_in_sum": 0.0,
        "pv_production_sum": 0.0,
        "pv_self_consumption_sum": 0.0,
        "pv_bat_sum": 0.0,
        "pv_self_consumption_share": 0.0,
        "pv_total_consumption_share": 0.0,
    }
    if "pv" not in component_types:
        return stats

    pv_prod = master_df.get("PV Produktion", pd.Series(dtype=float))
    if pv_prod.sum() <= 0:
        return stats

    def _energy(column: str) -> float:
        """Return the column sum scaled by the sample length, rounded."""
        return round((master_df[column] * hours).sum(), 2)

    stats["pv_feed_in_sum"] = _energy("PV Einspeisung")
    stats["pv_production_sum"] = round((pv_prod * hours).sum(), 2)
    stats["pv_self_consumption_sum"] = _energy("PV Eigenverbrauch")
    if "battery" in component_types:
        stats["pv_bat_sum"] = _energy("PV in Batterie")

    # The rounded production can still be 0 for very small positive sums.
    if stats["pv_production_sum"] > 0:
        stats["pv_self_consumption_share"] = round(
            stats["pv_self_consumption_sum"] / stats["pv_production_sum"], 4
        )

    # Total consumed energy = self-consumed PV energy + grid import energy.
    total_consumed = stats["pv_self_consumption_sum"] + round(
        master_df["Netzbezug"].sum() * hours, 2
    )
    if total_consumed > 0:
        stats["pv_total_consumption_share"] = round(
            stats["pv_self_consumption_sum"] / total_consumed, 4
        )
    return stats
| 223 | + |
| 224 | + |
def compute_peak_usage(
    master_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
) -> Dict[str, Union[str, float]]:
    """Get peak grid usage, its date, and the consumption energy sums.

    Args:
        master_df: Frame with 'Netzbezug', 'Zeitpunkt' and
            'Netto Gesamtverbrauch' columns.
        resolution: Sample length, as a Timedelta or anything accepted by
            `pd.to_timedelta`.

    Returns:
        A dict with 'peak' (maximum of 'Netzbezug', rounded to two
        decimals), 'peak_date' (date of the first peak row formatted as
        DD.MM.YYYY), 'net_site_consumption_sum' and
        'grid_consumption_sum' (column sums multiplied by the sample
        length in hours, rounded to two decimals).

    Raises:
        ValueError: If 'Netzbezug' contains no valid (non-NaN) values.
    """
    grid = master_df["Netzbezug"]
    if grid.dropna().empty:
        raise ValueError(
            "Cannot determine peak usage: 'Netzbezug' has no valid values."
        )

    # Locate the peak by position so duplicate index labels cannot turn
    # the lookup into a multi-row selection.
    peak_pos = grid.reset_index(drop=True).idxmax()
    timestamp = master_df["Zeitpunkt"].iloc[peak_pos]

    if isinstance(timestamp, datetime) and timestamp.tzinfo is not None:
        # Same zone name as convert_timezone uses for the timestamps.
        local_time = timestamp.astimezone(ZoneInfo("Europe/Berlin"))
        peak_date_str = local_time.date().strftime("%d.%m.%Y")
    else:
        # Timezone-naive values are formatted as they are.
        peak_date_str = timestamp.strftime("%d.%m.%Y")

    hours = pd.to_timedelta(resolution).total_seconds() / 3600
    return {
        "peak": round(grid.max(), 2),
        "peak_date": peak_date_str,
        "net_site_consumption_sum": round(
            master_df["Netto Gesamtverbrauch"].sum() * hours, 2
        ),
        "grid_consumption_sum": round(grid.sum() * hours, 2),
    }
| 247 | + |
| 248 | + |
def filter_overview_df(
    overview_df: pd.DataFrame, overview_filter: Union[pd.DataFrame, List[str]]
) -> pd.DataFrame:
    """Blank out overview columns that are not selected.

    A copy of `overview_df` is returned. Unless "Alle" is in
    `overview_filter`, every column except 'Zeitpunkt' whose display name
    is not in the filter is set to NaN. 'Netzbezug' is matched under the
    display name "Gesamtverbrauch".

    Args:
        overview_df: Overview frame to filter.
        overview_filter: Container of selected display names; only `in`
            membership tests are performed on it.

    Returns:
        A filtered copy; the input frame is left untouched.
    """
    # Copy up front so the "Alle" case also returns a frame. Previously the
    # result was only bound inside the `if`, so "Alle" raised
    # UnboundLocalError.
    filtered_df = overview_df.copy()
    if "Alle" in overview_filter:
        return filtered_df
    for column in overview_df.columns:
        if column == "Zeitpunkt":
            continue
        display_name = "Gesamtverbrauch" if column == "Netzbezug" else column
        if display_name not in overview_filter:
            filtered_df[column] = np.nan
    return filtered_df
| 260 | + |
| 261 | + |
def print_pv_sums(
    master_df: pd.DataFrame, resolution: pd.Timedelta, pv_columns: List[str]
) -> None:
    """Print one line per PV column with its sign-flipped energy sum.

    Each column sum is multiplied by the sample length in hours, negated,
    rounded to two decimals and printed with '.' as the thousands
    separator and ',' as the decimal mark.
    """
    # Swap ',' and '.' in a single pass.
    swap_separators = str.maketrans(",.", ".,")
    for column in pv_columns:
        hours_per_sample = resolution.total_seconds() / 3600
        energy_kwh = round(master_df[column].sum() * hours_per_sample * -1, 2)
        german_number = f"{energy_kwh:,.2f}".translate(swap_separators)
        print(f"{column:<7}: {german_number} kWh")
| 274 | + |
| 275 | + |
def create_pv_analyse_df(
    master_df: pd.DataFrame,
    pv_filter: List[str],
    pvgrid_filter: str,
    pv_grid_filter_options: List[str],
) -> pd.DataFrame:
    """Build the long-format frame used for the PV analysis.

    The layout depends on which entry of `pv_grid_filter_options` equals
    `pvgrid_filter`:

    - entry 1: one row per timestamp and PV column, with the sign-flipped
      value in 'PV Einspeisung'.
    - entry 2: 'Zeitpunkt' and 'Netzanschluss' only, with 'PV' set to "#".
    - anything else: as for entry 1, plus 'Netzanschluss' divided by the
      number of selected PV columns.

    PV columns are all columns containing "PV #" when "Alle" is in
    `pv_filter`, otherwise "PV <entry>" for each filter entry. The leading
    "PV " is stripped from the names stored in the 'PV' column.
    """
    pv_only = pvgrid_filter == pv_grid_filter_options[1]

    # Grid-only view: no PV columns involved at all.
    if not pv_only and pvgrid_filter == pv_grid_filter_options[2]:
        grid_df = master_df[["Zeitpunkt", "Netzanschluss"]].copy()
        grid_df["PV"] = "#"
        return grid_df

    if "Alle" in pv_filter:
        pv_columns = [name for name in master_df.columns if "PV #" in name]
    else:
        pv_columns = [f"PV {entry}" for entry in pv_filter]

    if pv_only:
        id_columns = ["Zeitpunkt"]
        picked = ["Zeitpunkt"] + pv_columns
    else:
        id_columns = ["Zeitpunkt", "Netzanschluss"]
        picked = ["Zeitpunkt"] + pv_columns + ["Netzanschluss"]

    long_df = pd.melt(
        master_df[picked].copy(),
        id_vars=id_columns,
        value_vars=pv_columns,
        var_name="PV",
        value_name="PV Einspeisung",
    )
    if not pv_only:
        # The grid value is repeated once per PV column by the melt.
        long_df["Netzanschluss"] /= len(pv_columns)
    long_df["PV Einspeisung"] *= -1
    long_df["PV"] = long_df["PV"].str[3:]
    return long_df
| 323 | + |
| 324 | + |
def create_battery_analyse_df(
    master_df: pd.DataFrame, bat_filter: Union[str, List[str]]
) -> pd.DataFrame:
    """Build the long-format frame used for the battery analysis.

    Battery columns are all columns containing "Batterie #" when "Alle"
    is selected, otherwise "Batterie <entry>" for each filter entry. The
    leading "Batterie " is stripped from the names stored in the
    'Batterie' column.

    Args:
        master_df: Frame holding the per-battery columns.
        bat_filter: Selected batteries, either a list of entries or a
            single entry given as a plain string.

    Returns:
        A DataFrame with the columns 'Zeitpunkt', 'Batterie' and
        'Batterie Durchsatz', one row per timestamp and battery column.
    """
    if isinstance(bat_filter, str):
        # A bare string is one selection, not a sequence of characters.
        select_all = "Alle" in bat_filter
        selection = [bat_filter]
    else:
        select_all = "Alle" in bat_filter
        selection = list(bat_filter)

    if select_all:
        bat_columns = [
            col
            for col in master_df.columns
            if isinstance(col, str) and "Batterie #" in col
        ]
    else:
        bat_columns = [f"Batterie {entry}" for entry in selection]

    if "Zeitpunkt" in master_df.columns:
        # Prefer the real timestamp column, as create_pv_analyse_df does.
        df = master_df[["Zeitpunkt"] + bat_columns].copy()
    else:
        # Fallback: treat the index as the time axis (original behaviour).
        df = master_df[bat_columns].copy()
        df["Zeitpunkt"] = df.index

    df = pd.melt(
        df,
        id_vars=["Zeitpunkt"],
        value_vars=bat_columns,
        var_name="Batterie",
        value_name="Batterie Durchsatz",
    )
    df["Batterie"] = df["Batterie"].str[9:]
    return df
0 commit comments