Skip to content

Commit 5263a62

Browse files
committed
Add state analysis module for extracting component state transitions and alerts
This introduces a new `state_analysis` module for analysing microgrid component state transitions using the Reporting API. It includes: - A `StateRecord` named tuple for structured representation of state changes. - Logic to fetch, extract, and group state/warning/error transitions. - Filtering support for alert detection based on `ComponentStateCode` values. - A private `_resolve_enum_name()` helper for translating raw integer states to descriptive enum names. - Added `# type: ignore[arg-type]` temporarily to ignore `mypy` errors raised for the deprecated `from_proto` method. This will be replaced in the near future when the reporting and weather clients are updated to the latest `client-common` and `common-api` versions - currently, it will raise all sorts of errors when one tries to use `enum_from_proto` from `client-common` `v0.3.3`. - Tests. Signed-off-by: cyiallou - Costas <[email protected]>
1 parent 7dc2f2b commit 5263a62

File tree

4 files changed

+520
-0
lines changed

4 files changed

+520
-0
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
- Added consistent logger setup across all modules for structured logging and improved observability. Example notebooks updated to demonstrate logger usage.
1414
- The signature of `MicrogridConfig.load_config()` for passing config files has been changed to accept a path, a list of paths, or a directory containing the config files.
1515
- `MicrogridData` class needs to be initialized with a `MicrogridConfig` object instead of a path to config file(s).
16+
- Added a new `state_analysis` module for detecting and analysing component state transitions and alerts from reporting data.
17+
- Provides structured `StateRecord` objects with human-readable enum names.
18+
- Supports filtering for alert states and warnings.
19+
- Includes full test coverage for transition detection and alert filtering logic.
1620

1721
## Bug Fixes
1822

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Data structures for representing component state transitions and alerts."""
5+
6+
from datetime import datetime
7+
from typing import NamedTuple
8+
9+
10+
class StateRecord(NamedTuple):
11+
"""A record of a component state change.
12+
13+
A named tuple was chosen to allow safe access to the fields while keeping
14+
the simplicity of a tuple. This data type can be easily used to create a
15+
numpy array or a pandas DataFrame.
16+
"""
17+
18+
microgrid_id: int
19+
"""The ID of the microgrid."""
20+
21+
component_id: str
22+
"""The ID of the component within the microgrid."""
23+
24+
state_type: str
25+
"""The type of the state (e.g., "state", "warning", "error")."""
26+
27+
state_value: str
28+
"""The value of the state (e.g., "ON", "OFF", "ERROR" etc.)."""
29+
30+
start_time: datetime | None
31+
"""The start time of the state change."""
32+
33+
end_time: datetime | None
34+
"""The end time of the state change."""
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Functions for analyzing microgrid component state transitions and extracting alerts."""
5+
import logging
6+
from datetime import datetime, timedelta
7+
from itertools import groupby
8+
9+
from frequenz.client.common.metric import Metric
10+
from frequenz.client.common.microgrid.components import (
11+
ComponentErrorCode,
12+
ComponentStateCode,
13+
)
14+
from frequenz.client.reporting import ReportingApiClient
15+
from frequenz.client.reporting._types import MetricSample
16+
17+
from ._state_records import StateRecord
18+
19+
_logger = logging.getLogger(__name__)
20+
21+
22+
# pylint: disable-next=too-many-arguments
async def fetch_and_extract_state_durations(
    *,
    client: ReportingApiClient,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: list[Metric],
    start_time: datetime,
    end_time: datetime,
    resampling_period: timedelta | None,
    alert_states: list[ComponentStateCode],
    include_warnings: bool = True,
) -> tuple[list[StateRecord], list[StateRecord]]:
    """Download reporting data and turn it into state runs plus alert records.

    Args:
        client: Reporting API client used to download the metric samples.
        microgrid_components: Pairs of a microgrid ID and the IDs of the
            components to query within that microgrid.
        metrics: Metrics to request alongside the states.
            NOTE: The service will support requesting states without metrics in
            the future and this argument will be removed.
        start_time: Beginning of the period to query.
        end_time: End of the period to query.
        resampling_period: Resampling interval for the data. If None, the data
            is returned in its original resolution.
        alert_states: `ComponentStateCode` members whose occurrence counts as
            an alert.
        include_warnings: Whether warning records are extracted and reported
            as alerts in addition to error records.

    Returns:
        A tuple containing:
        - Every extracted StateRecord.
        - The subset of those StateRecord instances that qualify as alerts.
    """
    # States are always requested here; bounds are never needed for this analysis.
    fetched_samples = await _fetch_component_data(
        client=client,
        microgrid_components=microgrid_components,
        metrics=metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=True,
        include_bounds=False,
    )

    records = _extract_state_records(fetched_samples, include_warnings)
    return records, _filter_alerts(records, alert_states, include_warnings)
