44"""Functions for analyzing microgrid component state transitions and extracting alerts."""
55import logging
66from datetime import datetime , timedelta
7- from itertools import groupby
87
98from frequenz .client .common .metric import Metric
109from frequenz .client .common .microgrid .components import (
@@ -69,8 +68,7 @@ async def fetch_and_extract_state_durations(
6968
7069
7170def _extract_state_records (
72- samples : list [MetricSample ],
73- include_warnings : bool ,
71+ samples : list [MetricSample ], include_warnings : bool
7472) -> list [StateRecord ]:
7573 """Extract state records from the provided samples.
7674
@@ -81,84 +79,140 @@ def _extract_state_records(
8179 Returns:
8280 A list of StateRecord instances representing the state changes.
8381 """
84- alert_metrics = ["warning" , "error" ] if include_warnings else ["error" ]
85- state_metrics = ["state" ] + alert_metrics
86- filtered_samples = sorted (
87- (s for s in samples if s .metric in state_metrics ),
88- key = lambda s : (s .microgrid_id , s .component_id , s .metric , s .timestamp ),
89- )
90-
91- if not filtered_samples :
92- return []
82+ component_groups = _group_samples_by_component (samples , include_warnings )
9383
94- # Group samples by (microgrid_id, component_id, metric)
95- all_states = []
96- for key , group in groupby (
97- filtered_samples , key = lambda s : (s .microgrid_id , s .component_id , s .metric )
98- ):
99- all_states .extend (_process_sample_group (key , list (group )))
84+ all_records = []
85+ for (mid , cid ), metrics in component_groups .items ():
86+ if "state" not in metrics :
87+ continue
88+ all_records .extend (_process_sample_group (mid , cid , metrics ))
10089
101- all_states .sort (key = lambda x : (x .microgrid_id , x .component_id , x .start_time ))
102- return all_states
90+ all_records .sort (key = lambda x : (x .microgrid_id , x .component_id , x .start_time ))
91+ return all_records
10392
10493
94+ # pylint: disable-next=too-many-locals,too-many-branches
10595def _process_sample_group (
106- key : tuple [int , str , str ],
107- group_samples : list [MetricSample ],
96+ microgrid_id : int ,
97+ component_id : str ,
98+ samples_by_metric : dict [str , list [MetricSample ]],
10899) -> list [StateRecord ]:
109- """Process samples for a single group to extract state durations .
100+ """Process state/error/warning samples for a single component .
110101
111102 Args:
112- key: Tuple containing microgrid ID, component ID, and metric.
113- group_samples: List of samples for the group.
103+ microgrid_id: ID of the microgrid.
104+ component_id: ID of the component.
105+ samples_by_metric: Dict with keys "state", "error", optionally "warning".
114106
115107 Returns:
116- A list of StateRecord instances representing the state changes.
108+ A list of StateRecord instances representing the state changes and
109+ error/warning durations (if any).
117110 """
118- mid , cid , metric = key
119- if not group_samples :
120- return []
121-
122- state_records = []
123- current_state_value : float | None = None
124- start_time : datetime | None = None
125- enum_class = ComponentStateCode if metric == "state" else ComponentErrorCode
126-
127- for sample in group_samples :
128- if current_state_value != sample .value :
129- # Close previous state run
130- if current_state_value is not None :
131- state_records .append (
132- StateRecord (
133- microgrid_id = mid ,
134- component_id = cid ,
135- state_type = metric ,
136- state_value = _resolve_enum_name (current_state_value , enum_class ),
137- start_time = start_time ,
138- end_time = sample .timestamp ,
139- )
140- )
141- # Start new state run
142- current_state_value = sample .value
143- start_time = sample .timestamp
144-
145- # Close the last state run
146- state_records .append (
147- StateRecord (
148- microgrid_id = mid ,
149- component_id = cid ,
150- state_type = metric ,
151- state_value = (
152- _resolve_enum_name (current_state_value , enum_class )
153- if current_state_value is not None
154- else ""
155- ),
156- start_time = start_time ,
157- end_time = None ,
111+ state_samples = sorted (samples_by_metric ["state" ], key = lambda s : s .timestamp )
112+ error_by_ts = {s .timestamp : s for s in samples_by_metric .get ("error" , [])}
113+ warning_by_ts = {s .timestamp : s for s in samples_by_metric .get ("warning" , [])}
114+
115+ records : list [StateRecord ] = []
116+ state_val = error_val = warning_val = None
117+ state_start = error_start = warning_start = None
118+
119+ def emit (
120+ metric : str ,
121+ val : float ,
122+ start : datetime | None ,
123+ end : datetime | None ,
124+ enum_class : type [ComponentStateCode | ComponentErrorCode ],
125+ ) -> None :
126+ """Emit a state record."""
127+ records .append (
128+ StateRecord (
129+ microgrid_id = microgrid_id ,
130+ component_id = component_id ,
131+ state_type = metric ,
132+ state_value = _resolve_enum_name (val , enum_class ),
133+ start_time = start ,
134+ end_time = end ,
135+ )
158136 )
159- )
160137
161- return state_records
138+ for sample in state_samples :
139+ ts = sample .timestamp
140+
141+ # State change
142+ if sample .value != state_val :
143+ if state_val is not None :
144+ emit ("state" , state_val , state_start , ts , ComponentStateCode )
145+ state_val = sample .value
146+ state_start = ts
147+
148+ # Close error/warning if exiting ERROR
149+ if state_val != ComponentStateCode .ERROR .value :
150+ if error_val is not None :
151+ emit ("error" , error_val , error_start , ts , ComponentErrorCode )
152+ error_val = error_start = None
153+ if warning_val is not None :
154+ emit ("warning" , warning_val , warning_start , ts , ComponentErrorCode )
155+ warning_val = warning_start = None
156+
157+ # While in ERROR
158+ if state_val == ComponentStateCode .ERROR .value :
159+ if ts in error_by_ts :
160+ new_err = error_by_ts [ts ].value
161+ if new_err != error_val :
162+ if error_val is not None :
163+ emit ("error" , error_val , error_start , ts , ComponentErrorCode )
164+ error_val = new_err
165+ error_start = ts
166+
167+ if ts in warning_by_ts :
168+ new_warn = warning_by_ts [ts ].value
169+ if new_warn != warning_val :
170+ if warning_val is not None :
171+ emit (
172+ "warning" ,
173+ warning_val ,
174+ warning_start ,
175+ ts ,
176+ ComponentErrorCode ,
177+ )
178+ warning_val = new_warn
179+ warning_start = ts
180+
181+ if state_val is not None :
182+ emit ("state" , state_val , state_start , None , ComponentStateCode )
183+ if state_val == ComponentStateCode .ERROR .value :
184+ if error_val is not None :
185+ emit ("error" , error_val , error_start , None , ComponentErrorCode )
186+ if warning_val is not None :
187+ emit ("warning" , warning_val , warning_start , None , ComponentErrorCode )
188+ return records
189+
190+
191+ def _group_samples_by_component (
192+ samples : list [MetricSample ], include_warnings : bool
193+ ) -> dict [tuple [int , str ], dict [str , list [MetricSample ]]]:
194+ """Group samples by (microgrid_id, component_id) and metric.
195+
196+ Args:
197+ samples: List of MetricSample instances containing the reporting data.
198+ include_warnings: Whether to include warning states in the alert records.
199+
200+ Returns:
201+ A dictionary where keys are tuples of (microgrid_id, component_id) and values
202+ are dictionaries with metric names as keys and lists of MetricSample as values.
203+ """
204+ alert_metrics = {"state" , "error" }
205+ if include_warnings :
206+ alert_metrics .add ("warning" )
207+
208+ component_groups : dict [tuple [int , str ], dict [str , list [MetricSample ]]] = {}
209+ for sample in samples :
210+ if sample .metric not in alert_metrics :
211+ continue
212+ key = (sample .microgrid_id , sample .component_id )
213+ metric_dict = component_groups .setdefault (key , {})
214+ metric_dict .setdefault (sample .metric , []).append (sample )
215+ return component_groups
162216
163217
164218def _resolve_enum_name (
0 commit comments