Skip to content

Commit 968ad19

Browse files
feat: add a column mapper class
Signed-off-by: Mohammad Tayyab <[email protected]>
1 parent ecf0fe6 commit 968ad19

File tree

3 files changed

+255
-1
lines changed

3 files changed

+255
-1
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
## New Features
1212

1313
<!-- Here goes the main new features and examples or instructions on how to use them -->
14+
* Add flag to indicate whether PV is curtailable.
15+
* Add asset optimization reporting package with data fetcher and visualization module.
16+
* Add a column-mapping YAML file to maintain the canonical, English, and German column names used in reporting notebooks.
1417

1518
## Bug Fixes
1619

1720
* Replaces multiple duplicated plot functions with a single reusable one.
1821
* Handle empty weather/reporting dataframes gracefully to avoid transformation errors. The "Solar Maintenance" notebook is updated accordingly.
1922
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
2023

21-
* Add reusable, modular data processing functions
24+
* Add reusable, modular data processing functions for reporting notebooks.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Column mapping schema for the reporting notebooks.
#
# Each entry under `columns` is keyed by a canonical column name and carries:
#   raw:     the column header as it appears in the incoming data
#   display: human-readable labels keyed by locale (`en`, `de`)
version: 1

# NOTE(review): presumably `tz_name` is the timezone used for presentation and
# `assume_tz` the timezone assumed for naive timestamps - confirm against the
# code that consumes these values.
time:
  tz_name: "Europe/Berlin"
  assume_tz: "UTC"

