diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f8281f2..9af6ea7 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,6 +12,7 @@ ## New Features * Add Readme information +* Added functionality to extract state durations and filter alerts. ## Bug Fixes diff --git a/src/frequenz/reporting/_reporting.py b/src/frequenz/reporting/_reporting.py index bd52bcb..1d94624 100644 --- a/src/frequenz/reporting/_reporting.py +++ b/src/frequenz/reporting/_reporting.py @@ -5,9 +5,12 @@ from collections import namedtuple from datetime import datetime, timedelta +from itertools import groupby +from typing import Any from frequenz.client.common.metric import Metric from frequenz.client.reporting import ReportingApiClient +from frequenz.client.reporting._client import MetricSample CumulativeEnergy = namedtuple( "CumulativeEnergy", ["start_time", "end_time", "consumption", "production"] @@ -122,3 +125,218 @@ async def cumulative_energy( consumption=consumption, production=production, ) + + +# pylint: disable-next=too-many-arguments +async def fetch_and_extract_state_durations( + *, + client: ReportingApiClient, + microgrid_components: list[tuple[int, list[int]]], + metrics: list[Metric], + start_time: datetime, + end_time: datetime, + resampling_period: timedelta | None, + alert_states: list[int], + include_warnings: bool = True, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Fetch data using the Reporting API and extract state durations and alert records. + + Args: + client: The client used to fetch the metric samples from the Reporting API. + microgrid_components: List of tuples where each tuple contains microgrid + ID and corresponding component IDs. + metrics: List of metric names. + NOTE: The service will support requesting states without metrics in + the future and this argument will be removed. + start_time: The start date and time for the period. + end_time: The end date and time for the period. 
+        resampling_period: The period for resampling the data. If None, data
+            will be returned in its original resolution.
+        alert_states: List of component state values that should trigger an alert.
+        include_warnings: Whether to include warning state values in the alert
+            records.
+
+    Returns:
+        A tuple containing two lists:
+        - all_states: Contains all state records including start and end times.
+        - alert_records: Contains filtered records matching the alert criteria.
+    """
+    samples = await _fetch_component_data(
+        client=client,
+        microgrid_components=microgrid_components,
+        metrics=metrics,
+        start_time=start_time,
+        end_time=end_time,
+        resampling_period=resampling_period,
+        include_states=True,
+        include_bounds=False,
+    )
+
+    all_states, alert_records = extract_state_durations(
+        samples, alert_states, include_warnings
+    )
+    return all_states, alert_records
+
+
+def extract_state_durations(
+    samples: list[MetricSample],
+    alert_states: list[int],
+    include_warnings: bool = True,
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+    """
+    Extract state durations and alert records based on state transitions.
+
+    Args:
+        samples: List of MetricSample instances containing the reporting data.
+        alert_states: List of component state values that should trigger an alert.
+            Component error codes are reported by default.
+        include_warnings: Whether to include warning state values in the alert records.
+
+    Returns:
+        A tuple containing two lists:
+        - all_states: Contains all state records including start and end times.
+        - alert_records: Contains filtered records matching the alert criteria.
+ """ + alert_metrics = ["warning", "error"] if include_warnings else ["error"] + state_metrics = ["state"] + alert_metrics + filtered_samples = sorted( + (s for s in samples if s.metric in state_metrics), + key=lambda s: (s.microgrid_id, s.component_id, s.metric, s.timestamp), + ) + + if not filtered_samples: + return [], [] + + # Group samples by (microgrid_id, component_id, metric) + all_states = [] + for key, group in groupby( + filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric) + ): + states = _process_group_samples(key, list(group)) + all_states.extend(states) + + all_states.sort( + key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"]) + ) + + alert_records = _filter_alerts(all_states, alert_states, alert_metrics) + return all_states, alert_records + + +def _process_group_samples( + key: tuple[int, int, str], + group_samples: list["MetricSample"], +) -> list[dict[str, Any]]: + """Process samples for a single group to extract state durations. + + Args: + key: Tuple containing microgrid ID, component ID, and metric. + group_samples: List of samples for the group. + + Returns: + List of state records. 
+ """ + mid, cid, metric = key + state_records = [] + current_state_value = None + start_time = None + + for sample in group_samples: + if current_state_value != sample.value: + # Close previous state run + if current_state_value is not None: + state_records.append( + { + "microgrid_id": mid, + "component_id": cid, + "state_type": metric, + "state_value": current_state_value, + "start_time": start_time, + "end_time": sample.timestamp, + } + ) + # Start new state run + current_state_value = sample.value + start_time = sample.timestamp + + # Close the last state run + state_records.append( + { + "microgrid_id": mid, + "component_id": cid, + "state_type": metric, + "state_value": current_state_value, + "start_time": start_time, + "end_time": None, + } + ) + + return state_records + + +def _filter_alerts( + all_states: list[dict[str, Any]], + alert_states: list[int], + alert_metrics: list[str], +) -> list[dict[str, Any]]: + """Identify alert records from all states. + + Args: + all_states: List of all state records. + alert_states: List of component state values that should trigger an alert. + alert_metrics: List of metric names that should trigger an alert. + + Returns: + List of alert records. + """ + return [ + state + for state in all_states + if ( + (state["state_type"] == "state" and state["state_value"] in alert_states) + or (state["state_type"] in alert_metrics) + ) + ] + + +# pylint: disable-next=too-many-arguments +async def _fetch_component_data( + *, + client: ReportingApiClient, + microgrid_components: list[tuple[int, list[int]]], + metrics: list[Metric], + start_time: datetime, + end_time: datetime, + resampling_period: timedelta | None, + include_states: bool = False, + include_bounds: bool = False, +) -> list[MetricSample]: + """Fetch component data from the Reporting API. + + Args: + client: The client used to fetch the metric samples from the Reporting API. 
+ microgrid_components: List of tuples where each tuple contains + microgrid ID and corresponding component IDs. + metrics: List of metric names. + start_time: The start date and time for the period. + end_time: The end date and time for the period. + resampling_period: The period for resampling the data. If None, data + will be returned in its original resolution + include_states: Whether to include the state data. + include_bounds: Whether to include the bound data. + + Returns: + List of MetricSample instances containing the reporting data. + """ + return [ + sample + async for sample in client.list_microgrid_components_data( + microgrid_components=microgrid_components, + metrics=metrics, + start_dt=start_time, + end_dt=end_time, + resampling_period=resampling_period, + include_states=include_states, + include_bounds=include_bounds, + ) + ] diff --git a/tests/test_frequenz_reporting.py b/tests/test_frequenz_reporting.py index a76f895..b24ae27 100644 --- a/tests/test_frequenz_reporting.py +++ b/tests/test_frequenz_reporting.py @@ -2,9 +2,202 @@ # Copyright © 2024 Frequenz Energy-as-a-Service GmbH """Tests for the frequenz.reporting package.""" +from datetime import datetime +from typing import Any + import pytest +from frequenz.client.reporting._client import MetricSample from frequenz.reporting import delete_me +from frequenz.reporting._reporting import extract_state_durations + +test_cases_extract_state_durations = [ + { + "description": "Empty samples", + "samples": [], + "alert_states": [1, 2], + "include_warnings": True, + "expected_all_states": [], + "expected_alert_records": [], + }, + { + "description": "No matching metrics", + "samples": [ + MetricSample(datetime(2023, 1, 1, 0, 0), 1, 101, "temperature", 25), + MetricSample(datetime(2023, 1, 1, 1, 0), 1, 101, "humidity", 60), + ], + "alert_states": [1], + "include_warnings": True, + "expected_all_states": [], + "expected_alert_records": [], + }, + { + "description": "Single state change", + "samples": [ 
+ MetricSample(datetime(2023, 1, 1, 0, 0), 1, 101, "state", 0), + MetricSample(datetime(2023, 1, 1, 1, 0), 1, 101, "state", 1), + ], + "alert_states": [1], + "include_warnings": True, + "expected_all_states": [ + { + "microgrid_id": 1, + "component_id": 101, + "state_type": "state", + "state_value": 0, + "start_time": datetime(2023, 1, 1, 0, 0), + "end_time": datetime(2023, 1, 1, 1, 0), + }, + { + "microgrid_id": 1, + "component_id": 101, + "state_type": "state", + "state_value": 1, + "start_time": datetime(2023, 1, 1, 1, 0), + "end_time": None, + }, + ], + "expected_alert_records": [ + { + "microgrid_id": 1, + "component_id": 101, + "state_type": "state", + "state_value": 1, + "start_time": datetime(2023, 1, 1, 1, 0), + "end_time": None, + }, + ], + }, + { + "description": "Warnings and errors included", + "samples": [ + MetricSample(datetime(2023, 1, 2, 0, 0), 3, 303, "state", 0), + MetricSample(datetime(2023, 1, 2, 0, 30), 3, 303, "warning", "W1"), + MetricSample(datetime(2023, 1, 2, 1, 0), 3, 303, "state", 1), + MetricSample(datetime(2023, 1, 2, 1, 30), 3, 303, "error", "E1"), + ], + "alert_states": [1], + "include_warnings": True, + "expected_all_states": [ + # State transitions + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "state", + "state_value": 0, + "start_time": datetime(2023, 1, 2, 0, 0), + "end_time": datetime(2023, 1, 2, 1, 0), + }, + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "state", + "state_value": 1, + "start_time": datetime(2023, 1, 2, 1, 0), + "end_time": None, + }, + # Warning transitions + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "warning", + "state_value": "W1", + "start_time": datetime(2023, 1, 2, 0, 30), + "end_time": None, + }, + # Error transitions + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "error", + "state_value": "E1", + "start_time": datetime(2023, 1, 2, 1, 30), + "end_time": None, + }, + ], + "expected_alert_records": [ + { + "microgrid_id": 3, + 
"component_id": 303, + "state_type": "warning", + "state_value": "W1", + "start_time": datetime(2023, 1, 2, 0, 30), + "end_time": None, + }, + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "error", + "state_value": "E1", + "start_time": datetime(2023, 1, 2, 1, 30), + "end_time": None, + }, + # State alert + { + "microgrid_id": 3, + "component_id": 303, + "state_type": "state", + "state_value": 1, + "start_time": datetime(2023, 1, 2, 1, 0), + "end_time": None, + }, + ], + }, +] + + +@pytest.mark.parametrize( + "test_case", test_cases_extract_state_durations, ids=lambda tc: tc["description"] +) +def test_extract_state_durations(test_case: dict[str, Any]) -> None: + """Test the extract_state_durations function.""" + all_states, alert_records = extract_state_durations( + test_case["samples"], test_case["alert_states"], test_case["include_warnings"] + ) + + expected_all_states = test_case["expected_all_states"] + expected_alert_records = test_case["expected_alert_records"] + + all_states_sorted = sorted( + all_states, + key=lambda x: ( + x["microgrid_id"], + x["component_id"], + x["state_type"], + x["start_time"], + ), + ) + expected_all_states_sorted = sorted( + expected_all_states, + key=lambda x: ( + x["microgrid_id"], + x["component_id"], + x["state_type"], + x["start_time"], + ), + ) + + alert_records_sorted = sorted( + alert_records, + key=lambda x: ( + x["microgrid_id"], + x["component_id"], + x["state_type"], + x["start_time"], + ), + ) + expected_alert_records_sorted = sorted( + expected_alert_records, + key=lambda x: ( + x["microgrid_id"], + x["component_id"], + x["state_type"], + x["start_time"], + ), + ) + + assert all_states_sorted == expected_all_states_sorted + assert alert_records_sorted == expected_alert_records_sorted def test_frequenz_reporting_succeeds() -> None: # TODO(cookiecutter): Remove