Skip to content

Commit 07842a9

Browse files
authored
Update components retrival functions (#177)
Implemented comments from #169 for components.
2 parents a9107be + fd54888 commit 07842a9

File tree

6 files changed

+480
-388
lines changed

6 files changed

+480
-388
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
## Upgrading
88

99
* Change 'start_dt' and 'end_dt' to 'start_time' and 'end_time' respectively.
10+
* Rename 'list' to 'receive' in component data retrival functions
11+
* Return the receiver directly in '_recieve_microgrid_components_data_batch'
1012

1113
## New Features
1214

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that unrolls batches of data into individual samples."""
5+
6+
from collections.abc import Iterator
7+
8+
from frequenz.channels import Receiver, ReceiverStoppedError
9+
from typing_extensions import override
10+
11+
from ._types import ComponentsDataBatch, MetricSample, SensorsDataBatch
12+
13+
14+
class BatchUnrollReceiver(Receiver[MetricSample]):
15+
"""Receiver to unroll `ComponentsDataBatch`s into `MetricSample`s."""
16+
17+
def __init__(
18+
self, stream: Receiver[ComponentsDataBatch | SensorsDataBatch]
19+
) -> None:
20+
"""Initialize the receiver.
21+
22+
Args:
23+
stream: The stream to receive batches from.
24+
"""
25+
self._stream: Receiver[ComponentsDataBatch | SensorsDataBatch] = stream
26+
self._batch_iter: Iterator[MetricSample] | None = None
27+
self._latest_sample: MetricSample | None = None
28+
self._no_more_data: bool = False
29+
30+
@override
31+
async def ready(self) -> bool:
32+
"""Wait until the next `MetricSample` is ready."""
33+
# If ready is called multiple times, we should return the same result
34+
# so we don't loose any data
35+
if self._latest_sample is not None:
36+
return True
37+
38+
while True:
39+
# If we have a batch iterator, try to get the next sample
40+
if self._batch_iter is not None:
41+
try:
42+
self._latest_sample = next(self._batch_iter)
43+
return True
44+
# If the batch is done, set the batch iterator to None
45+
except StopIteration:
46+
self._batch_iter = None
47+
48+
# If we don't have a batch iterator, try to get the next batch
49+
try:
50+
batch = await anext(self._stream)
51+
self._batch_iter = iter(batch)
52+
# If the stream is done, return False
53+
except StopAsyncIteration:
54+
self._no_more_data = True
55+
return False
56+
57+
@override
58+
def consume(self) -> MetricSample:
59+
"""Consume the next `MetricSample`.
60+
61+
Returns:
62+
The next `MetricSample`.
63+
64+
Raises:
65+
ReceiverStoppedError: If the receiver is stopped.
66+
RuntimeError: If the receiver is not ready.
67+
"""
68+
sample = self._latest_sample
69+
if sample is None:
70+
if self._no_more_data:
71+
raise ReceiverStoppedError(self)
72+
raise RuntimeError("consume called before ready")
73+
self._latest_sample = None
74+
return sample

0 commit comments

Comments
 (0)