columns:
  timestamp:
    raw: "timestamp"
    display:
      en: "Timestamp"
      de: "Zeitpunkt"

  # grid_consumption:
  # raw: "grid"
  # display:
  # en: "Grid Connection"
  # de: "Netzanschluss"

  net_import:
    raw: "Netzbezug"
    display:
      en: "Grid Import"
      de: "Netzbezug"

  net_consumption:
    raw: "consumption"
    display:
      en: "Net Consumption"
      de: "Netto Gesamtverbrauch"

  battery_throughput:
    raw: "battery"
    display:
      en: "Battery Throughput"
      de: "Batterie Durchsatz"

  battery_pos:
    raw: "battery_pos"
    display:
      en: "Battery Throughput (pos.)"
      de: "Batterie Durchsatz (positiv)"

  pv_prod:
    raw: "PV Produktion"
    display:
      en: "PV Production"
      de: "PV Produktion"

  pv_neg:
    raw: "pv_neg"
    display:
      en: "PV (neg.)"
      de: "PV (neg.)"

  pv_excess:
    raw: "pv_excess"
    display:
      en: "PV Excess"
      de: "PV Überschuss"

  pv_feedin:
    raw: "PV Einspeisung"
    display:
      en: "PV Feed-in"
      de: "PV Einspeisung"

  pv_self:
    raw: "PV Eigenverbrauch"
    display:
      en: "PV Self-Consumption"
      de: "PV Eigenverbrauch"

  pv_in_bat:
    raw: "pv_bat"
    display:
      en: "PV in Battery"
      de: "PV in Batterie"

  pv_share:
    raw: "PV Eigenverbrauchsanteil"
    display:
      en: "PV Self-Consumption Share"
      de: "PV Eigenverbrauchsanteil"

  pv_throughput:
    raw: "pv"
    display:
      en: "PV Throughput"
      de: "PV Durchsatz"
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Column mapping utilities for energy reporting.
5+
6+
Provides the `ColumnMapper` dataclass to manage renaming between raw,
7+
canonical, and localized display column names. Supports loading schema
8+
from YAML, locale-aware label resolution, and renaming of DataFrames.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
from dataclasses import dataclass, replace
14+
from typing import Dict, Iterable, Literal, Mapping, Optional
15+
16+
import pandas as pd
17+
import yaml
18+
19+
20+
@dataclass(frozen=True)
class ColumnMapper:  # pylint: disable=too-many-instance-attributes
    """Column schema with locale-aware display labels.

    Holds the two-way mapping between raw and canonical column names together
    with the per-locale display labels of every canonical column.

    Attributes:
        version: Schema version taken from the configuration.
        tz_name: Value of `time.tz_name` in the configuration.
        assume_tz: Value of `time.assume_tz` in the configuration.
        canonical_to_raw: Canonical column name -> raw column name.
        raw_to_canonical: Raw column name -> canonical column name.
        locale: Preferred locale for display labels.
        fallback_locale: Locale used when `locale` has no label for a column.
    """

    version: int
    tz_name: str
    assume_tz: str
    canonical_to_raw: Mapping[str, str]
    raw_to_canonical: Mapping[str, str]
    # canonical name -> {locale -> label}
    _labels_all: Mapping[str, Mapping[str, str]]
    locale: str = "de"
    fallback_locale: str = "en"

    # ---------- Construction ----------
    @classmethod
    def from_yaml(
        cls,
        path: str,
        *,
        locale: str = "de",
        fallback_locale: str = "en",
        required: Optional[Iterable[str]] = None,
    ) -> "ColumnMapper":
        """
        Create a ColumnMapper from a YAML configuration file.

        Args:
            path: Path to the YAML file containing the column mapping definition.
            locale: Preferred display locale (default: "de").
            fallback_locale: Fallback locale if the preferred one is missing
                (default: "en").
            required: Optional list of required canonical column names. Raises
                ValueError if any are missing.

        Returns:
            A ColumnMapper instance built from the YAML configuration.

        Raises:
            ValueError: If the YAML is empty, is not a mapping, is missing
                required fields or contains invalid mappings.
        """
        with open(path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)

        # An empty document loads as None and a scalar/list document as a
        # non-mapping; report both as a configuration error instead of letting
        # an AttributeError escape from `cfg.get`.
        if not isinstance(cfg, dict):
            raise ValueError("YAML document is empty or not a mapping.")

        return cls.from_dict(
            cfg,
            locale=locale,
            fallback_locale=fallback_locale,
            required=required,
        )

    @classmethod
    def from_dict(  # pylint: disable=too-many-locals
        cls,
        cfg: Mapping[str, object],
        *,
        locale: str = "de",
        fallback_locale: str = "en",
        required: Optional[Iterable[str]] = None,
    ) -> "ColumnMapper":
        """
        Create a ColumnMapper from an already parsed configuration mapping.

        Args:
            cfg: Parsed configuration with a `columns` mapping and optional
                `version` and `time` entries.
            locale: Preferred display locale (default: "de").
            fallback_locale: Fallback locale if the preferred one is missing
                (default: "en").
            required: Optional list of required canonical column names. Raises
                ValueError if any are missing.

        Returns:
            A ColumnMapper instance built from the configuration.

        Raises:
            ValueError: If the configuration is not a mapping, is missing
                required fields or contains invalid mappings.
        """
        if not isinstance(cfg, Mapping):
            raise ValueError("Configuration must be a mapping.")

        cols = cfg.get("columns") or {}
        if not isinstance(cols, dict) or not cols:
            raise ValueError("YAML 'columns' is missing or empty.")

        c2r: Dict[str, str] = {}
        labels_all: Dict[str, Dict[str, str]] = {}

        for canonical, spec in cols.items():
            if not isinstance(spec, dict):
                raise ValueError(f"Invalid spec for '{canonical}' (must be mapping).")
            raw = spec.get("raw")
            if not raw:
                raise ValueError(f"'raw' missing for canonical column '{canonical}'.")
            c2r[canonical] = raw

            disp = spec.get("display") or {}
            if not isinstance(disp, dict):
                raise ValueError(f"'display' for '{canonical}' must be a mapping.")
            # A null label must count as "no label": str(None) would yield the
            # truthy text "None" and defeat the locale fallback.
            labels_all[canonical] = {
                str(k): str(v) for k, v in disp.items() if v is not None
            }

        # Build reverse map and check collisions
        r2c: Dict[str, str] = {}
        for c, r in c2r.items():
            if r in r2c and r2c[r] != c:
                raise ValueError(f"Raw column '{r}' maps to both '{r2c[r]}' and '{c}'.")
            r2c[r] = c

        if required:
            missing = set(required) - set(c2r.keys())
            if missing:
                raise ValueError(
                    f"Missing required canonical columns: {sorted(missing)}"
                )

        raw_version = cfg.get("version", 0)
        if not isinstance(raw_version, (int, float, str)):
            raise ValueError(f"Invalid 'version': {raw_version!r}.")

        time_cfg = cfg.get("time") or {}
        if not isinstance(time_cfg, dict):
            raise ValueError("YAML 'time' must be a mapping.")

        return cls(
            version=int(raw_version),
            tz_name=str(time_cfg.get("tz_name", "UTC")),
            assume_tz=str(time_cfg.get("assume_tz", "UTC")),
            canonical_to_raw=c2r,
            raw_to_canonical=r2c,
            _labels_all=labels_all,
            locale=locale,
            fallback_locale=fallback_locale,
        )

    # ---------- Properties ----------
    @property
    def canonical_to_display(self) -> Mapping[str, str]:
        """Resolved display labels for the current locale (with fallback).

        Resolution order per column: label for `locale`, then label for
        `fallback_locale`, then the canonical name itself.
        """
        return {
            c: (
                self._labels_all.get(c, {}).get(self.locale)
                or self._labels_all.get(c, {}).get(self.fallback_locale)
                or c
            )
            for c in self.canonical_to_raw
        }

    @property
    def canonicals(self) -> Iterable[str]:
        """Return the canonical column names defined in this mapper."""
        return self.canonical_to_raw.keys()

    # ---------- Operations on DataFrames ----------
    def to_canonical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename incoming raw headers to canonical headers (normalize once)."""
        return df.rename(columns=self.raw_to_canonical)

    def to_raw(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename canonical headers back to raw."""
        return df.rename(columns=self.canonical_to_raw)

    def to_display(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename canonical headers to localized display labels."""
        return df.rename(columns=self.canonical_to_display)

    def rename(
        self, df: pd.DataFrame, to: Literal["canonical", "raw", "display"]
    ) -> pd.DataFrame:
        """Unified renamer if you prefer one entrypoint.

        Args:
            df: DataFrame whose columns should be renamed.
            to: Target naming scheme.

        Returns:
            The renamed DataFrame.

        Raises:
            ValueError: If `to` is not one of the supported schemes.
        """
        if to == "canonical":
            return self.to_canonical(df)
        if to == "raw":
            return self.to_raw(df)
        if to == "display":
            return self.to_display(df)
        raise ValueError("to must be 'canonical', 'raw', or 'display'.")

    # ---------- Locale switching ----------
    def with_locale(
        self, locale: str, fallback_locale: Optional[str] = None
    ) -> "ColumnMapper":
        """Create a copy with a different display locale (no re-read of YAML).

        Args:
            locale: New preferred display locale.
            fallback_locale: New fallback locale; the current one is kept when
                this is omitted or empty.

        Returns:
            A new ColumnMapper sharing the same mappings and labels.
        """
        return replace(
            self,
            locale=locale,
            fallback_locale=(fallback_locale or self.fallback_locale),
        )

0 commit comments

Comments
 (0)