From decf7b08cd4fa3ef61892e50fde7813d87668b68 Mon Sep 17 00:00:00 2001
From: Flora
Date: Thu, 24 Apr 2025 14:51:26 +0200
Subject: [PATCH] Add sensor data endpoint

Signed-off-by: Flora
---
 RELEASE_NOTES.md                          |   2 +-
 src/frequenz/client/reporting/_client.py  | 336 ++++++++++++++++++-----
 tests/test_client_reporting.py            |   4 +-
 3 files changed, 274 insertions(+), 68 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index f80f12e..53a4726 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -10,7 +10,7 @@
 
 ## New Features
 
-
+* Add sensor data endpoint to the client.
 
 ## Bug Fixes
 
diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py
index 3bc8737..ac3b1c1 100644
--- a/src/frequenz/client/reporting/_client.py
+++ b/src/frequenz/client/reporting/_client.py
@@ -3,8 +3,8 @@
 
 """Client for requests to the Reporting API."""
 
-from collections import namedtuple
-from collections.abc import AsyncIterator, Iterable, Iterator
+from collections import abc, namedtuple
+from collections.abc import AsyncIterator, Iterator
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Any, AsyncIterable, cast
@@ -13,6 +13,9 @@
 from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
     MicrogridComponentIDs as PBMicrogridComponentIDs,
 )
+from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
+    MicrogridSensorIDs as PBMicrogridSensorIDs,
+)
 from frequenz.api.reporting.v1.reporting_pb2 import (
     AggregationConfig as PBAggregationConfig,
 )
@@ -32,6 +35,12 @@
 from frequenz.api.reporting.v1.reporting_pb2 import (
     ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse,
 )
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ReceiveMicrogridSensorsDataStreamRequest as PBReceiveMicrogridSensorsDataStreamRequest,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ReceiveMicrogridSensorsDataStreamResponse as PBReceiveMicrogridSensorsDataStreamResponse,
+)
 from frequenz.api.reporting.v1.reporting_pb2 import (
     ResamplingOptions as PBResamplingOptions,
 )
@@ -56,11 +65,19 @@
 
 
 @dataclass(frozen=True)
-class ComponentsDataBatch:
-    """A batch of components data for a single microgrid returned by the Reporting service."""
+class GenericDataBatch:
+    """Base class for batches of microgrid data (components or sensors).
 
-    _data_pb: PBReceiveMicrogridComponentsDataStreamResponse
-    """The underlying protobuf message."""
+    This class is the shared base for batches of microgrid component or
+    sensor data. It holds the received protocol buffer (PB) message,
+    exposes the contained items via configurable attribute names, and
+    optionally yields bound values for metrics that carry them.
+    """
+
+    _data_pb: Any
+    id_attr: str
+    items_attr: str
+    has_bounds: bool = False
 
     def is_empty(self) -> bool:
         """Check if the batch contains valid data.
@@ -68,16 +85,16 @@ def is_empty(self) -> bool:
         Returns:
             True if the batch contains no valid data.
         """
-        if not self._data_pb.components:
-            return True
-        if (
-            not self._data_pb.components[0].metric_samples
-            and not self._data_pb.components[0].states
-        ):
+        items = getattr(self._data_pb, self.items_attr, [])
+        if not items:
             return True
-        return False
+        # The batch is empty only if every item lacks both samples and states.
+        return all(
+            not getattr(item, "metric_samples", [])
+            and not getattr(item, "states", [])
+            for item in items
+        )
 
-    # pylint: disable=too-many-locals
     def __iter__(self) -> Iterator[MetricSample]:
         """Get generator that iterates over all values in the batch.
@@ -93,65 +110,71 @@ def __iter__(self) -> Iterator[MetricSample]: * metric: The metric name. * value: The metric value. """ - data = self._data_pb - mid = data.microgrid_id - for cdata in data.components: - cid = cdata.component_id - for msample in cdata.metric_samples: - ts = msample.sampled_at.ToDatetime() - # Ensure tz-aware timestamps, - # as the API returns tz-naive UTC timestamps - ts = ts.replace(tzinfo=timezone.utc) - met = Metric.from_proto(msample.metric).name + mid = self._data_pb.microgrid_id + items = getattr(self._data_pb, self.items_attr) + + for item in items: + cid = getattr(item, self.id_attr) + for sample in getattr(item, "metric_samples", []): + ts = sample.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) + met = Metric.from_proto(sample.metric).name value = ( - msample.value.simple_metric.value - if msample.value.simple_metric + sample.value.simple_metric.value + if sample.value.HasField("simple_metric") else None ) - yield MetricSample( - timestamp=ts, - microgrid_id=mid, - component_id=cid, - metric=met, - value=value, - ) - for i, bound in enumerate(msample.bounds): - if bound.lower: - yield MetricSample( - timestamp=ts, - microgrid_id=mid, - component_id=cid, - metric=f"{met}_bound_{i}_lower", - value=bound.lower, - ) - if bound.upper: - yield MetricSample( - timestamp=ts, - microgrid_id=mid, - component_id=cid, - metric=f"{met}_bound_{i}_upper", - value=bound.upper, - ) - for state in cdata.states: - ts = state.sampled_at.ToDatetime() + yield MetricSample(ts, mid, cid, met, value) + + if self.has_bounds: + for i, bound in enumerate(sample.bounds): + if bound.lower: + yield MetricSample( + ts, mid, cid, f"{met}_bound_{i}_lower", bound.lower + ) + if bound.upper: + yield MetricSample( + ts, mid, cid, f"{met}_bound_{i}_upper", bound.upper + ) + + for state in getattr(item, "states", []): + ts = state.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) for name, category in { - "state": state.states, - "warning": state.warnings, - "error": state.errors, + "state": getattr(state, "states", []), + "warning": getattr(state, "warnings", []), + "error": getattr(state, "errors", []), }.items(): - # Skip if the category is not present - if not isinstance(category, Iterable): + if not isinstance(category, abc.Iterable): continue - # Each category can have multiple states - # that are provided as individual samples for s in category: - yield MetricSample( - timestamp=ts, - microgrid_id=mid, - component_id=cid, - metric=name, - value=s, - ) + yield MetricSample(ts, mid, cid, name, s) + + +@dataclass(frozen=True) +class ComponentsDataBatch(GenericDataBatch): + """Batch of microgrid components data.""" + + def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse): + """Initialize the ComponentsDataBatch. + + Args: + data_pb: The underlying protobuf message. + """ + super().__init__( + data_pb, id_attr="component_id", items_attr="components", has_bounds=True + ) + + +@dataclass(frozen=True) +class SensorsDataBatch(GenericDataBatch): + """Batch of microgrid sensors data.""" + + def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse): + """Initialize the SensorsDataBatch. + + Args: + data_pb: The underlying protobuf message. 
+        """
+        super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors")
 
 
 @dataclass(frozen=True)
@@ -407,6 +430,189 @@ async def stream_method() -> (
         async for data in receiver:
             yield data
 
+    # pylint: disable=too-many-arguments
+    async def receive_single_sensor_data(
+        self,
+        *,
+        microgrid_id: int,
+        sensor_id: int,
+        metrics: Metric | list[Metric],
+        start_dt: datetime | None,
+        end_dt: datetime | None,
+        resampling_period: timedelta | None,
+        include_states: bool = False,
+    ) -> AsyncIterator[MetricSample]:
+        """Iterate over the data for a single sensor and one or more metrics.
+
+        Args:
+            microgrid_id: The microgrid ID.
+            sensor_id: The sensor ID.
+            metrics: The metric name or list of metric names.
+            start_dt: Start datetime; if None, the earliest available data is used.
+            end_dt: End datetime; if None, streams indefinitely from start_dt.
+            resampling_period: The period for resampling the data.
+            include_states: Whether to include the state data.
+
+        Yields:
+            A named tuple with the timestamp of the metric sample, the
+            microgrid ID, the sensor ID (in the component_id field), the
+            metric name, and the metric value.
+        """
+        receiver = await self._receive_microgrid_sensors_data_batch(
+            microgrid_sensors=[(microgrid_id, [sensor_id])],
+            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
+            start_dt=start_dt,
+            end_dt=end_dt,
+            resampling_period=resampling_period,
+            include_states=include_states,
+        )
+        async for batch in receiver:
+            for entry in batch:
+                yield entry
+
+    # pylint: disable=too-many-arguments
+    async def receive_microgrid_sensors_data(
+        self,
+        *,
+        microgrid_sensors: list[tuple[int, list[int]]],
+        metrics: Metric | list[Metric],
+        start_dt: datetime | None,
+        end_dt: datetime | None,
+        resampling_period: timedelta | None,
+        include_states: bool = False,
+    ) -> AsyncIterator[MetricSample]:
+        """Iterate over the data for multiple sensors in one or more microgrids.
+
+        Args:
+            microgrid_sensors: List of tuples where each tuple contains
+                a microgrid ID and its corresponding sensor IDs.
+            metrics: The metric name or list of metric names.
+            start_dt: Start datetime; if None, the earliest available data is used.
+            end_dt: End datetime; if None, streams indefinitely from start_dt.
+            resampling_period: The period for resampling the data.
+            include_states: Whether to include the state data.
+
+        Yields:
+            A named tuple with the following fields:
+            * timestamp: The timestamp of the metric sample.
+            * microgrid_id: The microgrid ID.
+            * component_id: The sensor ID (the field name is shared with components).
+            * metric: The metric name.
+            * value: The metric value.
+        """
+        receiver = await self._receive_microgrid_sensors_data_batch(
+            microgrid_sensors=microgrid_sensors,
+            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
+            start_dt=start_dt,
+            end_dt=end_dt,
+            resampling_period=resampling_period,
+            include_states=include_states,
+        )
+        async for batch in receiver:
+            for entry in batch:
+                yield entry
+
+    # pylint: disable=too-many-arguments
+    # pylint: disable=too-many-locals
+    async def _receive_microgrid_sensors_data_batch(
+        self,
+        *,
+        microgrid_sensors: list[tuple[int, list[int]]],
+        metrics: list[Metric],
+        start_dt: datetime | None,
+        end_dt: datetime | None,
+        resampling_period: timedelta | None,
+        include_states: bool = False,
+    ) -> AsyncIterator[SensorsDataBatch]:
+        """Create a receiver of sensor data batches using GrpcStreamBroadcaster.
+
+        Args:
+            microgrid_sensors: A list of tuples of microgrid IDs and sensor IDs.
+            metrics: A list of metrics.
+            start_dt: Start datetime; if None, the earliest available data is used.
+            end_dt: End datetime; if None, streams indefinitely from start_dt.
+            resampling_period: The period for resampling the data.
+            include_states: Whether to include the state data.
+
+        Returns:
+            A receiver of SensorsDataBatch objects with microgrid sensors data.
+        """
+        microgrid_sensors_pb = [
+            PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids)
+            for mid, sids in microgrid_sensors
+        ]
+
+        def dt2ts(dt: datetime) -> PBTimestamp:
+            ts = PBTimestamp()
+            ts.FromDatetime(dt)
+            return ts
+
+        time_filter = PBTimeFilter(
+            start=dt2ts(start_dt) if start_dt else None,
+            end=dt2ts(end_dt) if end_dt else None,
+        )
+
+        incl_states = (
+            PBFilterOption.FILTER_OPTION_INCLUDE
+            if include_states
+            else PBFilterOption.FILTER_OPTION_EXCLUDE
+        )
+        include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions(
+            states=incl_states,
+        )
+
+        stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter(
+            time_filter=time_filter,
+            resampling_options=PBResamplingOptions(
+                resolution=(
+                    round(resampling_period.total_seconds())
+                    if resampling_period is not None
+                    else None
+                )
+            ),
+            include_options=include_options,
+        )
+
+        metric_conns_pb = [
+            PBMetricConnections(
+                metric=metric.to_proto(),
+                connections=[],
+            )
+            for metric in metrics
+        ]
+
+        request = PBReceiveMicrogridSensorsDataStreamRequest(
+            microgrid_sensors=microgrid_sensors_pb,
+            metrics=metric_conns_pb,
+            filter=stream_filter,
+        )
+
+        def transform_response(
+            response: PBReceiveMicrogridSensorsDataStreamResponse,
+        ) -> SensorsDataBatch:
+            return SensorsDataBatch(response)
+
+        async def stream_method() -> (
+            AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
+        ):
+            call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(
+                request, metadata=self._metadata
+            )
+            async for response in cast(
+                AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
+                call_iterator,
+            ):
+                yield response
+
+        broadcaster = GrpcStreamBroadcaster(
+            stream_name="microgrid-sensors-data-stream",
+            stream_method=stream_method,
+            transform=transform_response,
+            retry_strategy=None,
+        )
+
+        return broadcaster.new_receiver()
+
     async def receive_aggregated_data(
         self,
         *,
diff --git a/tests/test_client_reporting.py b/tests/test_client_reporting.py
index 6dd58c9..299ba7f 100644
--- a/tests/test_client_reporting.py
+++ b/tests/test_client_reporting.py
@@ -40,7 +40,7 @@ def test_components_data_batch_is_empty_true() -> None:
     """Test that the is_empty method returns True when the page is empty."""
     data_pb = MagicMock()
     data_pb.components = []
-    batch = ComponentsDataBatch(_data_pb=data_pb)
+    batch = ComponentsDataBatch(data_pb=data_pb)
     assert batch.is_empty() is True
 
 
@@ -49,5 +49,5 @@ def test_components_data_batch_is_empty_false() -> None:
     """Test that the is_empty method returns False when the page is not empty."""
     data_pb = MagicMock()
     data_pb.components = [MagicMock()]
     data_pb.components[0].metric_samples = [MagicMock()]
-    batch = ComponentsDataBatch(_data_pb=data_pb)
+    batch = ComponentsDataBatch(data_pb=data_pb)
     assert batch.is_empty() is False
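
A minimal usage sketch of the new sensor endpoint follows. It is not part of the
diff above: the import paths and the `ReportingApiClient` constructor arguments
are assumptions based on the package README, and the server URL, API key, IDs,
and the choice of `Metric.TEMPERATURE` are illustrative placeholders.

    import asyncio
    from datetime import datetime, timedelta, timezone

    from frequenz.client.common.metric import Metric
    from frequenz.client.reporting import ReportingApiClient


    async def main() -> None:
        # Placeholder connection details; the constructor signature is assumed
        # from the package README and may differ between versions.
        client = ReportingApiClient(
            server_url="grpc://reporting.example.com:50051", key="my-api-key"
        )

        # Stream one day of 15-second resampled data for sensor 42 in
        # microgrid 7; the metric choice and IDs are illustrative only.
        async for sample in client.receive_single_sensor_data(
            microgrid_id=7,
            sensor_id=42,
            metrics=Metric.TEMPERATURE,
            start_dt=datetime(2025, 4, 1, tzinfo=timezone.utc),
            end_dt=datetime(2025, 4, 2, tzinfo=timezone.utc),
            resampling_period=timedelta(seconds=15),
        ):
            # MetricSample reuses the component_id field for the sensor ID.
            print(sample.timestamp, sample.component_id, sample.metric, sample.value)


    asyncio.run(main())

Note that `receive_single_sensor_data` is an async generator, so it is iterated
directly without awaiting, and each yielded `MetricSample` carries the sensor ID
in its `component_id` field, as the named tuple is shared with the component
endpoint.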