Skip to content

Commit 8905696

Browse files
committed
Fix: align error and warning durations with state==ERROR segments
Previously, error and warning metric samples were tracked independently of the component's primary `state` metric. This led to incorrect or unbounded durations when the component transitioned into or out of `ERROR` state. This patch changes the extraction logic to emit error and warning `StateRecords` only while the component is in `ERROR` state, ensuring accurate segmentation. Key changes: - `_extract_state_records()` now groups samples by (microgrid_id, component_id) and collects state, error, and optional warning metrics together. - `_process_sample_group()` was rewritten to iterate over state samples and conditionally emit error and warning transitions aligned with the `ERROR` window. - Introduced an `emit()` helper to simplify record generation. Fixes: - ERROR segments with repeated values never closed - ERROR durations overshot actual ERROR state window Test suite has been updated to reflect the stricter state coordination. Signed-off-by: cyiallou - Costas <[email protected]>
1 parent 5263a62 commit 8905696

File tree

2 files changed

+142
-90
lines changed

2 files changed

+142
-90
lines changed

src/frequenz/lib/notebooks/reporting/state_analysis.py

Lines changed: 122 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"""Functions for analyzing microgrid component state transitions and extracting alerts."""
55
import logging
66
from datetime import datetime, timedelta
7-
from itertools import groupby
87

98
from frequenz.client.common.metric import Metric
109
from frequenz.client.common.microgrid.components import (
@@ -69,8 +68,7 @@ async def fetch_and_extract_state_durations(
6968

7069

7170
def _extract_state_records(
72-
samples: list[MetricSample],
73-
include_warnings: bool,
71+
samples: list[MetricSample], include_warnings: bool
7472
) -> list[StateRecord]:
7573
"""Extract state records from the provided samples.
7674
@@ -81,84 +79,140 @@ def _extract_state_records(
8179
Returns:
8280
A list of StateRecord instances representing the state changes.
8381
"""
84-
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
85-
state_metrics = ["state"] + alert_metrics
86-
filtered_samples = sorted(
87-
(s for s in samples if s.metric in state_metrics),
88-
key=lambda s: (s.microgrid_id, s.component_id, s.metric, s.timestamp),
89-
)
90-
91-
if not filtered_samples:
92-
return []
82+
component_groups = _group_samples_by_component(samples, include_warnings)
9383

94-
# Group samples by (microgrid_id, component_id, metric)
95-
all_states = []
96-
for key, group in groupby(
97-
filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric)
98-
):
99-
all_states.extend(_process_sample_group(key, list(group)))
84+
all_records = []
85+
for (mid, cid), metrics in component_groups.items():
86+
if "state" not in metrics:
87+
continue
88+
all_records.extend(_process_sample_group(mid, cid, metrics))
10089

101-
all_states.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time))
102-
return all_states
90+
all_records.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time))
91+
return all_records
10392

10493