69+
70+
71+
def _extract_state_records(
    samples: list[MetricSample],
    include_warnings: bool,
) -> list[StateRecord]:
    """Build state records from the state-related samples in `samples`.

    Args:
        samples: MetricSample instances containing the reporting data.
        include_warnings: Whether samples of the "warning" metric are kept in
            addition to the "state" and "error" ones.

    Returns:
        The StateRecord instances of all groups, ordered by microgrid ID,
        component ID and start time.
    """
    tracked_metrics = (
        ("state", "warning", "error") if include_warnings else ("state", "error")
    )

    def group_key(sample):
        """Return the (microgrid, component, metric) identity of a sample."""
        return (sample.microgrid_id, sample.component_id, sample.metric)

    relevant = [sample for sample in samples if sample.metric in tracked_metrics]
    if not relevant:
        return []

    # groupby() only merges adjacent items, so order by the grouping key first
    # and by timestamp within each group.
    relevant.sort(key=lambda sample: (*group_key(sample), sample.timestamp))

    records = [
        record
        for key, run in groupby(relevant, key=group_key)
        for record in _process_sample_group(key, list(run))
    ]
    return sorted(
        records,
        key=lambda record: (
            record.microgrid_id,
            record.component_id,
            record.start_time,
        ),
    )
103+
104+
105+
def _process_sample_group(
    key: tuple[int, str, str],
    group_samples: list[MetricSample],
) -> list[StateRecord]:
    """Collapse the samples of one group into runs of constant value.

    Args:
        key: Tuple of microgrid ID, component ID and metric shared by all
            samples of the group.
        group_samples: Samples of the group, in the order they should be
            scanned.

    Returns:
        One StateRecord per run of consecutive equal values. The final run has
        no end time because no later sample closes it.
    """
    microgrid_id, component_id, state_type = key
    if not group_samples:
        return []

    # Plain states and warning/error codes are decoded with different enums.
    code_enum = ComponentErrorCode if state_type != "state" else ComponentStateCode

    def build_record(
        raw_value: float | None, began: datetime | None, ended: datetime | None
    ) -> StateRecord:
        """Create the record for one run, translating its raw value to a name."""
        return StateRecord(
            microgrid_id=microgrid_id,
            component_id=component_id,
            state_type=state_type,
            state_value=(
                "" if raw_value is None else _resolve_enum_name(raw_value, code_enum)
            ),
            start_time=began,
            end_time=ended,
        )

    runs: list[StateRecord] = []
    active_value: float | None = None
    active_since: datetime | None = None

    for sample in group_samples:
        if active_value != sample.value:
            # A value change ends the run in progress (if there is one) at the
            # timestamp of the sample that introduces the new value.
            if active_value is not None:
                runs.append(build_record(active_value, active_since, sample.timestamp))
            active_value, active_since = sample.value, sample.timestamp

    # The run still in progress after the last sample stays open-ended.
    runs.append(build_record(active_value, active_since, None))
    return runs
162+
163+
164+
def _resolve_enum_name(
    value: float, enum_class: type[ComponentStateCode | ComponentErrorCode]
) -> str:
    """Translate a raw numeric code into the name of its enum member.

    Args:
        value: Numeric code of the enum member; it is converted with `int()`
            before the lookup.
        enum_class: Enum class used to interpret the code.

    Returns:
        The name of the member that `enum_class.from_proto` returns for the
        code. Handling of codes without a matching member is left to
        `from_proto`, which is expected to fall back to a default member
        (e.g., "UNSPECIFIED").
    """
    code = int(value)
    # `from_proto` is annotated for proto enum values rather than plain ints,
    # hence the ignore for mypy.
    return enum_class.from_proto(code).name  # type: ignore[arg-type]
179+
180+
181+
def _filter_alerts(
    all_states: list[StateRecord],
    alert_states: list[ComponentStateCode],
    include_warnings: bool,
) -> list[StateRecord]:
    """Select the records that count as alerts.

    Args:
        all_states: All state records to inspect.
        alert_states: `ComponentStateCode` members whose names mark a "state"
            record as an alert.
        include_warnings: Whether "warning" records count as alerts in addition
            to "error" records.

    Returns:
        The StateRecord instances that match the alert criteria, in their
        original order.
    """
    alert_types = ("warning", "error") if include_warnings else ("error",)
    alerting_names = {code.name for code in alert_states}

    def is_alert(record: StateRecord) -> bool:
        """Tell whether a single record matches the alert criteria."""
        if record.state_type == "state":
            # Plain states only alert when explicitly listed by the caller.
            return record.state_value in alerting_names
        return record.state_type in alert_types

    return [record for record in all_states if is_alert(record)]
206+
207+
208+
# pylint: disable-next=too-many-arguments
async def _fetch_component_data(
    *,
    client: ReportingApiClient,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: list[Metric],
    start_time: datetime,
    end_time: datetime,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False,
) -> list[MetricSample]:
    """Collect every sample the Reporting API streams for the request.

    Args:
        client: Reporting API client used to download the metric samples.
        microgrid_components: Pairs of a microgrid ID and the IDs of the
            components to query within that microgrid.
        metrics: Metrics to request.
        start_time: Beginning of the period to query.
        end_time: End of the period to query.
        resampling_period: Resampling interval for the data. If None, the data
            is returned in its original resolution.
        include_states: Whether to request the state data.
        include_bounds: Whether to request the bound data.

    Returns:
        All MetricSample instances yielded by the client, in arrival order.
    """
    sample_stream = client.receive_microgrid_components_data(
        microgrid_components=microgrid_components,
        metrics=metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
        include_bounds=include_bounds,
    )

    # Drain the whole stream so callers can work with an in-memory list.
    collected: list[MetricSample] = []
    async for sample in sample_stream:
        collected.append(sample)
    return collected

0 commit comments

Comments
 (0)