Skip to content

Commit f1fd44c

Browse files
committed
Refactor state extraction to use typed StateRecord and enums
- Introduced `StateRecord`, a typed NamedTuple, to replace dict-based state records.
- Replaced raw integer state values with proper enum names using `ComponentStateCode` and `ComponentErrorCode`.
- Improved internal structure by splitting state extraction into `_extract_state_records()` and `_filter_alerts()`, two cleaner, testable units.
- Added `_resolve_enum_name()` to map proto values to readable enum names.
- Updated unit tests accordingly for enum-based comparisons and removed deprecated test cases.

This improves downstream use of state records, such as in alert visualisation and reporting.

Signed-off-by: cyiallou - Costas <[email protected]>
1 parent badc443 commit f1fd44c

File tree

3 files changed

+158
-100
lines changed

3 files changed

+158
-100
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
- Refactored state extraction to return `StateRecord` objects with descriptive enum names (e.g., "ACTIVE", "FAULT") instead of raw integers for `state_value`, improving clarity and downstream usability.
1010

1111
## New Features
1212

src/frequenz/reporting/_reporting.py

Lines changed: 119 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,54 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""A high-level interface for the reporting API."""
5-
5+
import enum
6+
import logging
67
from collections import namedtuple
78
from datetime import datetime, timedelta
89
from itertools import groupby
9-
from typing import Any
10+
from typing import NamedTuple
1011

12+
from frequenz.client.common.enum_proto import enum_from_proto
1113
from frequenz.client.common.metric import Metric
14+
from frequenz.client.common.microgrid.components import (
15+
ComponentErrorCode,
16+
ComponentStateCode,
17+
)
1218
from frequenz.client.reporting import ReportingApiClient
1319
from frequenz.client.reporting._types import MetricSample
1420

21+
_logger = logging.getLogger(__name__)
22+
1523
CumulativeEnergy = namedtuple(
1624
"CumulativeEnergy", ["start_time", "end_time", "consumption", "production"]
1725
)
18-
"""Type for cumulative energy consumption and production over a specified time."""
26+
27+
28+
class StateRecord(NamedTuple):
29+
"""A record of a component state change.
30+
31+
A named tuple was chosen to allow safe access to the fields while keeping
32+
the simplicity of a tuple. This data type can be easily used to create a
33+
numpy array or a pandas DataFrame.
34+
"""
35+
36+
microgrid_id: int
37+
"""The ID of the microgrid."""
38+
39+
component_id: str
40+
"""The ID of the component within the microgrid."""
41+
42+
state_type: str
43+
"""The type of the state (e.g., "state", "warning", "error")."""
44+
45+
state_value: str
46+
"""The value of the state as an enum member name (e.g., "ON", "OFF", "ERROR")."""
47+
48+
start_time: datetime | None
49+
"""The start time of the state change."""
50+
51+
end_time: datetime | None
52+
"""The end time of the state change."""
1953

2054

2155
# pylint: disable-next=too-many-arguments
@@ -156,9 +190,9 @@ async def fetch_and_extract_state_durations(
156190
start_time: datetime,
157191
end_time: datetime,
158192
resampling_period: timedelta | None,
159-
alert_states: list[int],
193+
alert_states: list[ComponentStateCode],
160194
include_warnings: bool = True,
161-
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
195+
) -> tuple[list[StateRecord], list[StateRecord]]:
162196
"""Fetch data using the Reporting API and extract state durations and alert records.
163197
164198
Args:
@@ -172,14 +206,13 @@ async def fetch_and_extract_state_durations(
172206
end_time: The end date and time for the period.
173207
resampling_period: The period for resampling the data. If None, data
174208
will be returned in its original resolution
175-
alert_states: List of component state values that should trigger an alert.
176-
include_warnings: Whether to include warning state values in the alert
177-
records.
209+
alert_states: List of ComponentStateCode members that should trigger an alert.
210+
include_warnings: Whether to include warning states in the alert records.
178211
179212
Returns:
180-
A tuple containing two lists:
181-
- all_states: Contains all state records including start and end times.
182-
- alert_records: Contains filtered records matching the alert criteria.
213+
A tuple containing:
214+
- A list of StateRecord instances representing the state changes.
215+
- A list of StateRecord instances that match the alert criteria.
183216
"""
184217
samples = await _fetch_component_data(
185218
client=client,
@@ -192,30 +225,23 @@ async def fetch_and_extract_state_durations(
192225
include_bounds=False,
193226
)
194227

195-
all_states, alert_records = extract_state_durations(
196-
samples, alert_states, include_warnings
197-
)
228+
all_states = _extract_state_records(samples, include_warnings)
229+
alert_records = _filter_alerts(all_states, alert_states, include_warnings)
198230
return all_states, alert_records
199231

200232

201-
def extract_state_durations(
233+
def _extract_state_records(
202234
samples: list[MetricSample],
203-
alert_states: list[int],
204-
include_warnings: bool = True,
205-
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
206-
"""
207-
Extract state durations and alert records based on state transitions.
235+
include_warnings: bool,
236+
) -> list[StateRecord]:
237+
"""Extract state records from the provided samples.
208238
209239
Args:
210240
samples: List of MetricSample instances containing the reporting data.
211-
alert_states: List of component state values that should trigger an alert.
212-
Component error codes are reported by default.
213-
include_warnings: Whether to include warning state values in the alert records.
241+
include_warnings: Whether to include warning states in the extracted records.
214242
215243
Returns:
216-
A tuple containing two lists:
217-
- all_states: Contains all state records including start and end times.
218-
- alert_records: Contains filtered records matching the alert criteria.
244+
A list of StateRecord instances representing the state changes.
219245
"""
220246
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
221247
state_metrics = ["state"] + alert_metrics
@@ -225,96 +251,122 @@ def extract_state_durations(
225251
)
226252

