|
5 | 5 |
|
6 | 6 | from collections import namedtuple |
7 | 7 | from datetime import datetime, timedelta |
8 | | -from itertools import groupby |
9 | | -from typing import Any |
10 | 8 |
|
11 | 9 | from frequenz.client.common.metric import Metric |
12 | 10 | from frequenz.client.reporting import ReportingApiClient |
13 | | -from frequenz.client.reporting._types import MetricSample |
14 | 11 |
|
15 | 12 | CumulativeEnergy = namedtuple( |
16 | 13 | "CumulativeEnergy", ["start_time", "end_time", "consumption", "production"] |
@@ -145,218 +142,3 @@ async def cumulative_energy( |
145 | 142 | consumption=consumption, |
146 | 143 | production=production, |
147 | 144 | ) |
148 | | - |
149 | | - |
150 | | -# pylint: disable-next=too-many-arguments |
151 | | -async def fetch_and_extract_state_durations( |
152 | | - *, |
153 | | - client: ReportingApiClient, |
154 | | - microgrid_components: list[tuple[int, list[int]]], |
155 | | - metrics: list[Metric], |
156 | | - start_time: datetime, |
157 | | - end_time: datetime, |
158 | | - resampling_period: timedelta | None, |
159 | | - alert_states: list[int], |
160 | | - include_warnings: bool = True, |
161 | | -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: |
162 | | - """Fetch data using the Reporting API and extract state durations and alert records. |
163 | | -
|
164 | | - Args: |
165 | | - client: The client used to fetch the metric samples from the Reporting API. |
166 | | - microgrid_components: List of tuples where each tuple contains microgrid |
167 | | - ID and corresponding component IDs. |
168 | | - metrics: List of metric names. |
169 | | - NOTE: The service will support requesting states without metrics in |
170 | | - the future and this argument will be removed. |
171 | | - start_time: The start date and time for the period. |
172 | | - end_time: The end date and time for the period. |
173 | | - resampling_period: The period for resampling the data. If None, data |
174 | | - will be returned in its original resolution |
175 | | - alert_states: List of component state values that should trigger an alert. |
176 | | - include_warnings: Whether to include warning state values in the alert |
177 | | - records. |
178 | | -
|
179 | | - Returns: |
180 | | - A tuple containing two lists: |
181 | | - - all_states: Contains all state records including start and end times. |
182 | | - - alert_records: Contains filtered records matching the alert criteria. |
183 | | - """ |
184 | | - samples = await _fetch_component_data( |
185 | | - client=client, |
186 | | - microgrid_components=microgrid_components, |
187 | | - metrics=metrics, |
188 | | - start_time=start_time, |
189 | | - end_time=end_time, |
190 | | - resampling_period=resampling_period, |
191 | | - include_states=True, |
192 | | - include_bounds=False, |
193 | | - ) |
194 | | - |
195 | | - all_states, alert_records = extract_state_durations( |
196 | | - samples, alert_states, include_warnings |
197 | | - ) |
198 | | - return all_states, alert_records |
199 | | - |
200 | | - |
201 | | -def extract_state_durations( |
202 | | - samples: list[MetricSample], |
203 | | - alert_states: list[int], |
204 | | - include_warnings: bool = True, |
205 | | -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: |
206 | | - """ |
207 | | - Extract state durations and alert records based on state transitions. |
208 | | -
|
209 | | - Args: |
210 | | - samples: List of MetricSample instances containing the reporting data. |
211 | | - alert_states: List of component state values that should trigger an alert. |
212 | | - Component error codes are reported by default. |
213 | | - include_warnings: Whether to include warning state values in the alert records. |
214 | | -
|
215 | | - Returns: |
216 | | - A tuple containing two lists: |
217 | | - - all_states: Contains all state records including start and end times. |
218 | | - - alert_records: Contains filtered records matching the alert criteria. |
219 | | - """ |
220 | | - alert_metrics = ["warning", "error"] if include_warnings else ["error"] |
221 | | - state_metrics = ["state"] + alert_metrics |
222 | | - filtered_samples = sorted( |
223 | | - (s for s in samples if s.metric in state_metrics), |
224 | | - key=lambda s: (s.microgrid_id, s.component_id, s.metric, s.timestamp), |
225 | | - ) |
226 | | - |
227 | | - if not filtered_samples: |
228 | | - return [], [] |
229 | | - |
230 | | - # Group samples by (microgrid_id, component_id, metric) |
231 | | - all_states = [] |
232 | | - for key, group in groupby( |
233 | | - filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric) |
234 | | - ): |
235 | | - states = _process_group_samples(key, list(group)) |
236 | | - all_states.extend(states) |
237 | | - |
238 | | - all_states.sort( |
239 | | - key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"]) |
240 | | - ) |
241 | | - |
242 | | - alert_records = _filter_alerts(all_states, alert_states, alert_metrics) |
243 | | - return all_states, alert_records |
244 | | - |
245 | | - |
246 | | -def _process_group_samples( |
247 | | - key: tuple[int, int, str], |
248 | | - group_samples: list["MetricSample"], |
249 | | -) -> list[dict[str, Any]]: |
250 | | - """Process samples for a single group to extract state durations. |
251 | | -
|
252 | | - Args: |
253 | | - key: Tuple containing microgrid ID, component ID, and metric. |
254 | | - group_samples: List of samples for the group. |
255 | | -
|
256 | | - Returns: |
257 | | - List of state records. |
258 | | - """ |
259 | | - mid, cid, metric = key |
260 | | - state_records = [] |
261 | | - current_state_value = None |
262 | | - start_time = None |
263 | | - |
264 | | - for sample in group_samples: |
265 | | - if current_state_value != sample.value: |
266 | | - # Close previous state run |
267 | | - if current_state_value is not None: |
268 | | - state_records.append( |
269 | | - { |
270 | | - "microgrid_id": mid, |
271 | | - "component_id": cid, |
272 | | - "state_type": metric, |
273 | | - "state_value": current_state_value, |
274 | | - "start_time": start_time, |
275 | | - "end_time": sample.timestamp, |
276 | | - } |
277 | | - ) |
278 | | - # Start new state run |
279 | | - current_state_value = sample.value |
280 | | - start_time = sample.timestamp |
281 | | - |
282 | | - # Close the last state run |
283 | | - state_records.append( |
284 | | - { |
285 | | - "microgrid_id": mid, |
286 | | - "component_id": cid, |
287 | | - "state_type": metric, |
288 | | - "state_value": current_state_value, |
289 | | - "start_time": start_time, |
290 | | - "end_time": None, |
291 | | - } |
292 | | - ) |
293 | | - |
294 | | - return state_records |
295 | | - |
296 | | - |
297 | | -def _filter_alerts( |
298 | | - all_states: list[dict[str, Any]], |
299 | | - alert_states: list[int], |
300 | | - alert_metrics: list[str], |
301 | | -) -> list[dict[str, Any]]: |
302 | | - """Identify alert records from all states. |
303 | | -
|
304 | | - Args: |
305 | | - all_states: List of all state records. |
306 | | - alert_states: List of component state values that should trigger an alert. |
307 | | - alert_metrics: List of metric names that should trigger an alert. |
308 | | -
|
309 | | - Returns: |
310 | | - List of alert records. |
311 | | - """ |
312 | | - return [ |
313 | | - state |
314 | | - for state in all_states |
315 | | - if ( |
316 | | - (state["state_type"] == "state" and state["state_value"] in alert_states) |
317 | | - or (state["state_type"] in alert_metrics) |
318 | | - ) |
319 | | - ] |
320 | | - |
321 | | - |
322 | | -# pylint: disable-next=too-many-arguments |
323 | | -async def _fetch_component_data( |
324 | | - *, |
325 | | - client: ReportingApiClient, |
326 | | - microgrid_components: list[tuple[int, list[int]]], |
327 | | - metrics: list[Metric], |
328 | | - start_time: datetime, |
329 | | - end_time: datetime, |
330 | | - resampling_period: timedelta | None, |
331 | | - include_states: bool = False, |
332 | | - include_bounds: bool = False, |
333 | | -) -> list[MetricSample]: |
334 | | - """Fetch component data from the Reporting API. |
335 | | -
|
336 | | - Args: |
337 | | - client: The client used to fetch the metric samples from the Reporting API. |
338 | | - microgrid_components: List of tuples where each tuple contains |
339 | | - microgrid ID and corresponding component IDs. |
340 | | - metrics: List of metric names. |
341 | | - start_time: The start date and time for the period. |
342 | | - end_time: The end date and time for the period. |
343 | | - resampling_period: The period for resampling the data. If None, data |
344 | | - will be returned in its original resolution |
345 | | - include_states: Whether to include the state data. |
346 | | - include_bounds: Whether to include the bound data. |
347 | | -
|
348 | | - Returns: |
349 | | - List of MetricSample instances containing the reporting data. |
350 | | - """ |
351 | | - return [ |
352 | | - sample |
353 | | - async for sample in client.receive_microgrid_components_data( |
354 | | - microgrid_components=microgrid_components, |
355 | | - metrics=metrics, |
356 | | - start_time=start_time, |
357 | | - end_time=end_time, |
358 | | - resampling_period=resampling_period, |
359 | | - include_states=include_states, |
360 | | - include_bounds=include_bounds, |
361 | | - ) |
362 | | - ] |
0 commit comments