Skip to content

Commit 8687797

Browse files
feat: add column schema mapping
Signed-off-by: Mohammad Tayyab <[email protected]>
1 parent afcddbf commit 8687797

File tree

4 files changed

+349
-0
lines changed

4 files changed

+349
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ dependencies = [
3939
"kaleido >= 0.2.1, < 1.2.0",
4040
"frequenz-client-reporting >= 0.19.0, < 0.20.0",
4141
"frequenz-client-weather >= 0.2.3, < 0.3.0",
42+
"types-pyyaml>=6.0.12.20250915",
4243
]
4344
dynamic = ["version"]
4445

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
version: 1
2+
3+
time:
4+
tz_name: "Europe/Berlin"
5+
assume_tz: "UTC"
6+
7+
columns:
8+
timestamp:
9+
raw: "timestamp"
10+
display:
11+
en: "Timestamp"
12+
de: "Zeitpunkt"
13+
description:
14+
en: "The recorded date and time of the measurement for the components."
15+
de: "Das erfasste Datum und die Uhrzeit der Messung der Komponenten."
16+
17+
grid_load:
18+
raw: "grid"
19+
display:
20+
en: "Grid Import"
21+
de: "Netzbezug"
22+
description:
23+
en: "Electricity imported from the grid."
24+
de: "Aus dem öffentlichen Netz bezogener Strom."
25+
26+
mid_consumption:
27+
raw: "consumption" # Total consumption from all the sources (grid, pv, chp, battery)
28+
display:
29+
en: "MID Consumption"
30+
de: "MID Gesamtverbrauch"
31+
description:
32+
en: "Total electricity consumption from all available sources (grid, PV, CHP, battery)."
33+
de: "Gesamter Stromverbrauch aus allen verfügbaren Quellen (Netz, PV, BHKW, Batterie)."
34+
implementation: "reporting_metrics.py::determine_consumption"
35+
36+
battery_throughput:
37+
raw: "battery"
38+
display:
39+
en: "Battery Throughput"
40+
de: "Batterie Durchsatz"
41+
description:
42+
en: "Total amount of energy charged into and discharged from the battery."
43+
de: "Gesamtmenge der Energie, die in die Batterie geladen und aus ihr entladen wurde."
44+
45+
battery_pos:
46+
raw: "battery_pos"
47+
display:
48+
en: "Battery Throughput (positive)"
49+
de: "Batterie Durchsatz (positiv)"
50+
description:
51+
en: "Energy output from the battery (discharging)."
52+
de: "Energieabgabe der Batterie (Entladung)."
53+
54+
pv_prod:
55+
raw: "pv_prod"
56+
display:
57+
en: "PV Production"
58+
de: "PV Produktion"
59+
description:
60+
en: "Total electricity generated by the photovoltaic system."
61+
de: "Gesamte durch die Photovoltaikanlage erzeugte Strommenge."
62+
implementation: "reporting_metrics.py::asset_prod"
63+
64+
pv_neg:
65+
raw: "pv_neg"
66+
display:
67+
en: "PV (negative)"
68+
de: "PV (negativ)"
69+
description:
70+
en: "Negative PV values representing reverse flow or measurement corrections."
71+
de: "Negative PV-Werte, die Rückflüsse oder Messkorrekturen darstellen."
72+
73+
pv_excess:
74+
raw: "pv_excess"
75+
display:
76+
en: "PV Excess"
77+
de: "PV Überschuss"
78+
description:
79+
en: "Amount of PV energy produced that exceeds on-site consumption."
80+
de: "PV-Energie, die die Eigenverbrauchsmenge übersteigt."
81+
implementation: "reporting_metrics.py::prod_excess"
82+
83+
pv_in_bat:
84+
raw: "pv_bat"
85+
display:
86+
en: "PV in Battery"
87+
de: "PV in Batterie"
88+
description:
89+
en: "PV energy used to charge the battery."
90+
de: "PV-Energie, die zum Laden der Batterie verwendet wird."
91+
implementation: "reporting_metrics.py::prod_excess_in_bat"
92+
93+
pv_feedin:
94+
raw: "pv_feedin"
95+
display:
96+
en: "PV Feed-in"
97+
de: "PV Einspeisung"
98+
description:
99+
en: "PV energy exported to the public grid."
100+
de: "PV-Energie, die in das öffentliche Netz eingespeist wird."
101+
implementation: "reporting_metrics.py::grid_feed_in"
102+
103+
pv_self:
104+
raw: "pv_self"
105+
display:
106+
en: "PV Self-Consumption"
107+
de: "PV Eigenverbrauch"
108+
description:
109+
en: "Amount of PV energy consumed directly on-site."
110+
de: "Menge der PV-Energie, die direkt vor Ort verbraucht wird."
111+
implementation: "reporting_metrics.py::prod_self_consumption"
112+
113+
pv_share:
114+
raw: "pv_share"
115+
display:
116+
en: "PV Self-Consumption Share"
117+
de: "PV Eigenverbrauchsanteil"
118+
description:
119+
en: "Share of total consumption covered by PV production."
120+
de: "Anteil des Gesamtverbrauchs, der durch PV-Produktion gedeckt wird."
121+
implementation: "reporting_metrics.py::prod_self_share"
122+
123+
chp_load:
124+
raw: "chp"
125+
display:
126+
en: "CHP Output"
127+
de: "BHKW-Erzeugung"
128+
description:
129+
en: "Electrical output from the combined heat and power (CHP) unit."
130+
de: "Elektrische Erzeugung des Blockheizkraftwerks (BHKW)."
131+
132+
grid_peak:
133+
raw: "grid_peak"
134+
display:
135+
en: "Grid Peak"
136+
de: "Netzspitze"
137+
description:
138+
en: "The highest grid import power recorded within the reporting period."
139+
de: "Die höchste im Berichtszeitraum gemessene Netzbezugsleistung."
140+
141+
grid_peak_date:
142+
raw: "grid_peak_date"
143+
display:
144+
en: "Grid Peak Date"
145+
de: "Datum der Netzspitze"
146+
description:
147+
en: "The date and time when the maximum grid import occurred."
148+
de: "Das Datum und die Uhrzeit, zu denen der maximale Netzbezug aufgetreten ist."
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Utilities for energy reporting."""
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Column mapping utilities for energy reporting.
5+
6+
Keeps reporting notebooks insulated from upstream schema churn
7+
by loading the canonical schema from
8+
`src/frequenz/lib/notebooks/reporting/schema_mapping.yaml`. The class translates
9+
raw headers to canonical names, resolves locale-aware display labels.
10+
Raw names are the column labels emitted directly by the reporting API, canonical
11+
names are the identifiers used throughout this codebase, and the display names
12+
(`en`/`de`) expose English and German labels rendered in the notebooks. The
13+
mapper is designed to work hand-in-hand with the schema_mapping.yaml file so that
14+
every canonical column carries consistent units and descriptions.
15+
16+
Examples:
17+
>>> from frequenz.lib.notebooks.reporting.utils import ColumnMapper
18+
>>> mapper = ColumnMapper.from_yaml(
19+
... "src/frequenz/lib/notebooks/reporting/schema_mapping.yaml",
20+
... locale="de",
21+
... )
22+
>>> display_df = mapper.to_display(mapper.to_canonical(raw_df))
23+
"""
24+
25+
from __future__ import annotations
26+
27+
from dataclasses import dataclass, field, replace
28+
from typing import Iterable, Mapping
29+
30+
import pandas as pd
31+
import yaml
32+
33+
34+
@dataclass(frozen=True)
35+
class ColumnMapper: # pylint: disable=too-many-instance-attributes
36+
"""Column schema with locale-aware display labels."""
37+
38+
version: int = field(
39+
metadata={"doc": "Schema version extracted from the YAML definition."}
40+
)
41+
tz_name: str = field(
42+
metadata={
43+
"doc": "Timezone name to apply when localizing timestamps in reporting notebooks.",
44+
}
45+
)
46+
assume_tz: str = field(
47+
metadata={
48+
"doc": "Timezone assumed for raw timestamps before conversion to tz_name.",
49+
}
50+
)
51+
canonical_to_raw: Mapping[str, str] = field(
52+
metadata={
53+
"doc": (
54+
"Mapping from canonical column identifiers to raw headers "
55+
"from the reporting API."
56+
),
57+
}
58+
)
59+
raw_to_canonical: Mapping[str, str] = field(
60+
metadata={
61+
"doc": "Reverse lookup mapping raw API headers back to canonical identifiers.",
62+
}
63+
)
64+
_labels_all: Mapping[str, Mapping[str, str]] = field(
65+
metadata={
66+
"doc": "Locale-specific display labels keyed by canonical column name.",
67+
}
68+
)
69+
locale: str = field(
70+
default="de",
71+
metadata={"doc": "Preferred locale for display labels when renaming columns."},
72+
)
73+
fallback_locale: str = field(
74+
default="en",
75+
metadata={
76+
"doc": "Fallback locale to use when the preferred locale is unavailable in the schema.",
77+
},
78+
)
79+
80+
# ---------- Construction ----------
81+
@classmethod
82+
def from_yaml( # pylint: disable=too-many-locals
83+
cls,
84+
path: str,
85+
*,
86+
locale: str = "de",
87+
fallback_locale: str = "en",
88+
required: Iterable[str] | None = None,
89+
) -> "ColumnMapper":
90+
"""
91+
Create a ColumnMapper from a YAML configuration file.
92+
93+
Args:
94+
path: Path to the YAML file containing the column mapping definition.
95+
locale: Preferred display locale (default: "de").
96+
fallback_locale: Fallback locale if the preferred one is missing
97+
(default: "en").
98+
required: Optional list of required canonical column names. Raises
99+
ValueError if any are missing.
100+
101+
Returns:
102+
A ColumnMapper instance built from the YAML configuration.
103+
104+
Raises:
105+
ValueError: If the YAML is missing required fields or contains invalid mappings.
106+
"""
107+
with open(path, "r", encoding="utf-8") as f:
108+
cfg = yaml.safe_load(f)
109+
110+
cols = cfg.get("columns") or {}
111+
if not isinstance(cols, dict) or not cols:
112+
raise ValueError("YAML 'columns' is missing or empty.")
113+
114+
c2r: dict[str, str] = {}
115+
labels_all: dict[str, dict[str, str]] = {}
116+
117+
for canonical, spec in cols.items():
118+
if not isinstance(spec, dict):
119+
raise ValueError(f"Invalid spec for '{canonical}' (must be mapping).")
120+
raw = spec.get("raw")
121+
if not raw:
122+
raise ValueError(f"'raw' missing for canonical column '{canonical}'.")
123+
c2r[canonical] = raw
124+
125+
disp = spec.get("display") or {}
126+
if not isinstance(disp, dict):
127+
raise ValueError(f"'display' for '{canonical}' must be a mapping.")
128+
labels_all[canonical] = {str(k): str(v) for k, v in disp.items()}
129+
130+
# Build reverse map and check collisions
131+
r2c: dict[str, str] = {}
132+
for c, r in c2r.items():
133+
if r in r2c and r2c[r] != c:
134+
raise ValueError(f"Raw column '{r}' maps to both '{r2c[r]}' and '{c}'.")
135+
r2c[r] = c
136+
137+
if required:
138+
missing = set(required) - set(c2r.keys())
139+
if missing:
140+
raise ValueError(
141+
f"Missing required canonical columns: {sorted(missing)}"
142+
)
143+
144+
time_cfg = cfg.get("time") or {}
145+
return cls(
146+
version=int(cfg.get("version", 0)),
147+
tz_name=str(time_cfg.get("tz_name", "UTC")),
148+
assume_tz=str(time_cfg.get("assume_tz", "UTC")),
149+
canonical_to_raw=c2r,
150+
raw_to_canonical=r2c,
151+
_labels_all=labels_all,
152+
locale=locale,
153+
fallback_locale=fallback_locale,
154+
)
155+
156+
# ---------- Properties ----------
157+
@property
158+
def canonical_to_display(self) -> Mapping[str, str]:
159+
"""Resolved display labels for the current locale (with fallback)."""
160+
return {
161+
c: (
162+
self._labels_all.get(c, {}).get(self.locale)
163+
or self._labels_all.get(c, {}).get(self.fallback_locale)
164+
or c
165+
)
166+
for c in self.canonical_to_raw
167+
}
168+
169+
@property
170+
def canonicals(self) -> Iterable[str]:
171+
"""Return the canonical column names defined in this mapper."""
172+
return self.canonical_to_raw.keys()
173+
174+
# ---------- Operations on DataFrames ----------
175+
def to_canonical(self, df: pd.DataFrame) -> pd.DataFrame:
176+
"""Rename incoming raw headers to canonical headers (normalize once)."""
177+
return df.rename(columns=self.raw_to_canonical)
178+
179+
def to_raw(self, df: pd.DataFrame) -> pd.DataFrame:
180+
"""Rename canonical headers back to raw."""
181+
return df.rename(columns=self.canonical_to_raw)
182+
183+
def to_display(self, df: pd.DataFrame) -> pd.DataFrame:
184+
"""Rename canonical headers to localized display labels."""
185+
return df.rename(columns=self.canonical_to_display)
186+
187+
# ---------- Locale switching ----------
188+
def with_locale(
189+
self, locale: str, fallback_locale: str | None = None
190+
) -> "ColumnMapper":
191+
"""Create a copy with a different display locale (no re-read of YAML)."""
192+
return replace(
193+
self,
194+
locale=locale,
195+
fallback_locale=(fallback_locale or self.fallback_locale),
196+
)

0 commit comments

Comments
 (0)