94+
# pylint: disable-next=too-many-locals,too-many-branches
10595
def _process_sample_group(
106-
key: tuple[int, str, str],
107-
group_samples: list[MetricSample],
96+
microgrid_id: int,
97+
component_id: str,
98+
samples_by_metric: dict[str, list[MetricSample]],
10899
) -> list[StateRecord]:
109-
"""Process samples for a single group to extract state durations.
100+
"""Process state/error/warning samples for a single component.
110101
111102
Args:
112-
key: Tuple containing microgrid ID, component ID, and metric.
113-
group_samples: List of samples for the group.
103+
microgrid_id: ID of the microgrid.
104+
component_id: ID of the component.
105+
samples_by_metric: Dict with keys "state", "error", optionally "warning".
114106
115107
Returns:
116-
A list of StateRecord instances representing the state changes.
108+
A list of StateRecord instances representing the state changes and
109+
error/warning durations (if any).
117110
"""
118-
mid, cid, metric = key
119-
if not group_samples:
120-
return []
121-
122-
state_records = []
123-
current_state_value: float | None = None
124-
start_time: datetime | None = None
125-
enum_class = ComponentStateCode if metric == "state" else ComponentErrorCode
126-
127-
for sample in group_samples:
128-
if current_state_value != sample.value:
129-
# Close previous state run
130-
if current_state_value is not None:
131-
state_records.append(
132-
StateRecord(
133-
microgrid_id=mid,
134-
component_id=cid,
135-
state_type=metric,
136-
state_value=_resolve_enum_name(current_state_value, enum_class),
137-
start_time=start_time,
138-
end_time=sample.timestamp,
139-
)
140-
)
141-
# Start new state run
142-
current_state_value = sample.value
143-
start_time = sample.timestamp
144-
145-
# Close the last state run
146-
state_records.append(
147-
StateRecord(
148-
microgrid_id=mid,
149-
component_id=cid,
150-
state_type=metric,
151-
state_value=(
152-
_resolve_enum_name(current_state_value, enum_class)
153-
if current_state_value is not None
154-
else ""
155-
),
156-
start_time=start_time,
157-
end_time=None,
111+
state_samples = sorted(samples_by_metric["state"], key=lambda s: s.timestamp)
112+
error_by_ts = {s.timestamp: s for s in samples_by_metric.get("error", [])}
113+
warning_by_ts = {s.timestamp: s for s in samples_by_metric.get("warning", [])}
114+
115+
records: list[StateRecord] = []
116+
state_val = error_val = warning_val = None
117+
state_start = error_start = warning_start = None
118+
119+
def emit(
120+
metric: str,
121+
val: float,
122+
start: datetime | None,
123+
end: datetime | None,
124+
enum_class: type[ComponentStateCode | ComponentErrorCode],
125+
) -> None:
126+
"""Emit a state record."""
127+
records.append(
128+
StateRecord(
129+
microgrid_id=microgrid_id,
130+
component_id=component_id,
131+
state_type=metric,
132+
state_value=_resolve_enum_name(val, enum_class),
133+
start_time=start,
134+
end_time=end,
135+
)
158136
)
159-
)
160137

161-
return state_records
138+
for sample in state_samples:
139+
ts = sample.timestamp
140+
141+
# State change
142+
if sample.value != state_val:
143+
if state_val is not None:
144+
emit("state", state_val, state_start, ts, ComponentStateCode)
145+
state_val = sample.value
146+
state_start = ts
147+
148+
# Close error/warning if exiting ERROR
149+
if state_val != ComponentStateCode.ERROR.value:
150+
if error_val is not None:
151+
emit("error", error_val, error_start, ts, ComponentErrorCode)
152+
error_val = error_start = None
153+
if warning_val is not None:
154+
emit("warning", warning_val, warning_start, ts, ComponentErrorCode)
155+
warning_val = warning_start = None
156+
157+
# While in ERROR
158+
if state_val == ComponentStateCode.ERROR.value:
159+
if ts in error_by_ts:
160+
new_err = error_by_ts[ts].value
161+
if new_err != error_val:
162+
if error_val is not None:
163+
emit("error", error_val, error_start, ts, ComponentErrorCode)
164+
error_val = new_err
165+
error_start = ts
166+
167+
if ts in warning_by_ts:
168+
new_warn = warning_by_ts[ts].value
169+
if new_warn != warning_val:
170+
if warning_val is not None:
171+
emit(
172+
"warning",
173+
warning_val,
174+
warning_start,
175+
ts,
176+
ComponentErrorCode,
177+
)
178+
warning_val = new_warn
179+
warning_start = ts
180+
181+
if state_val is not None:
182+
emit("state", state_val, state_start, None, ComponentStateCode)
183+
if state_val == ComponentStateCode.ERROR.value:
184+
if error_val is not None:
185+
emit("error", error_val, error_start, None, ComponentErrorCode)
186+
if warning_val is not None:
187+
emit("warning", warning_val, warning_start, None, ComponentErrorCode)
188+
return records
189+
190+
191+
def _group_samples_by_component(
192+
samples: list[MetricSample], include_warnings: bool
193+
) -> dict[tuple[int, str], dict[str, list[MetricSample]]]:
194+
"""Group samples by (microgrid_id, component_id) and metric.
195+
196+
Args:
197+
samples: List of MetricSample instances containing the reporting data.
198+
include_warnings: Whether to include warning states in the alert records.
199+
200+
Returns:
201+
A dictionary where keys are tuples of (microgrid_id, component_id) and values
202+
are dictionaries with metric names as keys and lists of MetricSample as values.
203+
"""
204+
alert_metrics = {"state", "error"}
205+
if include_warnings:
206+
alert_metrics.add("warning")
207+
208+
component_groups: dict[tuple[int, str], dict[str, list[MetricSample]]] = {}
209+
for sample in samples:
210+
if sample.metric not in alert_metrics:
211+
continue
212+
key = (sample.microgrid_id, sample.component_id)
213+
metric_dict = component_groups.setdefault(key, {})
214+
metric_dict.setdefault(sample.metric, []).append(sample)
215+
return component_groups
162216

163217

164218
def _resolve_enum_name(

tests/test_state_analysis.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,74 +79,72 @@
7979
"description": "Warnings and errors included",
8080
"samples": [
8181
MetricSample(datetime(2023, 1, 2, 0, 0), 3, "303", "state", 0),
82+
MetricSample(datetime(2023, 1, 2, 0, 30), 3, "303", "state", 10),
8283
MetricSample(datetime(2023, 1, 2, 0, 30), 3, "303", "warning", 10),
8384
MetricSample(datetime(2023, 1, 2, 1, 0), 3, "303", "state", 1),
85+
MetricSample(datetime(2023, 1, 2, 1, 30), 3, "303", "state", 20),
8486
MetricSample(datetime(2023, 1, 2, 1, 30), 3, "303", "error", 20),
8587
],
8688
"alert_states": [ComponentStateCode.from_proto(1)], # type: ignore[arg-type]
8789
"include_warnings": True,
8890
"expected_all_states": [
89-
# State transitions
9091
{
9192
"microgrid_id": 3,
9293
"component_id": "303",
9394
"state_type": "state",
9495
"state_value": _resolve_enum_name(0, ComponentStateCode),
9596
"start_time": datetime(2023, 1, 2, 0, 0),
96-
"end_time": datetime(2023, 1, 2, 1, 0),
97+
"end_time": datetime(2023, 1, 2, 0, 30),
9798
},
9899
{
99100
"microgrid_id": 3,
100101
"component_id": "303",
101102
"state_type": "state",
102-
"state_value": _resolve_enum_name(1, ComponentStateCode),
103-
"start_time": datetime(2023, 1, 2, 1, 0),
104-
"end_time": None,
103+
"state_value": _resolve_enum_name(10, ComponentStateCode),
104+
"start_time": datetime(2023, 1, 2, 0, 30),
105+
"end_time": datetime(2023, 1, 2, 1, 0),
105106
},
106-
# Warning transitions
107107
{
108108
"microgrid_id": 3,
109109
"component_id": "303",
110-
"state_type": "warning",
111-
"state_value": _resolve_enum_name(10, ComponentErrorCode),
112-
"start_time": datetime(2023, 1, 2, 0, 30),
113-
"end_time": None,
110+
"state_type": "state",
111+
"state_value": _resolve_enum_name(1, ComponentStateCode),
112+
"start_time": datetime(2023, 1, 2, 1, 0),
113+
"end_time": datetime(2023, 1, 2, 1, 30),
114114
},
115-
# Error transitions
116115
{
117116
"microgrid_id": 3,
118117
"component_id": "303",
119-
"state_type": "error",
120-
"state_value": _resolve_enum_name(20, ComponentErrorCode),
118+
"state_type": "state",
119+
"state_value": _resolve_enum_name(20, ComponentStateCode),
121120
"start_time": datetime(2023, 1, 2, 1, 30),
122121
"end_time": None,
123122
},
124-
],
125-
"expected_alert_records": [
126123
{
127124
"microgrid_id": 3,
128125
"component_id": "303",
129126
"state_type": "warning",
130127
"state_value": _resolve_enum_name(10, ComponentErrorCode),
131128
"start_time": datetime(2023, 1, 2, 0, 30),
132-
"end_time": None,
129+
"end_time": datetime(2023, 1, 2, 1, 0),
133130
},
131+
],
132+
"expected_alert_records": [
134133
{
135134
"microgrid_id": 3,
136135
"component_id": "303",
137-
"state_type": "error",
138-
"state_value": _resolve_enum_name(20, ComponentErrorCode),
139-
"start_time": datetime(2023, 1, 2, 1, 30),
140-
"end_time": None,
136+
"state_type": "warning",
137+
"state_value": _resolve_enum_name(10, ComponentErrorCode),
138+
"start_time": datetime(2023, 1, 2, 0, 30),
139+
"end_time": datetime(2023, 1, 2, 1, 0),
141140
},
142-
# State alert
143141
{
144142
"microgrid_id": 3,
145143
"component_id": "303",
146144
"state_type": "state",
147145
"state_value": _resolve_enum_name(1, ComponentStateCode),
148146
"start_time": datetime(2023, 1, 2, 1, 0),
149-
"end_time": None,
147+
"end_time": datetime(2023, 1, 2, 1, 30),
150148
},
151149
],
152150
},

0 commit comments

Comments
 (0)