diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 1d7a602..fc5b4f2 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -7,6 +7,8 @@
 ## Upgrading
 
 * Change 'start_dt' and 'end_dt' to 'start_time' and 'end_time' respectively.
+* Rename 'list' to 'receive' in component data retrieval functions.
+* Return the receiver directly in '_receive_microgrid_components_data_batch'.
 
 ## New Features
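The renames above change how the streaming entry points are consumed. A minimal before/after sketch (the `server_url`/`key` constructor arguments and the endpoint are illustrative assumptions, not part of this diff):

```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric
from frequenz.client.reporting import ReportingApiClient


async def run() -> None:
    # Hypothetical connection details -- replace with real ones.
    client = ReportingApiClient(server_url="grpc://localhost:50051", key="my-api-key")

    # Before: `async for sample in client.list_microgrid_components_data(...)`.
    # After: the `receive_*` variant is a plain function returning a
    # `Receiver[MetricSample]`, which still supports `async for`.
    receiver = client.receive_microgrid_components_data(
        microgrid_components=[(1, [2, 3])],
        metrics=[Metric.AC_ACTIVE_POWER],
        start_time=datetime.now(timezone.utc) - timedelta(hours=1),
        end_time=datetime.now(timezone.utc),
        resampling_period=timedelta(seconds=15),
    )
    async for sample in receiver:
        print(sample)


asyncio.run(run())
```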
+ """ + sample = self._latest_sample + if sample is None: + if self._no_more_data: + raise ReceiverStoppedError(self) + raise RuntimeError("consume called before ready") + self._latest_sample = None + return sample diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 87d593c..3d2d677 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -3,11 +3,9 @@ """Client for requests to the Reporting API.""" -from collections import abc, namedtuple -from collections.abc import AsyncIterator, Iterator -from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -from typing import Any, AsyncIterable, cast +from collections.abc import AsyncIterable, AsyncIterator +from datetime import datetime, timedelta +from typing import cast # pylint: disable=no-name-in-module from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( @@ -46,6 +44,7 @@ ) from frequenz.api.reporting.v1.reporting_pb2 import TimeFilter as PBTimeFilter from frequenz.api.reporting.v1.reporting_pb2_grpc import ReportingStub +from frequenz.channels import Receiver from frequenz.client.base.channel import ChannelOptions from frequenz.client.base.client import BaseApiClient from frequenz.client.base.exception import ClientNotConnected @@ -53,148 +52,13 @@ from frequenz.client.common.metric import Metric from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp -MetricSample = namedtuple( - "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"] +from ._batch_unroll_receiver import BatchUnrollReceiver +from ._types import ( + AggregatedMetric, + ComponentsDataBatch, + MetricSample, + SensorsDataBatch, ) -"""Type for a sample of a time series incl. metric type, microgrid and component ID - -A named tuple was chosen to allow safe access to the fields while keeping the -simplicity of a tuple. This data type can be easily used to create a numpy array -or a pandas DataFrame. -""" - - -@dataclass(frozen=True) -class GenericDataBatch: - """Base class for batches of microgrid data (components or sensors). - - This class serves as a base for handling batches of data related to microgrid - components or sensors. It manages the received protocol buffer (PB) data, - provides access to relevant items via specific attributes, and includes - functionality to work with bounds if applicable. - """ - - _data_pb: Any - id_attr: str - items_attr: str - has_bounds: bool = False - - def is_empty(self) -> bool: - """Check if the batch contains valid data. - - Returns: - True if the batch contains no valid data. - """ - items = getattr(self._data_pb, self.items_attr, []) - if not items: - return True - for item in items: - if not getattr(item, "metric_samples", []) and not getattr( - item, "states", [] - ): - return True - return False - - def __iter__(self) -> Iterator[MetricSample]: - """Get generator that iterates over all values in the batch. - - Note: So far only `SimpleMetricSample` in the `MetricSampleVariant` - message is supported. - - - Yields: - A named tuple with the following fields: - * timestamp: The timestamp of the metric sample. - * microgrid_id: The microgrid ID. - * component_id: The component ID. - * metric: The metric name. - * value: The metric value. 
- """ - mid = self._data_pb.microgrid_id - items = getattr(self._data_pb, self.items_attr) - - for item in items: - cid = getattr(item, self.id_attr) - for sample in getattr(item, "metric_samples", []): - ts = sample.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) - met = Metric.from_proto(sample.metric).name - value = ( - sample.value.simple_metric.value - if sample.value.HasField("simple_metric") - else None - ) - yield MetricSample(ts, mid, cid, met, value) - - if self.has_bounds: - for i, bound in enumerate(sample.bounds): - if bound.lower: - yield MetricSample( - ts, mid, cid, f"{met}_bound_{i}_lower", bound.lower - ) - if bound.upper: - yield MetricSample( - ts, mid, cid, f"{met}_bound_{i}_upper", bound.upper - ) - - for state in getattr(item, "states", []): - ts = state.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) - for name, category in { - "state": getattr(state, "states", []), - "warning": getattr(state, "warnings", []), - "error": getattr(state, "errors", []), - }.items(): - if not isinstance(category, abc.Iterable): - continue - for s in category: - yield MetricSample(ts, mid, cid, name, s) - - -@dataclass(frozen=True) -class ComponentsDataBatch(GenericDataBatch): - """Batch of microgrid components data.""" - - def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse): - """Initialize the ComponentsDataBatch. - - Args: - data_pb: The underlying protobuf message. - """ - super().__init__( - data_pb, id_attr="component_id", items_attr="components", has_bounds=True - ) - - -@dataclass(frozen=True) -class SensorsDataBatch(GenericDataBatch): - """Batch of microgrid sensors data.""" - - def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse): - """Initialize the SensorsDataBatch. - - Args: - data_pb: The underlying protobuf message. - """ - super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors") - - -@dataclass(frozen=True) -class AggregatedMetric: - """An aggregated metric sample returned by the Reporting service.""" - - _data_pb: PBAggregatedStreamResponse - """The underlying protobuf message.""" - - def sample(self) -> MetricSample: - """Return the aggregated metric sample.""" - return MetricSample( - timestamp=self._data_pb.sample.sampled_at.ToDatetime().replace( - tzinfo=timezone.utc - ), - microgrid_id=self._data_pb.aggregation_config.microgrid_id, - component_id=self._data_pb.aggregation_config.aggregation_formula, - metric=self._data_pb.aggregation_config.metric, - value=self._data_pb.sample.sample.value, - ) class ReportingApiClient(BaseApiClient[ReportingStub]): @@ -222,7 +86,37 @@ def __init__( channel_defaults=channel_defaults, ) - self._broadcasters: dict[int, GrpcStreamBroadcaster[Any, Any]] = {} + self._components_data_streams: dict[ + tuple[ + tuple[ + tuple[int, tuple[int, ...]], ... + ], # microgrid_components as a tuple of tuples + tuple[str, ...], # metric names + float | None, # start_time timestamp + float | None, # end_time timestamp + int | None, # resampling period in seconds + bool, # include_states + bool, # include_bounds + ], + GrpcStreamBroadcaster[ + PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch + ], + ] = {} + self._sensors_data_streams: dict[ + tuple[ + tuple[ + tuple[int, tuple[int, ...]], ... 
+ ], # microgrid_sensors as a tuple of tuples + tuple[str, ...], # metric names + float | None, # start_time timestamp + float | None, # end_time timestamp + int | None, # resampling period in seconds + bool, # include_states + ], + GrpcStreamBroadcaster[ + PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch + ], + ] = {} self._metadata = (("key", key),) if key else () @@ -234,7 +128,7 @@ def stub(self) -> ReportingStub: return self._stub # pylint: disable=too-many-arguments - async def list_single_component_data( + def receive_single_component_data( self, *, microgrid_id: int, @@ -245,7 +139,7 @@ async def list_single_component_data( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, - ) -> AsyncIterator[MetricSample]: + ) -> Receiver[MetricSample]: """Iterate over the data for a single metric. Args: @@ -258,12 +152,10 @@ async def list_single_component_data( include_states: Whether to include the state data. include_bounds: Whether to include the bound data. - Yields: - A named tuple with the following fields: - * timestamp: The timestamp of the metric sample. - * value: The metric value. + Returns: + A receiver of `MetricSample`s. """ - async for batch in self._list_microgrid_components_data_batch( + receiver = self._receive_microgrid_components_data_batch( microgrid_components=[(microgrid_id, [component_id])], metrics=[metrics] if isinstance(metrics, Metric) else metrics, start_time=start_time, @@ -271,12 +163,12 @@ async def list_single_component_data( resampling_period=resampling_period, include_states=include_states, include_bounds=include_bounds, - ): - for entry in batch: - yield entry + ) + + return BatchUnrollReceiver(receiver) # pylint: disable=too-many-arguments - async def list_microgrid_components_data( + def receive_microgrid_components_data( self, *, microgrid_components: list[tuple[int, list[int]]], @@ -286,7 +178,7 @@ async def list_microgrid_components_data( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, - ) -> AsyncIterator[MetricSample]: + ) -> Receiver[MetricSample]: """Iterate over the data for multiple microgrids and components. Args: @@ -299,15 +191,10 @@ async def list_microgrid_components_data( include_states: Whether to include the state data. include_bounds: Whether to include the bound data. - Yields: - A named tuple with the following fields: - * microgrid_id: The microgrid ID. - * component_id: The component ID. - * metric: The metric name. - * timestamp: The timestamp of the metric sample. - * value: The metric value. + Returns: + A receiver of `MetricSample`s. 
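The keys of these dicts are plain tuples because dictionary keys must be hashable: the nested lists become nested tuples and the datetimes become POSIX timestamps. A small stand-alone mirror of the key construction used further down in `_receive_microgrid_components_data_batch()` (the helper name is made up for illustration):

```python
from datetime import datetime, timedelta


def make_stream_key(  # hypothetical helper mirroring the cache-key logic
    microgrid_components: list[tuple[int, list[int]]],
    metric_names: list[str],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool,
    include_bounds: bool,
) -> tuple:
    return (
        # Lists are unhashable, so they are converted to tuples of tuples.
        tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
        tuple(metric_names),
        start_time.timestamp() if start_time else None,
        end_time.timestamp() if end_time else None,
        round(resampling_period.total_seconds()) if resampling_period else None,
        include_states,
        include_bounds,
    )


period = timedelta(seconds=15)
key_a = make_stream_key([(1, [2, 3])], ["AC_ACTIVE_POWER"], None, None, period, False, False)
key_b = make_stream_key([(1, [2, 3])], ["AC_ACTIVE_POWER"], None, None, period, False, False)
assert key_a == key_b  # identical parameters -> the cached stream is reused
```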
""" - async for batch in self._list_microgrid_components_data_batch( + receiver = self._receive_microgrid_components_data_batch( microgrid_components=microgrid_components, metrics=[metrics] if isinstance(metrics, Metric) else metrics, start_time=start_time, @@ -315,13 +202,13 @@ async def list_microgrid_components_data( resampling_period=resampling_period, include_states=include_states, include_bounds=include_bounds, - ): - for entry in batch: - yield entry + ) + + return BatchUnrollReceiver(receiver) # pylint: disable=too-many-arguments # pylint: disable=too-many-locals - async def _list_microgrid_components_data_batch( + def _receive_microgrid_components_data_batch( self, *, microgrid_components: list[tuple[int, list[int]]], @@ -331,104 +218,102 @@ async def _list_microgrid_components_data_batch( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, - ) -> AsyncIterator[ComponentsDataBatch]: - """Iterate over the component data batches in the stream using GrpcStreamBroadcaster. - - Args: - microgrid_components: A list of tuples of microgrid IDs and component IDs. - metrics: A list of metrics. - start_time: start datetime, if None, the earliest available data will be used - end_time: end datetime, if None starts streaming indefinitely from start_time - resampling_period: The period for resampling the data. - include_states: Whether to include the state data. - include_bounds: Whether to include the bound data. - - Yields: - A ComponentsDataBatch object of microgrid components data. - """ - microgrid_components_pb = [ - PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids) - for mid, cids in microgrid_components - ] - - def dt2ts(dt: datetime) -> PBTimestamp: - ts = PBTimestamp() - ts.FromDatetime(dt) - return ts - - time_filter = PBTimeFilter( - start=dt2ts(start_time) if start_time else None, - end=dt2ts(end_time) if end_time else None, + ) -> Receiver[ComponentsDataBatch]: + """Return a GrpcStreamBroadcaster for microgrid component data.""" + stream_key = ( + tuple((mid, tuple(cids)) for mid, cids in microgrid_components), + tuple(metric.name for metric in metrics), + start_time.timestamp() if start_time else None, + end_time.timestamp() if end_time else None, + round(resampling_period.total_seconds()) if resampling_period else None, + include_states, + include_bounds, ) - incl_states = ( - PBFilterOption.FILTER_OPTION_INCLUDE - if include_states - else PBFilterOption.FILTER_OPTION_EXCLUDE - ) - incl_bounds = ( - PBFilterOption.FILTER_OPTION_INCLUDE - if include_bounds - else PBFilterOption.FILTER_OPTION_EXCLUDE - ) - include_options = PBReceiveMicrogridComponentsDataStreamRequest.IncludeOptions( - bounds=incl_bounds, - states=incl_states, - ) + if ( + stream_key not in self._components_data_streams + or not self._components_data_streams[stream_key].is_running + ): + microgrid_components_pb = [ + PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids) + for mid, cids in microgrid_components + ] + + def dt2ts(dt: datetime) -> PBTimestamp: + ts = PBTimestamp() + ts.FromDatetime(dt) + return ts + + time_filter = PBTimeFilter( + start=dt2ts(start_time) if start_time else None, + end=dt2ts(end_time) if end_time else None, + ) - stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter( - time_filter=time_filter, - resampling_options=PBResamplingOptions( - resolution=( - round(resampling_period.total_seconds()) - if resampling_period is not None - else None + incl_states = ( + PBFilterOption.FILTER_OPTION_INCLUDE + if 
include_states + else PBFilterOption.FILTER_OPTION_EXCLUDE + ) + incl_bounds = ( + PBFilterOption.FILTER_OPTION_INCLUDE + if include_bounds + else PBFilterOption.FILTER_OPTION_EXCLUDE + ) + include_options = ( + PBReceiveMicrogridComponentsDataStreamRequest.IncludeOptions( + bounds=incl_bounds, + states=incl_states, ) - ), - include_options=include_options, - ) - - metric_conns_pb = [ - PBMetricConnections( - metric=metric.to_proto(), - connections=[], ) - for metric in metrics - ] - request = PBReceiveMicrogridComponentsDataStreamRequest( - microgrid_components=microgrid_components_pb, - metrics=metric_conns_pb, - filter=stream_filter, - ) + stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter( + time_filter=time_filter, + resampling_options=PBResamplingOptions( + resolution=( + round(resampling_period.total_seconds()) + if resampling_period + else None + ) + ), + include_options=include_options, + ) - def transform_response( - response: PBReceiveMicrogridComponentsDataStreamResponse, - ) -> ComponentsDataBatch: - return ComponentsDataBatch(response) + metric_conns_pb = [ + PBMetricConnections(metric=metric.to_proto(), connections=[]) + for metric in metrics + ] - async def stream_method() -> ( - AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse] - ): - call_iterator = self.stub.ReceiveMicrogridComponentsDataStream( - request, metadata=self._metadata + request = PBReceiveMicrogridComponentsDataStreamRequest( + microgrid_components=microgrid_components_pb, + metrics=metric_conns_pb, + filter=stream_filter, ) - async for response in cast( - AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse], - call_iterator, - ): - yield response - broadcaster = GrpcStreamBroadcaster( - stream_name="microgrid-components-data-stream", - stream_method=stream_method, - transform=transform_response, - retry_strategy=None, - ) + def transform_response( + response: PBReceiveMicrogridComponentsDataStreamResponse, + ) -> ComponentsDataBatch: + return ComponentsDataBatch(response) - receiver = broadcaster.new_receiver() - async for data in receiver: - yield data + async def stream_method() -> ( + AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse] + ): + call_iterator = self.stub.ReceiveMicrogridComponentsDataStream( + request, metadata=self._metadata + ) + async for response in cast( + AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse], + call_iterator, + ): + yield response + + self._components_data_streams[stream_key] = GrpcStreamBroadcaster( + stream_name="microgrid-components-data-stream", + stream_method=stream_method, + transform=transform_response, + retry_strategy=None, + ) + + return self._components_data_streams[stream_key].new_receiver() # pylint: disable=too-many-arguments async def receive_single_sensor_data( @@ -441,7 +326,7 @@ async def receive_single_sensor_data( end_time: datetime | None, resampling_period: timedelta | None, include_states: bool = False, - ) -> AsyncIterator[MetricSample]: + ) -> Receiver[MetricSample]: """Iterate over the data for a single sensor and metric. Args: @@ -453,10 +338,8 @@ async def receive_single_sensor_data( resampling_period: The period for resampling the data. include_states: Whether to include the state data. - Yields: - A named tuple with the following fields: - * timestamp: The timestamp of the metric sample. - * value: The metric value. + Returns: + A receiver of `MetricSample`s. 
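A consequence of the keyed cache above: two calls with identical parameters now share one underlying gRPC stream, while each caller still gets an independent receiver from the broadcaster. A usage sketch, assuming an already constructed client:

```python
import asyncio
from datetime import timedelta
from typing import Any

from frequenz.channels import Receiver
from frequenz.client.common.metric import Metric
from frequenz.client.reporting import ReportingApiClient


async def consume(tag: str, receiver: Receiver[Any]) -> None:
    async for sample in receiver:
        print(tag, sample)


async def run(client: ReportingApiClient) -> None:
    kwargs: dict[str, Any] = {
        "microgrid_components": [(1, [2])],
        "metrics": [Metric.AC_ACTIVE_POWER],
        "start_time": None,  # earliest available data
        "end_time": None,  # stream indefinitely
        "resampling_period": timedelta(seconds=15),
    }
    # Same parameters -> same cache key -> one shared gRPC stream.
    rx1 = client.receive_microgrid_components_data(**kwargs)
    rx2 = client.receive_microgrid_components_data(**kwargs)
    await asyncio.gather(consume("a", rx1), consume("b", rx2))
```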
""" receiver = await self._receive_microgrid_sensors_data_batch( microgrid_sensors=[(microgrid_id, [sensor_id])], @@ -466,9 +349,7 @@ async def receive_single_sensor_data( resampling_period=resampling_period, include_states=include_states, ) - async for batch in receiver: - for entry in batch: - yield entry + return BatchUnrollReceiver(receiver) # pylint: disable=too-many-arguments async def receive_microgrid_sensors_data( @@ -480,7 +361,7 @@ async def receive_microgrid_sensors_data( end_time: datetime | None, resampling_period: timedelta | None, include_states: bool = False, - ) -> AsyncIterator[MetricSample]: + ) -> Receiver[MetricSample]: """Iterate over the data for multiple sensors in a microgrid. Args: @@ -492,13 +373,8 @@ async def receive_microgrid_sensors_data( resampling_period: The period for resampling the data. include_states: Whether to include the state data. - Yields: - A named tuple with the following fields: - * microgrid_id: The microgrid ID. - * sensor_id: The sensor ID. - * metric: The metric name. - * timestamp: The timestamp of the metric sample. - * value: The metric value. + Returns: + A receiver of `MetricSample`s. """ receiver = await self._receive_microgrid_sensors_data_batch( microgrid_sensors=microgrid_sensors, @@ -508,9 +384,7 @@ async def receive_microgrid_sensors_data( resampling_period=resampling_period, include_states=include_states, ) - async for batch in receiver: - for entry in batch: - yield entry + return BatchUnrollReceiver(receiver) # pylint: disable=too-many-arguments # pylint: disable=too-many-locals @@ -523,7 +397,7 @@ async def _receive_microgrid_sensors_data_batch( end_time: datetime | None, resampling_period: timedelta | None, include_states: bool = False, - ) -> AsyncIterator[SensorsDataBatch]: + ) -> Receiver[SensorsDataBatch]: """Iterate over the sensor data batches in the stream using GrpcStreamBroadcaster. Args: @@ -535,83 +409,96 @@ async def _receive_microgrid_sensors_data_batch( include_states: Whether to include the state data. Returns: - A SensorDataBatch object of microgrid sensors data. + A GrpcStreamBroadcaster that can be used to receive sensor data batches. 
""" - microgrid_sensors_pb = [ - PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids) - for mid, sids in microgrid_sensors - ] + stream_key = ( + tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors), + tuple(metric.name for metric in metrics), + start_time.timestamp() if start_time else None, + end_time.timestamp() if end_time else None, + round(resampling_period.total_seconds()) if resampling_period else None, + include_states, + ) - def dt2ts(dt: datetime) -> PBTimestamp: - ts = PBTimestamp() - ts.FromDatetime(dt) - return ts + if ( + stream_key not in self._sensors_data_streams + or not self._sensors_data_streams[stream_key].is_running + ): - time_filter = PBTimeFilter( - start=dt2ts(start_time) if start_time else None, - end=dt2ts(end_time) if end_time else None, - ) + microgrid_sensors_pb = [ + PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids) + for mid, sids in microgrid_sensors + ] - incl_states = ( - PBFilterOption.FILTER_OPTION_INCLUDE - if include_states - else PBFilterOption.FILTER_OPTION_EXCLUDE - ) - include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions( - states=incl_states, - ) + def dt2ts(dt: datetime) -> PBTimestamp: + ts = PBTimestamp() + ts.FromDatetime(dt) + return ts - stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter( - time_filter=time_filter, - resampling_options=PBResamplingOptions( - resolution=( - round(resampling_period.total_seconds()) - if resampling_period is not None - else None - ) - ), - include_options=include_options, - ) + time_filter = PBTimeFilter( + start=dt2ts(start_time) if start_time else None, + end=dt2ts(end_time) if end_time else None, + ) - metric_conns_pb = [ - PBMetricConnections( - metric=metric.to_proto(), - connections=[], + incl_states = ( + PBFilterOption.FILTER_OPTION_INCLUDE + if include_states + else PBFilterOption.FILTER_OPTION_EXCLUDE + ) + include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions( + states=incl_states, ) - for metric in metrics - ] - request = PBReceiveMicrogridSensorsDataStreamRequest( - microgrid_sensors=microgrid_sensors_pb, - metrics=metric_conns_pb, - filter=stream_filter, - ) + stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter( + time_filter=time_filter, + resampling_options=PBResamplingOptions( + resolution=( + round(resampling_period.total_seconds()) + if resampling_period is not None + else None + ) + ), + include_options=include_options, + ) - def transform_response( - response: PBReceiveMicrogridSensorsDataStreamResponse, - ) -> SensorsDataBatch: - return SensorsDataBatch(response) + metric_conns_pb = [ + PBMetricConnections( + metric=metric.to_proto(), + connections=[], + ) + for metric in metrics + ] - async def stream_method() -> ( - AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse] - ): - call_iterator = self.stub.ReceiveMicrogridSensorsDataStream( - request, metadata=self._metadata + request = PBReceiveMicrogridSensorsDataStreamRequest( + microgrid_sensors=microgrid_sensors_pb, + metrics=metric_conns_pb, + filter=stream_filter, ) - async for response in cast( - AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse], - call_iterator, - ): - yield response - broadcaster = GrpcStreamBroadcaster( - stream_name="microgrid-sensors-data-stream", - stream_method=stream_method, - transform=transform_response, - retry_strategy=None, - ) + def transform_response( + response: PBReceiveMicrogridSensorsDataStreamResponse, + ) -> SensorsDataBatch: + return SensorsDataBatch(response) + + async def 
stream_method() -> ( + AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse] + ): + call_iterator = self.stub.ReceiveMicrogridSensorsDataStream( + request, metadata=self._metadata + ) + async for response in cast( + AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse], + call_iterator, + ): + yield response + + self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster( + stream_name="microgrid-sensors-data-stream", + stream_method=stream_method, + transform=transform_response, + ) - return broadcaster.new_receiver() + return self._sensors_data_streams[stream_key].new_receiver() async def receive_aggregated_data( self, diff --git a/src/frequenz/client/reporting/_types.py b/src/frequenz/client/reporting/_types.py new file mode 100644 index 0000000..cfd3228 --- /dev/null +++ b/src/frequenz/client/reporting/_types.py @@ -0,0 +1,172 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Types for the Reporting API client.""" + +import math +from collections.abc import Iterable, Iterator +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, NamedTuple + +# pylint: disable=no-name-in-module +from frequenz.api.reporting.v1.reporting_pb2 import ( + ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse, +) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse, +) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ReceiveMicrogridSensorsDataStreamResponse as PBReceiveMicrogridSensorsDataStreamResponse, +) + +# pylint: enable=no-name-in-module +from frequenz.client.common.metric import Metric + + +class MetricSample(NamedTuple): + """Type for a sample of a time series incl. metric type, microgrid and component ID. + + A named tuple was chosen to allow safe access to the fields while keeping the + simplicity of a tuple. This data type can be easily used to create a numpy array + or a pandas DataFrame. + """ + + timestamp: datetime + microgrid_id: int + component_id: str + metric: str + value: float + + +@dataclass(frozen=True) +class GenericDataBatch: + """Base class for batches of microgrid data (components or sensors). + + This class serves as a base for handling batches of data related to microgrid + components or sensors. It manages the received protocol buffer (PB) data, + provides access to relevant items via specific attributes, and includes + functionality to work with bounds if applicable. + """ + + _data_pb: Any + id_attr: str + items_attr: str + has_bounds: bool = False + + def is_empty(self) -> bool: + """Check if the batch contains valid data. + + Returns: + True if the batch contains no valid data. + """ + items = getattr(self._data_pb, self.items_attr, []) + if not items: + return True + for item in items: + if not getattr(item, "metric_samples", []) and not getattr( + item, "states", [] + ): + return True + return False + + def __iter__(self) -> Iterator[MetricSample]: + """Get generator that iterates over all values in the batch. + + Note: So far only `SimpleMetricSample` in the `MetricSampleVariant` + message is supported. + + + Yields: + A named tuple with the following fields: + * timestamp: The timestamp of the metric sample. + * microgrid_id: The microgrid ID. + * component_id: The component ID. + * metric: The metric name. + * value: The metric value. 
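Note that the sensor entry points stay `async` (unlike the component ones), so the returned receiver has to be awaited first. A usage sketch; the IDs are made up and `Metric.SENSOR_TEMPERATURE` is assumed to exist in the installed `Metric` enum:

```python
from datetime import timedelta

from frequenz.client.common.metric import Metric
from frequenz.client.reporting import ReportingApiClient


async def run(client: ReportingApiClient) -> None:
    # `await` the call itself, then iterate over the resulting receiver.
    receiver = await client.receive_single_sensor_data(
        microgrid_id=1,
        sensor_id=42,
        metrics=[Metric.SENSOR_TEMPERATURE],  # assumed enum member
        start_time=None,
        end_time=None,
        resampling_period=timedelta(seconds=15),
    )
    async for sample in receiver:
        print(sample.timestamp, sample.value)
```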
+ """ + mid = self._data_pb.microgrid_id + items = getattr(self._data_pb, self.items_attr) + + for item in items: + cid = getattr(item, self.id_attr) + for sample in getattr(item, "metric_samples", []): + ts = sample.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) + met = Metric.from_proto(sample.metric).name + value = ( + sample.value.simple_metric.value + if sample.value.HasField("simple_metric") + else math.nan + ) + yield MetricSample(ts, mid, cid, met, value) + + if self.has_bounds: + for i, bound in enumerate(sample.bounds): + if bound.lower: + yield MetricSample( + ts, mid, cid, f"{met}_bound_{i}_lower", bound.lower + ) + if bound.upper: + yield MetricSample( + ts, mid, cid, f"{met}_bound_{i}_upper", bound.upper + ) + + for state in getattr(item, "states", []): + ts = state.sampled_at.ToDatetime().replace(tzinfo=timezone.utc) + for category, category_items in { + "state": getattr(state, "states", []), + "warning": getattr(state, "warnings", []), + "error": getattr(state, "errors", []), + }.items(): + if not isinstance(category_items, Iterable): + continue + for s in category_items: + yield MetricSample(ts, mid, cid, category, s) + + +@dataclass(frozen=True) +class ComponentsDataBatch(GenericDataBatch): + """Batch of microgrid components data.""" + + def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse): + """Initialize the ComponentsDataBatch. + + Args: + data_pb: The underlying protobuf message. + """ + super().__init__( + data_pb, id_attr="component_id", items_attr="components", has_bounds=True + ) + + +@dataclass(frozen=True) +class SensorsDataBatch(GenericDataBatch): + """Batch of microgrid sensors data.""" + + def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse): + """Initialize the SensorsDataBatch. + + Args: + data_pb: The underlying protobuf message. 
+ """ + super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors") + + +@dataclass(frozen=True) +class AggregatedMetric: + """An aggregated metric sample returned by the Reporting service.""" + + _data_pb: PBAggregatedStreamResponse + """The underlying protobuf message.""" + + def sample(self) -> MetricSample: + """Return the aggregated metric sample.""" + return MetricSample( + timestamp=self._data_pb.sample.sampled_at.ToDatetime().replace( + tzinfo=timezone.utc + ), + microgrid_id=self._data_pb.aggregation_config.microgrid_id, + component_id=self._data_pb.aggregation_config.aggregation_formula, + metric=Metric(self._data_pb.aggregation_config.metric).name, + value=self._data_pb.sample.sample.value, + ) diff --git a/src/frequenz/client/reporting/cli/__main__.py b/src/frequenz/client/reporting/cli/__main__.py index a5b5779..88f94ae 100644 --- a/src/frequenz/client/reporting/cli/__main__.py +++ b/src/frequenz/client/reporting/cli/__main__.py @@ -6,13 +6,12 @@ import argparse import asyncio from datetime import datetime, timedelta -from pprint import pprint from typing import AsyncIterator from frequenz.client.common.metric import Metric from frequenz.client.reporting import ReportingApiClient -from frequenz.client.reporting._client import MetricSample +from frequenz.client.reporting._types import MetricSample def main() -> None: @@ -158,7 +157,7 @@ async def data_iter() -> AsyncIterator[MetricSample]: else None ) - async for sample in client.list_microgrid_components_data( + async for sample in client.receive_microgrid_components_data( microgrid_components=microgrid_components, metrics=metrics, start_time=start_time, @@ -187,11 +186,6 @@ async def data_iter() -> AsyncIterator[MetricSample]: async for sample in data_iter(): print(sample) - elif fmt == "dict": - # Dumping all data as a single dict - dct = await iter_to_dict(data_iter()) - pprint(dct) - elif fmt == "csv": # Print header print(",".join(MetricSample._fields)) @@ -205,42 +199,5 @@ async def data_iter() -> AsyncIterator[MetricSample]: return -async def iter_to_dict( - components_data_iter: AsyncIterator[MetricSample], -) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: - """Convert components data iterator into a single dict. - - The nesting structure is: - { - microgrid_id: { - component_id: { - timestamp: { - metric: value - } - } - } - } - - Args: - components_data_iter: async generator - - Returns: - Single dict with with all components data - """ - ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} - - async for ts, mid, cid, met, value in components_data_iter: - if mid not in ret: - ret[mid] = {} - if cid not in ret[mid]: - ret[mid][cid] = {} - if ts not in ret[mid][cid]: - ret[mid][cid][ts] = {} - - ret[mid][cid][ts][met] = value - - return ret - - if __name__ == "__main__": main() diff --git a/tests/test_client_reporting.py b/tests/test_client_reporting.py index 299ba7f..1b98834 100644 --- a/tests/test_client_reporting.py +++ b/tests/test_client_reporting.py @@ -10,7 +10,7 @@ from frequenz.client.base.client import BaseApiClient from frequenz.client.reporting import ReportingApiClient -from frequenz.client.reporting._client import ComponentsDataBatch +from frequenz.client.reporting._types import ComponentsDataBatch @pytest.mark.asyncio