diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1471b48..1616897 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,7 @@ ## Summary - +- Code relevant to fetching data and extracting state durations and alert records has been moved to another repo (`frequenz-lib-notebooks`) and has therefore been deleted from this repo. ## Upgrading diff --git a/src/frequenz/reporting/_reporting.py b/src/frequenz/reporting/_reporting.py index 7c14943..035c117 100644 --- a/src/frequenz/reporting/_reporting.py +++ b/src/frequenz/reporting/_reporting.py @@ -5,12 +5,9 @@ from collections import namedtuple from datetime import datetime, timedelta -from itertools import groupby -from typing import Any from frequenz.client.common.metric import Metric from frequenz.client.reporting import ReportingApiClient -from frequenz.client.reporting._types import MetricSample CumulativeEnergy = namedtuple( "CumulativeEnergy", ["start_time", "end_time", "consumption", "production"] @@ -145,218 +142,3 @@ async def cumulative_energy( consumption=consumption, production=production, ) - - -# pylint: disable-next=too-many-arguments -async def fetch_and_extract_state_durations( - *, - client: ReportingApiClient, - microgrid_components: list[tuple[int, list[int]]], - metrics: list[Metric], - start_time: datetime, - end_time: datetime, - resampling_period: timedelta | None, - alert_states: list[int], - include_warnings: bool = True, -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - """Fetch data using the Reporting API and extract state durations and alert records. - - Args: - client: The client used to fetch the metric samples from the Reporting API. - microgrid_components: List of tuples where each tuple contains microgrid - ID and corresponding component IDs. - metrics: List of metric names. - NOTE: The service will support requesting states without metrics in - the future and this argument will be removed. - start_time: The start date and time for the period. - end_time: The end date and time for the period. - resampling_period: The period for resampling the data. If None, data - will be returned in its original resolution - alert_states: List of component state values that should trigger an alert. - include_warnings: Whether to include warning state values in the alert - records. - - Returns: - A tuple containing two lists: - - all_states: Contains all state records including start and end times. - - alert_records: Contains filtered records matching the alert criteria. - """ - samples = await _fetch_component_data( - client=client, - microgrid_components=microgrid_components, - metrics=metrics, - start_time=start_time, - end_time=end_time, - resampling_period=resampling_period, - include_states=True, - include_bounds=False, - ) - - all_states, alert_records = extract_state_durations( - samples, alert_states, include_warnings - ) - return all_states, alert_records - - -def extract_state_durations( - samples: list[MetricSample], - alert_states: list[int], - include_warnings: bool = True, -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - """ - Extract state durations and alert records based on state transitions. - - Args: - samples: List of MetricSample instances containing the reporting data. - alert_states: List of component state values that should trigger an alert. - Component error codes are reported by default. - include_warnings: Whether to include warning state values in the alert records. - - Returns: - A tuple containing two lists: - - all_states: Contains all state records including start and end times. - - alert_records: Contains filtered records matching the alert criteria. - """ - alert_metrics = ["warning", "error"] if include_warnings else ["error"] - state_metrics = ["state"] + alert_metrics - filtered_samples = sorted( - (s for s in samples if s.metric in state_metrics), - key=lambda s: (s.microgrid_id, s.component_id, s.metric, s.timestamp), - ) - - if not filtered_samples: - return [], [] - - # Group samples by (microgrid_id, component_id, metric) - all_states = [] - for key, group in groupby( - filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric) - ): - states = _process_group_samples(key, list(group)) - all_states.extend(states) - - all_states.sort( - key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"]) - ) - - alert_records = _filter_alerts(all_states, alert_states, alert_metrics) - return all_states, alert_records - - -def _process_group_samples( - key: tuple[int, int, str], - group_samples: list["MetricSample"], -) -> list[dict[str, Any]]: - """Process samples for a single group to extract state durations. - - Args: - key: Tuple containing microgrid ID, component ID, and metric. - group_samples: List of samples for the group. - - Returns: - List of state records. - """ - mid, cid, metric = key - state_records = [] - current_state_value = None - start_time = None - - for sample in group_samples: - if current_state_value != sample.value: - # Close previous state run - if current_state_value is not None: - state_records.append( - { - "microgrid_id": mid, - "component_id": cid, - "state_type": metric, - "state_value": current_state_value, - "start_time": start_time, - "end_time": sample.timestamp, - } - ) - # Start new state run - current_state_value = sample.value - start_time = sample.timestamp - - # Close the last state run - state_records.append( - { - "microgrid_id": mid, - "component_id": cid, - "state_type": metric, - "state_value": current_state_value, - "start_time": start_time, - "end_time": None, - } - ) - - return state_records - - -def _filter_alerts( - all_states: list[dict[str, Any]], - alert_states: list[int], - alert_metrics: list[str], -) -> list[dict[str, Any]]: - """Identify alert records from all states. - - Args: - all_states: List of all state records. - alert_states: List of component state values that should trigger an alert. - alert_metrics: List of metric names that should trigger an alert. - - Returns: - List of alert records. - """ - return [ - state - for state in all_states - if ( - (state["state_type"] == "state" and state["state_value"] in alert_states) - or (state["state_type"] in alert_metrics) - ) - ] - - -# pylint: disable-next=too-many-arguments -async def _fetch_component_data( - *, - client: ReportingApiClient, - microgrid_components: list[tuple[int, list[int]]], - metrics: list[Metric], - start_time: datetime, - end_time: datetime, - resampling_period: timedelta | None, - include_states: bool = False, - include_bounds: bool = False, -) -> list[MetricSample]: - """Fetch component data from the Reporting API. - - Args: - client: The client used to fetch the metric samples from the Reporting API. - microgrid_components: List of tuples where each tuple contains - microgrid ID and corresponding component IDs. - metrics: List of metric names. - start_time: The start date and time for the period. - end_time: The end date and time for the period. - resampling_period: The period for resampling the data. If None, data - will be returned in its original resolution - include_states: Whether to include the state data. - include_bounds: Whether to include the bound data. - - Returns: - List of MetricSample instances containing the reporting data. - """ - return [ - sample - async for sample in client.receive_microgrid_components_data( - microgrid_components=microgrid_components, - metrics=metrics, - start_time=start_time, - end_time=end_time, - resampling_period=resampling_period, - include_states=include_states, - include_bounds=include_bounds, - ) - ] diff --git a/tests/test_frequenz_reporting.py b/tests/test_frequenz_reporting.py index f05f53b..a76f895 100644 --- a/tests/test_frequenz_reporting.py +++ b/tests/test_frequenz_reporting.py @@ -2,202 +2,9 @@ # Copyright © 2024 Frequenz Energy-as-a-Service GmbH """Tests for the frequenz.reporting package.""" -from datetime import datetime -from typing import Any - import pytest -from frequenz.client.reporting._types import MetricSample from frequenz.reporting import delete_me -from frequenz.reporting._reporting import extract_state_durations - -test_cases_extract_state_durations = [ - { - "description": "Empty samples", - "samples": [], - "alert_states": [1, 2], - "include_warnings": True, - "expected_all_states": [], - "expected_alert_records": [], - }, - { - "description": "No matching metrics", - "samples": [ - MetricSample(datetime(2023, 1, 1, 0, 0), 1, "101", "temperature", 25), - MetricSample(datetime(2023, 1, 1, 1, 0), 1, "101", "humidity", 60), - ], - "alert_states": [1], - "include_warnings": True, - "expected_all_states": [], - "expected_alert_records": [], - }, - { - "description": "Single state change", - "samples": [ - MetricSample(datetime(2023, 1, 1, 0, 0), 1, "101", "state", 0), - MetricSample(datetime(2023, 1, 1, 1, 0), 1, "101", "state", 1), - ], - "alert_states": [1], - "include_warnings": True, - "expected_all_states": [ - { - "microgrid_id": 1, - "component_id": "101", - "state_type": "state", - "state_value": 0, - "start_time": datetime(2023, 1, 1, 0, 0), - "end_time": datetime(2023, 1, 1, 1, 0), - }, - { - "microgrid_id": 1, - "component_id": "101", - "state_type": "state", - "state_value": 1, - "start_time": datetime(2023, 1, 1, 1, 0), - "end_time": None, - }, - ], - "expected_alert_records": [ - { - "microgrid_id": 1, - "component_id": "101", - "state_type": "state", - "state_value": 1, - "start_time": datetime(2023, 1, 1, 1, 0), - "end_time": None, - }, - ], - }, - { - "description": "Warnings and errors included", - "samples": [ - MetricSample(datetime(2023, 1, 2, 0, 0), 3, "303", "state", 0), - MetricSample(datetime(2023, 1, 2, 0, 30), 3, "303", "warning", 10), - MetricSample(datetime(2023, 1, 2, 1, 0), 3, "303", "state", 1), - MetricSample(datetime(2023, 1, 2, 1, 30), 3, "303", "error", 20), - ], - "alert_states": [1], - "include_warnings": True, - "expected_all_states": [ - # State transitions - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "state", - "state_value": 0, - "start_time": datetime(2023, 1, 2, 0, 0), - "end_time": datetime(2023, 1, 2, 1, 0), - }, - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "state", - "state_value": 1, - "start_time": datetime(2023, 1, 2, 1, 0), - "end_time": None, - }, - # Warning transitions - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "warning", - "state_value": 10, - "start_time": datetime(2023, 1, 2, 0, 30), - "end_time": None, - }, - # Error transitions - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "error", - "state_value": 20, - "start_time": datetime(2023, 1, 2, 1, 30), - "end_time": None, - }, - ], - "expected_alert_records": [ - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "warning", - "state_value": 10, - "start_time": datetime(2023, 1, 2, 0, 30), - "end_time": None, - }, - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "error", - "state_value": 20, - "start_time": datetime(2023, 1, 2, 1, 30), - "end_time": None, - }, - # State alert - { - "microgrid_id": 3, - "component_id": "303", - "state_type": "state", - "state_value": 1, - "start_time": datetime(2023, 1, 2, 1, 0), - "end_time": None, - }, - ], - }, -] - - -@pytest.mark.parametrize( - "test_case", test_cases_extract_state_durations, ids=lambda tc: tc["description"] -) -def test_extract_state_durations(test_case: dict[str, Any]) -> None: - """Test the extract_state_durations function.""" - all_states, alert_records = extract_state_durations( - test_case["samples"], test_case["alert_states"], test_case["include_warnings"] - ) - - expected_all_states = test_case["expected_all_states"] - expected_alert_records = test_case["expected_alert_records"] - - all_states_sorted = sorted( - all_states, - key=lambda x: ( - x["microgrid_id"], - x["component_id"], - x["state_type"], - x["start_time"], - ), - ) - expected_all_states_sorted = sorted( - expected_all_states, - key=lambda x: ( - x["microgrid_id"], - x["component_id"], - x["state_type"], - x["start_time"], - ), - ) - - alert_records_sorted = sorted( - alert_records, - key=lambda x: ( - x["microgrid_id"], - x["component_id"], - x["state_type"], - x["start_time"], - ), - ) - expected_alert_records_sorted = sorted( - expected_alert_records, - key=lambda x: ( - x["microgrid_id"], - x["component_id"], - x["state_type"], - x["start_time"], - ), - ) - - assert all_states_sorted == expected_all_states_sorted - assert alert_records_sorted == expected_alert_records_sorted def test_frequenz_reporting_succeeds() -> None: # TODO(cookiecutter): Remove