diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1471b48..554f97e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +- Refactored state extraction to return `StateRecord` objects with descriptive enum names (e.g., "ACTIVE", "FAULT") instead of raw integers for `state_value`, improving clarity and downstream usability. ## New Features diff --git a/pyproject.toml b/pyproject.toml index 3a45cba..2b6182f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ requires-python = ">= 3.11, < 4" dependencies = [ "typing-extensions >= 4.6.1, < 5", "frequenz-client-reporting >= 0.18.0, < 0.19.0", - "frequenz-client-common >= 0.3.0, < 0.4", + "frequenz-client-common >= 0.3.3, < 0.4", ] dynamic = ["version"] diff --git a/src/frequenz/reporting/_reporting.py b/src/frequenz/reporting/_reporting.py index 7c14943..19ba2d1 100644 --- a/src/frequenz/reporting/_reporting.py +++ b/src/frequenz/reporting/_reporting.py @@ -2,20 +2,54 @@ # Copyright © 2024 Frequenz Energy-as-a-Service GmbH """A highlevel interface for the reporting API.""" - +import enum +import logging from collections import namedtuple from datetime import datetime, timedelta from itertools import groupby -from typing import Any +from typing import NamedTuple +from frequenz.client.common.enum_proto import enum_from_proto from frequenz.client.common.metric import Metric +from frequenz.client.common.microgrid.components import ( + ComponentErrorCode, + ComponentStateCode, +) from frequenz.client.reporting import ReportingApiClient from frequenz.client.reporting._types import MetricSample +_logger = logging.getLogger(__name__) + CumulativeEnergy = namedtuple( "CumulativeEnergy", ["start_time", "end_time", "consumption", "production"] ) -"""Type for cumulative energy consumption and production over a specified time.""" + + +class StateRecord(NamedTuple): + """A record of a component state change. 
+ + A named tuple was chosen to allow safe access to the fields while keeping + the simplicity of a tuple. This data type can be easily used to create a + numpy array or a pandas DataFrame. + """ + + microgrid_id: int + """The ID of the microgrid.""" + + component_id: str + """The ID of the component within the microgrid.""" + + state_type: str + """The type of the state (e.g., "state", "warning", "error").""" + + state_value: str + """The enum name of the state, or the integer as a string if unknown.""" + + start_time: datetime | None + """The start time of the state change.""" + + end_time: datetime | None + """The end time of the state, or None if no later state change was observed.""" # pylint: disable-next=too-many-arguments @@ -156,9 +190,9 @@ async def fetch_and_extract_state_durations( start_time: datetime, end_time: datetime, resampling_period: timedelta | None, - alert_states: list[int], + alert_states: list[ComponentStateCode], include_warnings: bool = True, -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: +) -> tuple[list[StateRecord], list[StateRecord]]: """Fetch data using the Reporting API and extract state durations and alert records. Args: @@ -172,14 +206,13 @@ async def fetch_and_extract_state_durations( end_time: The end date and time for the period. resampling_period: The period for resampling the data. If None, data will be returned in its original resolution - alert_states: List of component state values that should trigger an alert. - include_warnings: Whether to include warning state values in the alert - records. + alert_states: List of ComponentStateCode members that should trigger an alert. + include_warnings: Whether to include warning states in the alert records. Returns: - A tuple containing two lists: - - all_states: Contains all state records including start and end times. - - alert_records: Contains filtered records matching the alert criteria. + A tuple containing: + - A list of StateRecord instances representing the state changes. 
+ - A list of StateRecord instances that match the alert criteria. """ samples = await _fetch_component_data( client=client, @@ -192,30 +225,23 @@ async def fetch_and_extract_state_durations( include_bounds=False, ) - all_states, alert_records = extract_state_durations( - samples, alert_states, include_warnings - ) + all_states = _extract_state_records(samples, include_warnings) + alert_records = _filter_alerts(all_states, alert_states, include_warnings) return all_states, alert_records -def extract_state_durations( +def _extract_state_records( samples: list[MetricSample], - alert_states: list[int], - include_warnings: bool = True, -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - """ - Extract state durations and alert records based on state transitions. + include_warnings: bool, +) -> list[StateRecord]: + """Extract state records from the provided samples. Args: samples: List of MetricSample instances containing the reporting data. - alert_states: List of component state values that should trigger an alert. - Component error codes are reported by default. - include_warnings: Whether to include warning state values in the alert records. + include_warnings: Whether to also extract records for the "warning" metric. Returns: - A tuple containing two lists: - - all_states: Contains all state records including start and end times. - - alert_records: Contains filtered records matching the alert criteria. + A list of StateRecord instances representing the state changes. 
""" alert_metrics = ["warning", "error"] if include_warnings else ["error"] state_metrics = ["state"] + alert_metrics @@ -225,28 +251,23 @@ def extract_state_durations( ) if not filtered_samples: - return [], [] + return [] # Group samples by (microgrid_id, component_id, metric) all_states = [] for key, group in groupby( filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric) ): - states = _process_group_samples(key, list(group)) - all_states.extend(states) + all_states.extend(_process_sample_group(key, list(group))) - all_states.sort( - key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"]) - ) - - alert_records = _filter_alerts(all_states, alert_states, alert_metrics) - return all_states, alert_records + all_states.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time)) + return all_states -def _process_group_samples( - key: tuple[int, int, str], - group_samples: list["MetricSample"], -) -> list[dict[str, Any]]: +def _process_sample_group( + key: tuple[int, str, str], + group_samples: list[MetricSample], +) -> list[StateRecord]: """Process samples for a single group to extract state durations. Args: @@ -254,26 +275,30 @@ def _process_group_samples( group_samples: List of samples for the group. Returns: - List of state records. + A list of StateRecord instances representing the state changes. 
""" mid, cid, metric = key + if not group_samples: + return [] + state_records = [] - current_state_value = None - start_time = None + current_state_value: float | None = None + start_time: datetime | None = None + enum_class = ComponentStateCode if metric == "state" else ComponentErrorCode for sample in group_samples: if current_state_value != sample.value: # Close previous state run if current_state_value is not None: state_records.append( - { - "microgrid_id": mid, - "component_id": cid, - "state_type": metric, - "state_value": current_state_value, - "start_time": start_time, - "end_time": sample.timestamp, - } + StateRecord( + microgrid_id=mid, + component_id=cid, + state_type=metric, + state_value=_resolve_enum_name(current_state_value, enum_class), + start_time=start_time, + end_time=sample.timestamp, + ) ) # Start new state run current_state_value = sample.value @@ -281,40 +306,67 @@ def _process_group_samples( # Close the last state run state_records.append( - { - "microgrid_id": mid, - "component_id": cid, - "state_type": metric, - "state_value": current_state_value, - "start_time": start_time, - "end_time": None, - } + StateRecord( + microgrid_id=mid, + component_id=cid, + state_type=metric, + state_value=( + _resolve_enum_name(current_state_value, enum_class) + if current_state_value is not None + else "" + ), + start_time=start_time, + end_time=None, + ) ) return state_records +def _resolve_enum_name(value: float, enum_class: type[enum.Enum]) -> str: + """Resolve the name of an enum member from its integer value. + + Args: + value: The integer value of the enum. + enum_class: The enum class to convert the value to. + + Returns: + The name of the enum member if it exists, otherwise the value as a string. 
+ """ + result = enum_from_proto(int(value), enum_class) + if isinstance(result, int): + _logger.warning( + "Unknown enum value %s for %s, returning the integer value as a string.", + value, + enum_class.__name__, + ) + return str(result) + return result.name + + def _filter_alerts( - all_states: list[dict[str, Any]], - alert_states: list[int], - alert_metrics: list[str], -) -> list[dict[str, Any]]: + all_states: list[StateRecord], + alert_states: list[ComponentStateCode], + include_warnings: bool, +) -> list[StateRecord]: """Identify alert records from all states. Args: all_states: List of all state records. - alert_states: List of component state values that should trigger an alert. - alert_metrics: List of metric names that should trigger an alert. + alert_states: List of ComponentStateCode members that should trigger an alert. + include_warnings: Whether to include warning states in the alert records. Returns: - List of alert records. + A list of StateRecord instances that match the alert criteria. 
""" + alert_metrics = ["warning", "error"] if include_warnings else ["error"] + _alert_state_names = {state.name for state in alert_states} return [ state for state in all_states if ( - (state["state_type"] == "state" and state["state_value"] in alert_states) - or (state["state_type"] in alert_metrics) + (state.state_type == "state" and state.state_value in _alert_state_names) + or (state.state_type in alert_metrics) ) ] diff --git a/tests/test_frequenz_reporting.py b/tests/test_frequenz_reporting.py index f05f53b..0b50a83 100644 --- a/tests/test_frequenz_reporting.py +++ b/tests/test_frequenz_reporting.py @@ -6,16 +6,26 @@ from typing import Any import pytest +from frequenz.client.common.enum_proto import enum_from_proto +from frequenz.client.common.microgrid.components import ( + ComponentErrorCode, + ComponentStateCode, +) from frequenz.client.reporting._types import MetricSample -from frequenz.reporting import delete_me -from frequenz.reporting._reporting import extract_state_durations +from frequenz.reporting._reporting import ( + _extract_state_records, + _filter_alerts, + _resolve_enum_name, +) test_cases_extract_state_durations = [ { "description": "Empty samples", "samples": [], - "alert_states": [1, 2], + "alert_states": [ + enum_from_proto(1, ComponentStateCode), + ], "include_warnings": True, "expected_all_states": [], "expected_alert_records": [], @@ -26,7 +36,7 @@ MetricSample(datetime(2023, 1, 1, 0, 0), 1, "101", "temperature", 25), MetricSample(datetime(2023, 1, 1, 1, 0), 1, "101", "humidity", 60), ], - "alert_states": [1], + "alert_states": [enum_from_proto(1, ComponentStateCode)], "include_warnings": True, "expected_all_states": [], "expected_alert_records": [], @@ -37,14 +47,14 @@ MetricSample(datetime(2023, 1, 1, 0, 0), 1, "101", "state", 0), MetricSample(datetime(2023, 1, 1, 1, 0), 1, "101", "state", 1), ], - "alert_states": [1], + "alert_states": [enum_from_proto(1, ComponentStateCode)], "include_warnings": True, "expected_all_states": [ { 
"microgrid_id": 1, "component_id": "101", "state_type": "state", - "state_value": 0, + "state_value": _resolve_enum_name(0, ComponentStateCode), "start_time": datetime(2023, 1, 1, 0, 0), "end_time": datetime(2023, 1, 1, 1, 0), }, @@ -52,7 +62,7 @@ "microgrid_id": 1, "component_id": "101", "state_type": "state", - "state_value": 1, + "state_value": _resolve_enum_name(1, ComponentStateCode), "start_time": datetime(2023, 1, 1, 1, 0), "end_time": None, }, @@ -62,7 +72,7 @@ "microgrid_id": 1, "component_id": "101", "state_type": "state", - "state_value": 1, + "state_value": _resolve_enum_name(1, ComponentStateCode), "start_time": datetime(2023, 1, 1, 1, 0), "end_time": None, }, @@ -76,7 +86,7 @@ MetricSample(datetime(2023, 1, 2, 1, 0), 3, "303", "state", 1), MetricSample(datetime(2023, 1, 2, 1, 30), 3, "303", "error", 20), ], - "alert_states": [1], + "alert_states": [enum_from_proto(1, ComponentStateCode)], "include_warnings": True, "expected_all_states": [ # State transitions @@ -84,7 +94,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "state", - "state_value": 0, + "state_value": _resolve_enum_name(0, ComponentStateCode), "start_time": datetime(2023, 1, 2, 0, 0), "end_time": datetime(2023, 1, 2, 1, 0), }, @@ -92,7 +102,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "state", - "state_value": 1, + "state_value": _resolve_enum_name(1, ComponentStateCode), "start_time": datetime(2023, 1, 2, 1, 0), "end_time": None, }, @@ -101,7 +111,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "warning", - "state_value": 10, + "state_value": _resolve_enum_name(10, ComponentErrorCode), "start_time": datetime(2023, 1, 2, 0, 30), "end_time": None, }, @@ -110,7 +120,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "error", - "state_value": 20, + "state_value": _resolve_enum_name(20, ComponentErrorCode), "start_time": datetime(2023, 1, 2, 1, 30), "end_time": None, }, @@ -120,7 +130,7 @@ "microgrid_id": 3, "component_id": "303", 
"state_type": "warning", - "state_value": 10, + "state_value": _resolve_enum_name(10, ComponentErrorCode), "start_time": datetime(2023, 1, 2, 0, 30), "end_time": None, }, @@ -128,7 +138,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "error", - "state_value": 20, + "state_value": _resolve_enum_name(20, ComponentErrorCode), "start_time": datetime(2023, 1, 2, 1, 30), "end_time": None, }, @@ -137,7 +147,7 @@ "microgrid_id": 3, "component_id": "303", "state_type": "state", - "state_value": 1, + "state_value": _resolve_enum_name(1, ComponentStateCode), "start_time": datetime(2023, 1, 2, 1, 0), "end_time": None, }, @@ -149,11 +159,19 @@ @pytest.mark.parametrize( "test_case", test_cases_extract_state_durations, ids=lambda tc: tc["description"] ) -def test_extract_state_durations(test_case: dict[str, Any]) -> None: - """Test the extract_state_durations function.""" - all_states, alert_records = extract_state_durations( - test_case["samples"], test_case["alert_states"], test_case["include_warnings"] +def test_extract_and_filter_state_records(test_case: dict[str, Any]) -> None: + """Test extracting and filtering state records from samples.""" + _all_states = _extract_state_records( + test_case["samples"], test_case["include_warnings"] ) + print(test_case["alert_states"]) + _alert_records = _filter_alerts( + _all_states, + alert_states=test_case["alert_states"], + include_warnings=test_case["include_warnings"], + ) + all_states = [record._asdict() for record in _all_states] + alert_records = [record._asdict() for record in _alert_records] expected_all_states = test_case["expected_all_states"] expected_alert_records = test_case["expected_alert_records"] @@ -195,17 +213,5 @@ def test_extract_state_durations(test_case: dict[str, Any]) -> None: x["start_time"], ), ) - assert all_states_sorted == expected_all_states_sorted assert alert_records_sorted == expected_alert_records_sorted - - -def test_frequenz_reporting_succeeds() -> None: # TODO(cookiecutter): Remove - 
"""Test that the delete_me function succeeds.""" - assert delete_me() is True - - -def test_frequenz_reporting_fails() -> None: # TODO(cookiecutter): Remove - """Test that the delete_me function fails.""" - with pytest.raises(RuntimeError, match="This function should be removed!"): - delete_me(blow_up=True)