Skip to content

Commit fd54888

Browse files
committed
Refactor sensor data streaming methods to return a Receiver
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent cc0f3b5 commit fd54888

File tree

2 files changed

+15
-24
lines changed

2 files changed

+15
-24
lines changed

src/frequenz/client/reporting/_batch_unroll_receiver.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,21 @@
88
from frequenz.channels import Receiver, ReceiverStoppedError
99
from typing_extensions import override
1010

11-
from ._types import ComponentsDataBatch, MetricSample
11+
from ._types import ComponentsDataBatch, MetricSample, SensorsDataBatch
1212

1313

1414
class BatchUnrollReceiver(Receiver[MetricSample]):
1515
"""Receiver to unroll `ComponentsDataBatch`s into `MetricSample`s."""
1616

17-
def __init__(self, stream: Receiver[ComponentsDataBatch]) -> None:
17+
def __init__(
18+
self, stream: Receiver[ComponentsDataBatch | SensorsDataBatch]
19+
) -> None:
1820
"""Initialize the receiver.
1921
2022
Args:
2123
stream: The stream to receive batches from.
2224
"""
23-
self._stream: Receiver[ComponentsDataBatch] = stream
25+
self._stream: Receiver[ComponentsDataBatch | SensorsDataBatch] = stream
2426
self._batch_iter: Iterator[MetricSample] | None = None
2527
self._latest_sample: MetricSample | None = None
2628
self._no_more_data: bool = False

src/frequenz/client/reporting/_client.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ async def receive_single_sensor_data(
326326
end_time: datetime | None,
327327
resampling_period: timedelta | None,
328328
include_states: bool = False,
329-
) -> AsyncIterator[MetricSample]:
329+
) -> Receiver[MetricSample]:
330330
"""Iterate over the data for a single sensor and metric.
331331
332332
Args:
@@ -338,10 +338,8 @@ async def receive_single_sensor_data(
338338
resampling_period: The period for resampling the data.
339339
include_states: Whether to include the state data.
340340
341-
Yields:
342-
A named tuple with the following fields:
343-
* timestamp: The timestamp of the metric sample.
344-
* value: The metric value.
341+
Returns:
342+
A receiver of `MetricSample`s.
345343
"""
346344
receiver = await self._receive_microgrid_sensors_data_batch(
347345
microgrid_sensors=[(microgrid_id, [sensor_id])],
@@ -351,9 +349,7 @@ async def receive_single_sensor_data(
351349
resampling_period=resampling_period,
352350
include_states=include_states,
353351
)
354-
async for batch in receiver:
355-
for entry in batch:
356-
yield entry
352+
return BatchUnrollReceiver(receiver)
357353

358354
# pylint: disable=too-many-arguments
359355
async def receive_microgrid_sensors_data(
@@ -365,7 +361,7 @@ async def receive_microgrid_sensors_data(
365361
end_time: datetime | None,
366362
resampling_period: timedelta | None,
367363
include_states: bool = False,
368-
) -> AsyncIterator[MetricSample]:
364+
) -> Receiver[MetricSample]:
369365
"""Iterate over the data for multiple sensors in a microgrid.
370366
371367
Args:
@@ -377,13 +373,8 @@ async def receive_microgrid_sensors_data(
377373
resampling_period: The period for resampling the data.
378374
include_states: Whether to include the state data.
379375
380-
Yields:
381-
A named tuple with the following fields:
382-
* microgrid_id: The microgrid ID.
383-
* sensor_id: The sensor ID.
384-
* metric: The metric name.
385-
* timestamp: The timestamp of the metric sample.
386-
* value: The metric value.
376+
Returns:
377+
A receiver of `MetricSample`s.
387378
"""
388379
receiver = await self._receive_microgrid_sensors_data_batch(
389380
microgrid_sensors=microgrid_sensors,
@@ -393,9 +384,7 @@ async def receive_microgrid_sensors_data(
393384
resampling_period=resampling_period,
394385
include_states=include_states,
395386
)
396-
async for batch in receiver:
397-
for entry in batch:
398-
yield entry
387+
return BatchUnrollReceiver(receiver)
399388

400389
# pylint: disable=too-many-arguments
401390
# pylint: disable=too-many-locals
@@ -408,7 +397,7 @@ async def _receive_microgrid_sensors_data_batch(
408397
end_time: datetime | None,
409398
resampling_period: timedelta | None,
410399
include_states: bool = False,
411-
) -> AsyncIterator[SensorsDataBatch]:
400+
) -> Receiver[SensorsDataBatch]:
412401
"""Iterate over the sensor data batches in the stream using GrpcStreamBroadcaster.
413402
414403
Args:
@@ -420,7 +409,7 @@ async def _receive_microgrid_sensors_data_batch(
420409
include_states: Whether to include the state data.
421410
422411
Returns:
423-
A SensorDataBatch object of microgrid sensors data.
412+
A GrpcStreamBroadcaster that can be used to receive sensor data batches.
424413
"""
425414
stream_key = (
426415
tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),

0 commit comments

Comments
 (0)