
Commit 2464c3e

Implement BatchUnrollReceiver
This turns a `Receiver[ComponentsDataBatch]` into a `Receiver[MetricSample]` by unrolling each `ComponentsDataBatch` into its individual `MetricSample`s. The new receiver is then used in the client so that component data streams return `Receiver`s instead of async iterators.

Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 715a6b8 commit 2464c3e
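As a rough caller-side sketch (not part of the commit): the new receiver wraps an existing `Receiver[ComponentsDataBatch]` and emits one `MetricSample` per iteration. The helper name `print_samples` and the way the batch receiver is obtained are assumptions; `BatchUnrollReceiver`, the module paths, and the `async for` support of `Receiver` come from the code below.

from frequenz.channels import Receiver

from frequenz.client.reporting._batch_unroll_receiver import BatchUnrollReceiver
from frequenz.client.reporting._types import ComponentsDataBatch, MetricSample


async def print_samples(batches: Receiver[ComponentsDataBatch]) -> None:
    # Wrap the batch stream; each iteration below yields a single MetricSample.
    samples: Receiver[MetricSample] = BatchUnrollReceiver(batches)
    async for sample in samples:
        print(sample)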

File tree

2 files changed: +92 -37 lines changed
src/frequenz/client/reporting/_batch_unroll_receiver.py

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A receiver that unrolls batches of data into individual samples."""

from collections.abc import Iterator

from frequenz.channels import Receiver, ReceiverStoppedError
from typing_extensions import override

from ._types import ComponentsDataBatch, MetricSample


class BatchUnrollReceiver(Receiver[MetricSample]):
    """Receiver to unroll `ComponentsDataBatch`s into `MetricSample`s."""

    def __init__(self, stream: Receiver[ComponentsDataBatch]) -> None:
        """Initialize the receiver.

        Args:
            stream: The stream to receive batches from.
        """
        self._stream: Receiver[ComponentsDataBatch] = stream
        self._batch_iter: Iterator[MetricSample] | None = None
        self._latest_sample: MetricSample | None = None
        self._no_more_data: bool = False

    @override
    async def ready(self) -> bool:
        """Wait until the next `MetricSample` is ready."""
        # If ready is called multiple times, we should return the same result
        # so we don't lose any data.
        if self._latest_sample is not None:
            return True

        while True:
            # If we have a batch iterator, try to get the next sample
            if self._batch_iter is not None:
                try:
                    self._latest_sample = next(self._batch_iter)
                    return True
                # If the batch is done, set the batch iterator to None
                except StopIteration:
                    self._batch_iter = None

            # If we don't have a batch iterator, try to get the next batch
            try:
                batch = await anext(self._stream)
                self._batch_iter = iter(batch)
            # If the stream is done, return False
            except StopAsyncIteration:
                self._no_more_data = True
                return False

    @override
    def consume(self) -> MetricSample:
        """Consume the next `MetricSample`.

        Returns:
            The next `MetricSample`.

        Raises:
            ReceiverStoppedError: If the receiver is stopped.
            RuntimeError: If the receiver is not ready.
        """
        sample = self._latest_sample
        if sample is None:
            if self._no_more_data:
                raise ReceiverStoppedError(self)
            raise RuntimeError("consume called before ready")
        self._latest_sample = None
        return sample
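The `ready()`/`consume()` split above follows the `frequenz.channels` `Receiver` protocol: `ready()` waits for and buffers the next sample (returning `False` once the wrapped batch stream ends), and `consume()` hands the buffered sample out exactly once. A small hand-driven sketch, with the helper name `drain` assumed:

async def drain(samples: BatchUnrollReceiver) -> list[MetricSample]:
    out: list[MetricSample] = []
    # ready() only returns False after the wrapped stream raises StopAsyncIteration.
    while await samples.ready():
        # consume() clears the buffered sample, so repeated ready() calls cannot drop data.
        out.append(samples.consume())
    return out

In normal use callers simply iterate with `async for`, which performs this ready-then-consume loop under the hood.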

src/frequenz/client/reporting/_client.py

Lines changed: 20 additions & 37 deletions
@@ -3,11 +3,9 @@
 
 """Client for requests to the Reporting API."""
 
-from collections import abc, namedtuple
-from collections.abc import AsyncIterator, Iterator
-from dataclasses import dataclass
-from datetime import datetime, timedelta, timezone
-from typing import Any, AsyncIterable, cast
+from collections.abc import AsyncIterable, AsyncIterator
+from datetime import datetime, timedelta
+from typing import cast
 
 # pylint: disable=no-name-in-module
 from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
@@ -46,13 +44,15 @@
 )
 from frequenz.api.reporting.v1.reporting_pb2 import TimeFilter as PBTimeFilter
 from frequenz.api.reporting.v1.reporting_pb2_grpc import ReportingStub
+from frequenz.channels import Receiver
 from frequenz.client.base.channel import ChannelOptions
 from frequenz.client.base.client import BaseApiClient
 from frequenz.client.base.exception import ClientNotConnected
 from frequenz.client.base.streaming import GrpcStreamBroadcaster
 from frequenz.client.common.metric import Metric
 from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp
 
+from ._batch_unroll_receiver import BatchUnrollReceiver
 from ._types import (
     AggregatedMetric,
     ComponentsDataBatch,
@@ -113,7 +113,7 @@ def stub(self) -> ReportingStub:
         return self._stub
 
     # pylint: disable=too-many-arguments
-    async def receive_single_component_data(
+    def receive_single_component_data(
         self,
         *,
         microgrid_id: int,
@@ -124,7 +124,7 @@ async def receive_single_component_data(
         resampling_period: timedelta | None,
         include_states: bool = False,
         include_bounds: bool = False,
-    ) -> AsyncIterator[MetricSample]:
+    ) -> Receiver[MetricSample]:
         """Iterate over the data for a single metric.
 
         Args:
@@ -137,12 +137,10 @@
             include_states: Whether to include the state data.
             include_bounds: Whether to include the bound data.
 
-        Yields:
-            A named tuple with the following fields:
-            * timestamp: The timestamp of the metric sample.
-            * value: The metric value.
+        Returns:
+            A receiver of `MetricSample`s.
         """
-        broadcaster = await self._receive_microgrid_components_data_batch(
+        receiver = self._receive_microgrid_components_data_batch(
             microgrid_components=[(microgrid_id, [component_id])],
             metrics=[metrics] if isinstance(metrics, Metric) else metrics,
             start_time=start_time,
@@ -152,14 +150,10 @@
             include_bounds=include_bounds,
         )
 
-        receiver = broadcaster.new_receiver()
-
-        async for batch in receiver:
-            for entry in batch:
-                yield entry
+        return BatchUnrollReceiver(receiver)
 
     # pylint: disable=too-many-arguments
-    async def receive_microgrid_components_data(
+    def receive_microgrid_components_data(
         self,
         *,
         microgrid_components: list[tuple[int, list[int]]],
@@ -169,7 +163,7 @@ async def receive_microgrid_components_data(
         resampling_period: timedelta | None,
         include_states: bool = False,
         include_bounds: bool = False,
-    ) -> AsyncIterator[MetricSample]:
+    ) -> Receiver[MetricSample]:
         """Iterate over the data for multiple microgrids and components.
 
         Args:
@@ -182,15 +176,10 @@
             include_states: Whether to include the state data.
             include_bounds: Whether to include the bound data.
 
-        Yields:
-            A named tuple with the following fields:
-            * microgrid_id: The microgrid ID.
-            * component_id: The component ID.
-            * metric: The metric name.
-            * timestamp: The timestamp of the metric sample.
-            * value: The metric value.
+        Returns:
+            A receiver of `MetricSample`s.
         """
-        broadcaster = await self._receive_microgrid_components_data_batch(
+        receiver = self._receive_microgrid_components_data_batch(
             microgrid_components=microgrid_components,
             metrics=[metrics] if isinstance(metrics, Metric) else metrics,
             start_time=start_time,
@@ -200,15 +189,11 @@
             include_bounds=include_bounds,
         )
 
-        receiver = broadcaster.new_receiver()
-
-        async for batch in receiver:
-            for entry in batch:
-                yield entry
+        return BatchUnrollReceiver(receiver)
 
     # pylint: disable=too-many-arguments
     # pylint: disable=too-many-locals
-    async def _receive_microgrid_components_data_batch(
+    def _receive_microgrid_components_data_batch(
         self,
         *,
         microgrid_components: list[tuple[int, list[int]]],
@@ -218,9 +203,7 @@ async def _receive_microgrid_components_data_batch(
         resampling_period: timedelta | None,
         include_states: bool = False,
         include_bounds: bool = False,
-    ) -> GrpcStreamBroadcaster[
-        PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
-    ]:
+    ) -> Receiver[ComponentsDataBatch]:
         """Return a GrpcStreamBroadcaster for microgrid component data."""
         stream_key = (
             tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
@@ -315,7 +298,7 @@ async def stream_method() -> (
             retry_strategy=None,
         )
 
-        return self._components_data_streams[stream_key]
+        return self._components_data_streams[stream_key].new_receiver()
 
     # pylint: disable=too-many-arguments
     async def receive_single_sensor_data(
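Taken together, the client changes make the public streaming methods plain (non-async) methods that return a `Receiver[MetricSample]` immediately, and `_receive_microgrid_components_data_batch()` hands each caller its own `new_receiver()` from the cached `GrpcStreamBroadcaster`, so concurrent callers get independent receivers over the same underlying stream. A rough caller-side sketch follows; the client construction, the chosen metric, and the keyword names not visible in this diff (`component_id`, `end_time`) are assumptions:

from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric


async def dump_last_hour(client) -> None:
    # `client` is assumed to be a connected reporting API client instance.
    receiver = client.receive_single_component_data(  # no longer awaited
        microgrid_id=42,
        component_id=7,  # keyword name assumed
        metrics=Metric.AC_ACTIVE_POWER,  # any Metric member; this one is assumed
        start_time=datetime.now(timezone.utc) - timedelta(hours=1),
        end_time=datetime.now(timezone.utc),  # keyword name assumed
        resampling_period=timedelta(seconds=15),
    )
    async for sample in receiver:
        print(sample)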
