Commit 8b30ea6

Add refactored data processing module to reporting
Signed-off-by: Flora <[email protected]>
1 parent 7dc2f2b commit 8b30ea6

2 files changed

+362
-0
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
1313
- Added consistent logger setup across all modules for structured logging and improved observability. Example notebooks updated to demonstrate logger usage.
1414
- The signature of `MicrogridConfig.load_config()` has been changed to accept a path, a list of paths, or a directory containing the config files.
1515
- `MicrogridData` class needs to be initialized with a `MicrogridConfig` object instead of a path to config file(s).
16+
- Added a `data_processing` module to `reporting` that provides a set of functions for processing, enriching, and analyzing time-series energy data from microgrid systems.
1617

1718
## Bug Fixes
1819

Lines changed: 361 additions & 0 deletions
@@ -0,0 +1,361 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Data processing utilities for microgrid energy reporting.
5+
6+
This module provides a set of functions for processing, enriching, and analyzing
7+
time-series energy data from microgrid systems. It focuses on preparing data for
8+
PV (photovoltaic), battery, and grid energy flows, transforming it into a consistent
9+
structure for visualization, reporting, and analysis.
10+
11+
Features
12+
--------
13+
- Enriches raw energy data with derived columns such as:
14+
- PV production, self-consumption, feed-in, and battery charging.
15+
- Net grid import and PV self-consumption share.
16+
- Handles time zone localization and conversion to Europe/Berlin.
17+
- Dynamically renames columns to more descriptive names, including
18+
mapping component IDs (e.g., "PV #1", "Batterie #2").
19+
- Provides summary energy mix breakdowns (PV vs grid) in kWh, % share, and average kW.
20+
- Prepares tailored DataFrames for PV and battery analysis, supporting flexible
21+
filtering by component.
22+
23+
Main Functions
24+
--------------
25+
- `transform_energy_dataframe(df, component_types, mcfg)`:
26+
Transforms a raw DataFrame with energy metrics into an enriched,
27+
user-friendly format, adding PV, battery, and grid metrics.
28+
29+
- `compute_power_df(main_df, resolution)`:
30+
Computes total energy drawn from PV and grid sources over the given resolution,
31+
returning a summary DataFrame with kWh, percentage, and average kW.
32+
33+
- `print_pv_sums(main_df, resolution)`:
34+
Prints total PV feed-in sums for each individual PV component
35+
in a localized numeric format.
36+
37+
- `create_pv_analysis_df(main_df, pv_filter, pvgrid_filter, pv_grid_filter_options)`:
38+
Generates a DataFrame for PV analysis based on selected PV components
39+
and whether to analyze PV alone, grid alone, or a grid/PV split.
40+
41+
- `create_battery_analysis_df(main_df, bat_filter)`:
42+
Creates a DataFrame for analyzing battery throughput, reshaping
43+
it to long format for multi-battery analysis.
44+
45+
Usage
46+
-----
47+
Typical usage involves:
48+
1. Loading a raw DataFrame with time-indexed energy measurements.
49+
2. Calling `transform_energy_dataframe` to process and enrich it.
50+
3. Using the resulting DataFrames to generate summaries,
51+
for example with `compute_power_df`, `create_pv_analysis_df`, or
52+
`create_battery_analysis_df` for visualization.
53+
"""
54+
55+
from typing import Any, Dict, Iterable, List, Tuple, Union
56+
57+
import pandas as pd
58+
59+
# Constants
60+
TZ_NAME = "Europe/Berlin"
61+
COLUMN_TIMESTAMP = "timestamp"
62+
COLUMN_TIMESTAMP_NAMED = "Zeitpunkt"
63+
COLUMN_GRID = "grid"
64+
COLUMN_GRID_NAMED = "Netzanschluss"
65+
COLUMN_NET_IMPORT = "Netzbezug"
66+
COLUMN_CONSUMPTION = "consumption"
67+
COLUMN_CONSUMPTION_NAMED = "Brutto Gesamtverbrauch"
68+
COLUMN_BATTERY = "battery"
69+
COLUMN_BATTERY_POS = "battery_pos"
70+
COLUMN_BATTERY_NAMED = "Batterie Durchsatz"
71+
COLUMN_PV = "pv"
72+
COLUMN_PV_PROD = "PV Produktion"
73+
COLUMN_PV_NEG = "pv_neg"
74+
COLUMN_PV_EXCESS = "pv_excess"
75+
COLUMN_PV_FEEDIN = "PV Einspeisung"
76+
COLUMN_PV_SELF = "PV Eigenverbrauch"
77+
COLUMN_PV_BAT = "pv_bat"
78+
COLUMN_PV_IN_BAT = "PV in Batterie"
79+
COLUMN_PV_SHARE = "PV Eigenverbrauchsanteil"
80+
COLUMN_PV_THROUGHPUT = "PV Durchsatz"
81+
82+
83+
def transform_energy_dataframe(
84+
df: pd.DataFrame,
85+
component_types: List[str],
86+
mcfg: Any,
87+
) -> Tuple[pd.DataFrame, pd.DataFrame]:
88+
"""Transform and enrich energy dataframe.
89+
90+
This function processes a raw DataFrame containing energy metrics,
91+
adding derived columns for PV production, battery throughput, and grid metrics.
92+
93+
Args:
94+
df: Raw DataFrame with energy metrics, expected to have a datetime index.
95+
component_types: List of component types present in the DataFrame (e.g., ["pv", "battery"]).
96+
mcfg: Microgrid configuration object providing component type IDs and other metadata.
97+
98+
Returns:
99+
A tuple containing:
100+
- `main_df`: A DataFrame with main columns for visualization and reporting.
101+
- `df_renamed`: A fully enriched DataFrame.
102+
"""
103+
# Move the datetime index into a regular column (expected to be named "timestamp")
104+
df = df.reset_index()
105+
106+
# Enrich with PV-related columns
107+
if "pv" in component_types:
108+
df[COLUMN_PV_PROD] = -df.get(COLUMN_PV_NEG, 0)
109+
df[COLUMN_PV_EXCESS] = (df[COLUMN_PV_PROD] - df[COLUMN_CONSUMPTION]).clip(
110+
lower=0
111+
)
112+
113+
if "battery" in component_types:
114+
df[COLUMN_PV_IN_BAT] = df[[COLUMN_PV_EXCESS, COLUMN_BATTERY_POS]].min(
115+
axis=1
116+
)
117+
else:
118+
df[COLUMN_PV_IN_BAT] = 0
119+
120+
df[COLUMN_PV_FEEDIN] = df[COLUMN_PV_EXCESS] - df[COLUMN_PV_IN_BAT]
121+
df[COLUMN_PV_SELF] = (df[COLUMN_PV_PROD] - df[COLUMN_PV_EXCESS]).clip(lower=0)
122+
df[COLUMN_PV_SHARE] = df[COLUMN_PV_SELF] / df[COLUMN_CONSUMPTION].replace(
123+
0, pd.NA
124+
)
125+
126+
# Convert timestamp to Berlin time
127+
if df[COLUMN_TIMESTAMP].dt.tz is None:
128+
df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_localize("UTC")
129+
df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_convert(TZ_NAME)
130+
131+
# Basic renaming
132+
rename_map: Dict[str, str] = {
133+
COLUMN_TIMESTAMP: COLUMN_TIMESTAMP_NAMED,
134+
COLUMN_GRID: COLUMN_GRID_NAMED,
135+
COLUMN_CONSUMPTION: COLUMN_CONSUMPTION_NAMED,
136+
}
137+
138+
if "battery" in component_types:
139+
rename_map[COLUMN_BATTERY] = COLUMN_BATTERY_NAMED
140+
141+
if "pv" in component_types:
142+
rename_map.update(
143+
{
144+
"pv": COLUMN_PV_THROUGHPUT,
145+
COLUMN_PV_PROD: COLUMN_PV_PROD,
146+
COLUMN_PV_SELF: COLUMN_PV_SELF,
147+
COLUMN_PV_FEEDIN: COLUMN_PV_FEEDIN,
148+
COLUMN_PV_SHARE: COLUMN_PV_SHARE,
149+
}
150+
)
151+
if "battery" in component_types:
152+
rename_map[COLUMN_PV_BAT] = COLUMN_PV_IN_BAT
153+
154+
# Rename individual component IDs
155+
single_comp = [col for col in df.columns if col.isdigit()]
156+
157+
if "battery" in component_types:
158+
battery_ids = {
159+
str(i) for i in mcfg.component_type_ids(component_type="battery")
160+
}
161+
rename_map.update(
162+
{col: f"Batterie #{col}" for col in single_comp if col in battery_ids}
163+
)
164+
165+
if "pv" in component_types:
166+
pv_ids = {str(i) for i in mcfg.component_type_ids(component_type="pv")}
167+
rename_map.update({col: f"PV #{col}" for col in single_comp if col in pv_ids})
168+
169+
df_renamed = df.rename(columns=rename_map)
170+
171+
# Add derived net import column
172+
df_renamed[COLUMN_NET_IMPORT] = df_renamed[COLUMN_GRID_NAMED].clip(lower=0)
173+
174+
# Select main columns for compact display
175+
def _get_main_columns(
176+
columns: Iterable[str], component_types: List[str]
177+
) -> List[str]:
178+
base = {
179+
COLUMN_TIMESTAMP_NAMED,
180+
COLUMN_GRID_NAMED,
181+
COLUMN_NET_IMPORT,
182+
COLUMN_CONSUMPTION_NAMED,
183+
}
184+
185+
if "battery" in component_types:
186+
base.add(COLUMN_BATTERY_NAMED)
187+
188+
if "pv" in component_types:
189+
base.update(
190+
{
191+
COLUMN_PV_THROUGHPUT,
192+
COLUMN_PV_PROD,
193+
COLUMN_PV_SELF,
194+
COLUMN_PV_FEEDIN,
195+
}
196+
)
197+
if "battery" in component_types:
198+
base.update({COLUMN_PV_IN_BAT, COLUMN_PV_SHARE})
199+
200+
# Add individual component columns like "PV #1", "Batterie #3", etc.
201+
base.update({col for col in columns if "#" in col})
202+
203+
return [col for col in columns if col in base]
204+
205+
main_df = df_renamed[_get_main_columns(df_renamed.columns, component_types)]
206+
207+
return main_df, df_renamed
208+
209+
210+
def compute_power_df(
211+
main_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
212+
) -> pd.DataFrame:
213+
"""Compute energy mix (PV vs grid) and return a summary power DataFrame.
214+
215+
Args:
216+
main_df: DataFrame with energy data, including 'Netzbezug'
217+
and optionally 'PV Eigenverbrauch'.
218+
resolution: Time resolution of each row in the DataFrame (e.g., "15min").
219+
220+
Returns:
221+
A DataFrame summarizing the energy source mix in kWh, %, and average kW.
222+
"""
223+
resolution = pd.to_timedelta(resolution)
224+
hours = resolution.total_seconds() / 3600
225+
226+
# Calculate energy from grid
227+
grid_kwh = round(main_df[COLUMN_NET_IMPORT].sum() * hours, 2)
228+
229+
if COLUMN_PV_SELF in main_df.columns:
230+
# Calculate energy from PV
231+
pv_self_kwh = round(main_df[COLUMN_PV_SELF].sum() * hours, 2)
232+
total_kwh = pv_self_kwh + grid_kwh
233+
234+
energy_kwh = [pv_self_kwh, grid_kwh]
235+
energy_labels = ["PV", "Netz"]
236+
237+
return pd.DataFrame(
238+
{
239+
"Energiebezug": energy_labels,
240+
"Energie [kWh]": energy_kwh,
241+
"Energie %": [round(e / total_kwh * 100, 2) for e in energy_kwh],
242+
"Energie [kW]": [round(e / hours, 2) for e in energy_kwh],
243+
}
244+
)
245+
246+
# Only grid consumption available
247+
return pd.DataFrame(
248+
{
249+
"Energiebezug": ["Netz"],
250+
"Energie [kWh]": [grid_kwh],
251+
"Energie %": [100.0],
252+
"Energie [kW]": [round(grid_kwh / hours, 2)],
253+
}
254+
)
255+
256+
257+
def print_pv_sums(main_df: pd.DataFrame, resolution: pd.Timedelta) -> None:
258+
"""Print formatted sums for each PV column.
259+
260+
Args:
261+
main_df: DataFrame containing PV columns with energy data.
262+
resolution: Time resolution of each row in the DataFrame (e.g., "15min").
263+
"""
264+
pv_columns = [col for col in main_df.columns.tolist() if "PV #" in col]
265+
266+
for pv in pv_columns:
267+
pv_sum = round(main_df[pv].sum() * pd.to_timedelta(resolution).total_seconds() / 3600 * -1, 2)
268+
formatted_sum = (
269+
f"{pv_sum:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
270+
)
271+
print(f"{pv:<7}: {formatted_sum} kWh")
272+
273+
274+
def create_pv_analysis_df(
275+
main_df: pd.DataFrame,
276+
pv_filter: List[str],
277+
pvgrid_filter: str,
278+
pv_grid_filter_options: List[str],
279+
) -> pd.DataFrame:
280+
"""Create a DataFrame for PV analysis based on selected filters.
281+
282+
Args:
283+
main_df: DataFrame containing PV and grid data.
284+
pv_filter: List of PV components to include (e.g., ["#1", "#2"] or ["Alle"]).
285+
pvgrid_filter: Filter option for PV and grid analysis (e.g., "PV", "Grid", "PV + Grid").
286+
pv_grid_filter_options: List of available filter options for PV and grid.
287+
Returns:
288+
A DataFrame with PV feed-in data, reshaped for analysis.
289+
"""
290+
# Case 1: Only PV
291+
if pvgrid_filter == pv_grid_filter_options[1]:
292+
pv_columns = (
293+
[col for col in main_df.columns if "PV #" in col]
294+
if "Alle" in pv_filter
295+
else [f"PV {pv}" for pv in pv_filter]
296+
)
297+
df = main_df[[COLUMN_TIMESTAMP_NAMED] + pv_columns].copy()
298+
df = df.melt(
299+
id_vars=[COLUMN_TIMESTAMP_NAMED],
300+
value_vars=pv_columns,
301+
var_name="PV",
302+
value_name=COLUMN_PV_FEEDIN,
303+
)
304+
df[COLUMN_PV_FEEDIN] *= -1
305+
df["PV"] = df["PV"].str[3:]
306+
307+
# Case 2: Only Grid
308+
elif pvgrid_filter == pv_grid_filter_options[2]:
309+
df = main_df[[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED]].copy()
310+
df["PV"] = "#"
311+
312+
# Case 3: Grid + PV split
313+
else:
314+
pv_columns = (
315+
[col for col in main_df.columns if "PV #" in col]
316+
if "Alle" in pv_filter
317+
else [f"PV {pv}" for pv in pv_filter]
318+
)
319+
df = main_df[[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED] + pv_columns].copy()
320+
df = df.melt(
321+
id_vars=[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED],
322+
value_vars=pv_columns,
323+
var_name="PV",
324+
value_name=COLUMN_PV_FEEDIN,
325+
)
326+
df[COLUMN_GRID_NAMED] /= len(pv_columns)
327+
df[COLUMN_PV_FEEDIN] *= -1
328+
df["PV"] = df["PV"].str[3:]
329+
330+
return df
331+
332+
333+
def create_battery_analysis_df(
334+
main_df: pd.DataFrame, bat_filter: List[str]
335+
) -> pd.DataFrame:
336+
"""Create a DataFrame for battery analysis based on selected filters.
337+
338+
Args:
339+
main_df: DataFrame containing battery data.
340+
bat_filter: List of battery components to include (e.g., ["#1", "#2"] or ["Alle"]).
341+
Returns:
342+
A DataFrame with battery throughput data, reshaped for analysis.
343+
"""
344+
bat_columns = (
345+
[col for col in main_df.columns if "Batterie #" in col]
346+
if "Alle" in bat_filter
347+
else [f"Batterie {i}" for i in bat_filter]
348+
)
349+
350+
df = main_df[bat_columns].copy()
351+
df[COLUMN_TIMESTAMP_NAMED] = main_df[COLUMN_TIMESTAMP_NAMED]
352+
353+
df = df.melt(
354+
id_vars=[COLUMN_TIMESTAMP_NAMED],
355+
value_vars=bat_columns,
356+
var_name="Batterie",
357+
value_name=COLUMN_BATTERY_NAMED,
358+
)
359+
df["Batterie"] = df["Batterie"].str[9:]
360+
361+
return df
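
Review note: the PV bookkeeping in `transform_energy_dataframe` and the energy and formatting steps in `compute_power_df` and `print_pv_sums` are easier to check with a single-sample example. The sketch below mirrors that arithmetic in plain Python, without pandas. The names `PvSplit`, `split_pv`, `energy_kwh`, and `format_german` are hypothetical helpers written for this illustration and are not part of the commit. It also assumes that `pv_neg` carries PV power as a negative value and that `battery_pos` carries the positive (charging) part of the battery power, which is how the module appears to use them.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Sequence


@dataclass
class PvSplit:
    """Derived PV values for one sample, in kW (hypothetical helper type)."""

    production: float
    excess: float
    into_battery: float
    feed_in: float
    self_consumption: float
    share: Optional[float]  # None when consumption is zero


def split_pv(pv_neg: float, consumption: float, battery_pos: float = 0.0) -> PvSplit:
    """Mirror the per-row PV arithmetic of the module for a single sample."""
    production = -pv_neg  # assumption: PV power is reported as a negative value
    excess = max(production - consumption, 0.0)  # PV left after covering the load
    into_battery = min(excess, battery_pos)  # excess absorbed by battery charging
    feed_in = excess - into_battery  # remainder is exported to the grid
    self_consumption = max(production - excess, 0.0)
    share = self_consumption / consumption if consumption else None
    return PvSplit(production, excess, into_battery, feed_in, self_consumption, share)


def energy_kwh(power_kw: Sequence[float], resolution: timedelta) -> float:
    """Convert equally spaced power samples (kW) into energy (kWh)."""
    hours = resolution.total_seconds() / 3600
    return round(sum(power_kw) * hours, 2)


def format_german(value: float) -> str:
    """Format with '.' as thousands separator and ',' as decimal mark."""
    return f"{value:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")


if __name__ == "__main__":
    # 8 kW of PV, 5 kW of load, battery charging at 2 kW.
    print(split_pv(pv_neg=-8.0, consumption=5.0, battery_pos=2.0))
    # Two 15-minute samples of 5 kW and 3 kW.
    print(energy_kwh([5.0, 3.0], timedelta(minutes=15)))
    print(format_german(1234567.891))
```

In the first call the 3 kW of excess splits into 2 kW of battery charging and 1 kW of feed-in, while the load is fully covered by PV. The energy helper converts the resolution to hours before multiplying, which is the same conversion `compute_power_df` performs.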