227253
if not filtered_samples:
228-
return [], []
254+
return []
229255

230256
# Group samples by (microgrid_id, component_id, metric)
231257
all_states = []
232258
for key, group in groupby(
233259
filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric)
234260
):
235-
states = _process_group_samples(key, list(group))
236-
all_states.extend(states)
261+
all_states.extend(_process_sample_group(key, list(group)))
237262

238-
all_states.sort(
239-
key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"])
240-
)
241-
242-
alert_records = _filter_alerts(all_states, alert_states, alert_metrics)
243-
return all_states, alert_records
263+
all_states.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time))
264+
return all_states
244265

245266

246-
def _process_group_samples(
247-
key: tuple[int, int, str],
248-
group_samples: list["MetricSample"],
249-
) -> list[dict[str, Any]]:
267+
def _process_sample_group(
268+
key: tuple[int, str, str],
269+
group_samples: list[MetricSample],
270+
) -> list[StateRecord]:
250271
"""Process samples for a single group to extract state durations.
251272
252273
Args:
253274
key: Tuple containing microgrid ID, component ID, and metric.
254275
group_samples: List of samples for the group.
255276
256277
Returns:
257-
List of state records.
278+
A list of StateRecord instances representing the state changes.
258279
"""
259280
mid, cid, metric = key
281+
if not group_samples:
282+
return []
283+
260284
state_records = []
261-
current_state_value = None
262-
start_time = None
285+
current_state_value: float | None = None
286+
start_time: datetime | None = None
287+
enum_class = ComponentStateCode if metric == "state" else ComponentErrorCode
263288

264289
for sample in group_samples:
265290
if current_state_value != sample.value:
266291
# Close previous state run
267292
if current_state_value is not None:
268293
state_records.append(
269-
{
270-
"microgrid_id": mid,
271-
"component_id": cid,
272-
"state_type": metric,
273-
"state_value": current_state_value,
274-
"start_time": start_time,
275-
"end_time": sample.timestamp,
276-
}
294+
StateRecord(
295+
microgrid_id=mid,
296+
component_id=cid,
297+
state_type=metric,
298+
state_value=_resolve_enum_name(current_state_value, enum_class),
299+
start_time=start_time,
300+
end_time=sample.timestamp,
301+
)
277302
)
278303
# Start new state run
279304
current_state_value = sample.value
280305
start_time = sample.timestamp
281306

282307
# Close the last state run
283308
state_records.append(
284-
{
285-
"microgrid_id": mid,
286-
"component_id": cid,
287-
"state_type": metric,
288-
"state_value": current_state_value,
289-
"start_time": start_time,
290-
"end_time": None,
291-
}
309+
StateRecord(
310+
microgrid_id=mid,
311+
component_id=cid,
312+
state_type=metric,
313+
state_value=(
314+
_resolve_enum_name(current_state_value, enum_class)
315+
if current_state_value is not None
316+
else ""
317+
),
318+
start_time=start_time,
319+
end_time=None,
320+
)
292321
)
293322

294323
return state_records
295324

296325

326+
def _resolve_enum_name(value: float, enum_class: type[enum.Enum]) -> str:
327+
"""Resolve the name of an enum member from its integer value.
328+
329+
Args:
330+
value: The numeric value of the enum member; it is converted to an integer before the lookup.
331+
enum_class: The enum class to convert the value to.
332+
333+
Returns:
334+
The name of the enum member if it exists, otherwise the value as a string.
335+
"""
336+
result = enum_from_proto(int(value), enum_class)
337+
if isinstance(result, int):
338+
_logger.warning(
339+
"Unknown enum value %s for %s, returning the integer value as a string.",
340+
value,
341+
enum_class.__name__,
342+
)
343+
return str(result)
344+
return result.name
345+
346+
297347
def _filter_alerts(
298-
all_states: list[dict[str, Any]],
299-
alert_states: list[int],
300-
alert_metrics: list[str],
301-
) -> list[dict[str, Any]]:
348+
all_states: list[StateRecord],
349+
alert_states: list[ComponentStateCode],
350+
include_warnings: bool,
351+
) -> list[StateRecord]:
302352
"""Identify alert records from all states.
303353
304354
Args:
305355
all_states: List of all state records.
306-
alert_states: List of component state values that should trigger an alert.
307-
alert_metrics: List of metric names that should trigger an alert.
356+
alert_states: List of ComponentStateCode members that should trigger an alert.
357+
include_warnings: Whether to include warning states in the alert records.
308358
309359
Returns:
310-
List of alert records.
360+
A list of StateRecord instances that match the alert criteria.
311361
"""
362+
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
363+
_alert_state_names = {state.name for state in alert_states}
312364
return [
313365
state
314366
for state in all_states
315367
if (
316-
(state["state_type"] == "state" and state["state_value"] in alert_states)
317-
or (state["state_type"] in alert_metrics)
368+
(state.state_type == "state" and state.state_value in _alert_state_names)
369+
or (state.state_type in alert_metrics)
318370
)
319371
]
320372

0 commit comments

Comments
 (0)