Skip to content

Commit 364ca41

Browse files
authored
Merge pull request #131 from cyiallou/feat/add-component-state
Add state analysis module for extracting component state transitions and alerts
2 parents 8150274 + 1236092 commit 364ca41

File tree

4 files changed

+572
-0
lines changed

4 files changed

+572
-0
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
- The signature for passing config files MicrogridConfig.load_config() has been changed to accept a path a list of paths and a directory containing the config files.
1515
- `MicrogridData` class needs to be initialized with a `MicrogridConfig` object instead of a path to config file(s).
1616
- Added a transactional stateful data fetcher.
17+
- Added a new `state_analysis` module for detecting and analysing component state transitions and alerts from reporting data.
18+
- Provides structured `StateRecord` objects with human-readable enum names.
19+
- Supports filtering for alert states and warnings.
20+
- Includes full test coverage for transition detection and alert filtering logic.
1721

1822
## Bug Fixes
1923

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Data structures for representing component state transitions and alerts."""
5+
6+
from datetime import datetime
7+
from typing import NamedTuple
8+
9+
10+
class StateRecord(NamedTuple):
11+
"""A record of a component state change.
12+
13+
A named tuple was chosen to allow safe access to the fields while keeping
14+
the simplicity of a tuple. This data type can be easily used to create a
15+
numpy array or a pandas DataFrame.
16+
"""
17+
18+
microgrid_id: int
19+
"""The ID of the microgrid."""
20+
21+
component_id: str
22+
"""The ID of the component within the microgrid."""
23+
24+
state_type: str
25+
"""The type of the state (e.g., "state", "warning", "error")."""
26+
27+
state_value: str
28+
"""The value of the state (e.g., "ON", "OFF", "ERROR" etc.)."""
29+
30+
start_time: datetime | None
31+
"""The start time of the state change."""
32+
33+
end_time: datetime | None
34+
"""The end time of the state change."""
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Functions for analyzing microgrid component state transitions and extracting alerts."""
5+
import logging
6+
from datetime import datetime, timedelta
7+
8+
from frequenz.client.common.metric import Metric
9+
from frequenz.client.common.microgrid.components import (
10+
ComponentErrorCode,
11+
ComponentStateCode,
12+
)
13+
from frequenz.client.reporting import ReportingApiClient
14+
from frequenz.client.reporting._types import MetricSample
15+
16+
from ._state_records import StateRecord
17+
18+
_logger = logging.getLogger(__name__)
19+
20+
21+
# pylint: disable-next=too-many-arguments
22+
async def fetch_and_extract_state_durations(
23+
*,
24+
client: ReportingApiClient,
25+
microgrid_components: list[tuple[int, list[int]]],
26+
metrics: list[Metric],
27+
start_time: datetime,
28+
end_time: datetime,
29+
resampling_period: timedelta | None,
30+
alert_states: list[ComponentStateCode],
31+
include_warnings: bool = True,
32+
) -> tuple[list[StateRecord], list[StateRecord]]:
33+
"""Fetch data using the Reporting API and extract state durations and alert records.
34+
35+
Args:
36+
client: The client used to fetch the metric samples from the Reporting API.
37+
microgrid_components: List of tuples where each tuple contains microgrid
38+
ID and corresponding component IDs.
39+
metrics: List of metric names.
40+
NOTE: The service will support requesting states without metrics in
41+
the future and this argument will be removed.
42+
start_time: The start date and time for the period.
43+
end_time: The end date and time for the period.
44+
resampling_period: The period for resampling the data. If None, data
45+
will be returned in its original resolution.
46+
alert_states: List of ComponentStateCode names that should trigger an alert.
47+
include_warnings: Whether to include warning states in the alert records.
48+
49+
Returns:
50+
A tuple containing:
51+
- A list of StateRecord instances representing the state changes.
52+
- A list of StateRecord instances that match the alert criteria.
53+
"""
54+
samples = await _fetch_component_data(
55+
client=client,
56+
microgrid_components=microgrid_components,
57+
metrics=metrics,
58+
start_time=start_time,
59+
end_time=end_time,
60+
resampling_period=resampling_period,
61+
include_states=True,
62+
include_bounds=False,
63+
)
64+
65+
all_states = _extract_state_records(samples, include_warnings)
66+
alert_records = _filter_alerts(all_states, alert_states, include_warnings)
67+
return all_states, alert_records
68+
69+
70+
def _extract_state_records(
71+
samples: list[MetricSample], include_warnings: bool
72+
) -> list[StateRecord]:
73+
"""Extract state records from the provided samples.
74+
75+
Args:
76+
samples: List of MetricSample instances containing the reporting data.
77+
include_warnings: Whether to include warning states in the alert records.
78+
79+
Returns:
80+
A list of StateRecord instances representing the state changes.
81+
"""
82+
component_groups = _group_samples_by_component(samples, include_warnings)
83+
84+
all_records = []
85+
for (mid, cid), metrics in component_groups.items():
86+
if "state" not in metrics:
87+
continue
88+
all_records.extend(_process_sample_group(mid, cid, metrics))
89+
90+
all_records.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time))
91+
return all_records
92+
93+
94+
# pylint: disable-next=too-many-locals,too-many-branches
95+
def _process_sample_group(
96+
microgrid_id: int,
97+
component_id: str,
98+
samples_by_metric: dict[str, list[MetricSample]],
99+
) -> list[StateRecord]:
100+
"""Process state/error/warning samples for a single component.
101+
102+
Args:
103+
microgrid_id: ID of the microgrid.
104+
component_id: ID of the component.
105+
samples_by_metric: Dict with keys "state", "error", optionally "warning".
106+
107+
Returns:
108+
A list of StateRecord instances representing the state changes and
109+
error/warning durations (if any).
110+
"""
111+
state_samples = sorted(samples_by_metric["state"], key=lambda s: s.timestamp)
112+
error_by_ts = {s.timestamp: s for s in samples_by_metric.get("error", [])}
113+
warning_by_ts = {s.timestamp: s for s in samples_by_metric.get("warning", [])}
114+
115+
records: list[StateRecord] = []
116+
state_val = error_val = warning_val = None
117+
state_start = error_start = warning_start = None
118+
119+
def emit(
120+
metric: str,
121+
val: float,
122+
start: datetime | None,
123+
end: datetime | None,
124+
enum_class: type[ComponentStateCode | ComponentErrorCode],
125+
) -> None:
126+
"""Emit a state record."""
127+
records.append(
128+
StateRecord(
129+
microgrid_id=microgrid_id,
130+
component_id=component_id,
131+
state_type=metric,
132+
state_value=_resolve_enum_name(val, enum_class),
133+
start_time=start,
134+
end_time=end,
135+
)
136+
)
137+
138+
for sample in state_samples:
139+
ts = sample.timestamp
140+
141+
# State change
142+
if sample.value != state_val:
143+
if state_val is not None:
144+
emit("state", state_val, state_start, ts, ComponentStateCode)
145+
state_val = sample.value
146+
state_start = ts
147+
148+
# Close error/warning if exiting ERROR
149+
if state_val != ComponentStateCode.ERROR.value:
150+
if error_val is not None:
151+
emit("error", error_val, error_start, ts, ComponentErrorCode)
152+
error_val = error_start = None
153+
if warning_val is not None:
154+
emit("warning", warning_val, warning_start, ts, ComponentErrorCode)
155+
warning_val = warning_start = None
156+
157+
# While in ERROR
158+
if state_val == ComponentStateCode.ERROR.value:
159+
if ts in error_by_ts:
160+
new_err = error_by_ts[ts].value
161+
if new_err != error_val:
162+
if error_val is not None:
163+
emit("error", error_val, error_start, ts, ComponentErrorCode)
164+
error_val = new_err
165+
error_start = ts
166+
167+
if ts in warning_by_ts:
168+
new_warn = warning_by_ts[ts].value
169+
if new_warn != warning_val:
170+
if warning_val is not None:
171+
emit(
172+
"warning",
173+
warning_val,
174+
warning_start,
175+
ts,
176+
ComponentErrorCode,
177+
)
178+
warning_val = new_warn
179+
warning_start = ts
180+
181+
if state_val is not None:
182+
emit("state", state_val, state_start, None, ComponentStateCode)
183+
if state_val == ComponentStateCode.ERROR.value:
184+
if error_val is not None:
185+
emit("error", error_val, error_start, None, ComponentErrorCode)
186+
if warning_val is not None:
187+
emit("warning", warning_val, warning_start, None, ComponentErrorCode)
188+
return records
189+
190+
191+
def _group_samples_by_component(
192+
samples: list[MetricSample], include_warnings: bool
193+
) -> dict[tuple[int, str], dict[str, list[MetricSample]]]:
194+
"""Group samples by (microgrid_id, component_id) and metric.
195+
196+
Args:
197+
samples: List of MetricSample instances containing the reporting data.
198+
include_warnings: Whether to include warning states in the alert records.
199+
200+
Returns:
201+
A dictionary where keys are tuples of (microgrid_id, component_id) and values
202+
are dictionaries with metric names as keys and lists of MetricSample as values.
203+
"""
204+
alert_metrics = {"state", "error"}
205+
if include_warnings:
206+
alert_metrics.add("warning")
207+
208+
component_groups: dict[tuple[int, str], dict[str, list[MetricSample]]] = {}
209+
for sample in samples:
210+
if sample.metric not in alert_metrics:
211+
continue
212+
key = (sample.microgrid_id, sample.component_id)
213+
metric_dict = component_groups.setdefault(key, {})
214+
metric_dict.setdefault(sample.metric, []).append(sample)
215+
return component_groups
216+
217+
218+
def _resolve_enum_name(
219+
value: float, enum_class: type[ComponentStateCode | ComponentErrorCode]
220+
) -> str:
221+
"""Resolve the name of an enum member from its integer value.
222+
223+
Args:
224+
value: The integer value of the enum.
225+
enum_class: The enum class to convert the value to.
226+
227+
Returns:
228+
The name of the enum member if it exists, otherwise if the value is invalid,
229+
the enum class will return a default value (e.g., "UNSPECIFIED").
230+
"""
231+
result = enum_class.from_proto(int(value)) # type: ignore[arg-type]
232+
return result.name
233+
234+
235+
def _filter_alerts(
236+
all_states: list[StateRecord],
237+
alert_states: list[ComponentStateCode],
238+
include_warnings: bool,
239+
) -> list[StateRecord]:
240+
"""Identify alert records from all states.
241+
242+
Args:
243+
all_states: List of all state records.
244+
alert_states: List of ComponentStateCode names that should trigger an alert.
245+
include_warnings: Whether to include warning states in the alert records.
246+
247+
Returns:
248+
A list of StateRecord instances that match the alert criteria.
249+
"""
250+
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
251+
_alert_state_names = {state.name for state in alert_states}
252+
return [
253+
state
254+
for state in all_states
255+
if (
256+
(state.state_type == "state" and state.state_value in _alert_state_names)
257+
or (state.state_type in alert_metrics)
258+
)
259+
]
260+
261+
262+
# pylint: disable-next=too-many-arguments
263+
async def _fetch_component_data(
264+
*,
265+
client: ReportingApiClient,
266+
microgrid_components: list[tuple[int, list[int]]],
267+
metrics: list[Metric],
268+
start_time: datetime,
269+
end_time: datetime,
270+
resampling_period: timedelta | None,
271+
include_states: bool = False,
272+
include_bounds: bool = False,
273+
) -> list[MetricSample]:
274+
"""Fetch component data from the Reporting API.
275+
276+
Args:
277+
client: The client used to fetch the metric samples from the Reporting API.
278+
microgrid_components: List of tuples where each tuple contains
279+
microgrid ID and corresponding component IDs.
280+
metrics: List of metric names.
281+
start_time: The start date and time for the period.
282+
end_time: The end date and time for the period.
283+
resampling_period: The period for resampling the data. If None, data
284+
will be returned in its original resolution
285+
include_states: Whether to include the state data.
286+
include_bounds: Whether to include the bound data.
287+
288+
Returns:
289+
List of MetricSample instances containing the reporting data.
290+
"""
291+
return [
292+
sample
293+
async for sample in client.receive_microgrid_components_data(
294+
microgrid_components=microgrid_components,
295+
metrics=metrics,
296+
start_time=start_time,
297+
end_time=end_time,
298+
resampling_period=resampling_period,
299+
include_states=include_states,
300+
include_bounds=include_bounds,
301+
)
302+
]

0 commit comments

Comments
 (0)