Skip to content

Commit 6b7f178

Browse files
Add & use close channel method in the registry (#225)
Add all required code to automatically close channels when we're done sending values in the mock_microgrid. Required for the datasourcing actor benchmark. - Add private method to allow closing of channels - Update mock_microgrid to close channels when done sending. - Update datasourcing actor to close channels.
2 parents 4be4a3b + 19da0e8 commit 6b7f178

File tree

5 files changed

+53
-1
lines changed

5 files changed

+53
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
* A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk.
1414
* Add the `run(*actors)` function for running and synchronizing the execution of actors. This new function simplifies the way actors are managed on the client side, allowing for a cleaner and more streamlined approach. Users/apps can now run actors simply by calling run(actor1, actor2, actor3...) without the need to manually call join() and deal with linting errors.
15+
* The datasourcing actor now automatically closes all sending channels when the input channel closes.
1516

1617
## Bug Fixes
1718

src/frequenz/sdk/actor/_channel_registry.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,15 @@ def new_receiver(self, key: str) -> Receiver[Any]:
4949
if key not in self._channels:
5050
self._channels[key] = Broadcast(f"{self._name}-{key}")
5151
return self._channels[key].new_receiver()
52+
53+
async def _close_channel(self, key: str) -> None:
54+
"""Close a channel with the given key.
55+
56+
This method is private and should only be used in special cases.
57+
58+
Args:
59+
key: A key to identify the channel.
60+
"""
61+
if key in self._channels:
62+
if channel := self._channels.pop(key, None):
63+
await channel.close()

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,18 +343,39 @@ async def _handle_data_stream(
343343
stream_senders = self._get_metric_senders(
344344
category, self._req_streaming_metrics[comp_id]
345345
)
346-
api_data_receiver = self.comp_data_receivers[comp_id]
346+
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]
347+
348+
senders_done: asyncio.Event = asyncio.Event()
349+
pending_messages = 0
347350

348351
def process_msg(data: Any) -> None:
349352
tasks = []
350353
for extractor, senders in stream_senders:
351354
for sender in senders:
352355
tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
353356
asyncio.gather(*tasks)
357+
nonlocal pending_messages
358+
pending_messages -= 1
359+
if pending_messages == 0:
360+
senders_done.set()
354361

355362
async for data in api_data_receiver:
363+
pending_messages += 1
364+
senders_done.clear()
356365
process_msg(data)
357366

367+
while pending_messages > 0:
368+
await senders_done.wait()
369+
370+
await asyncio.gather(
371+
*[
372+
# pylint: disable=protected-access
373+
self._registry._close_channel(r.get_channel_name())
374+
for requests in self._req_streaming_metrics[comp_id].values()
375+
for r in requests
376+
]
377+
)
378+
358379
async def _update_streams(
359380
self,
360381
comp_id: int,

tests/timeseries/mock_microgrid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ async def _comp_data_send_task(
148148
await self._microgrid.send(make_comp_data(val_to_send, timestamp))
149149
await asyncio.sleep(self._sample_rate_s)
150150

151+
await self._microgrid.close_channel(comp_id)
152+
151153
def _start_meter_streaming(self, meter_id: int) -> None:
152154
self._streaming_coros.append(
153155
self._comp_data_send_task(

tests/utils/mock_microgrid.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ def __init__(self, components: Set[Component], connections: Set[Connection]):
4949
meter_channels = self._create_meter_channels()
5050
ev_charger_channels = self._create_ev_charger_channels()
5151

52+
self._all_channels: Dict[int, Broadcast[Any]] = {
53+
**bat_channels,
54+
**inv_channels,
55+
**meter_channels,
56+
**ev_charger_channels,
57+
}
58+
5259
mock_api = self._create_mock_api(
5360
bat_channels, inv_channels, meter_channels, ev_charger_channels
5461
)
@@ -128,6 +135,15 @@ async def send(self, data: ComponentData) -> bool:
128135

129136
raise RuntimeError(f"{type(data)} is not supported in MockMicrogridClient.")
130137

138+
async def close_channel(self, cid: int) -> None:
139+
"""Close channel for given component id.
140+
141+
Args:
142+
cid: Component id
143+
"""
144+
if cid in self._all_channels:
145+
await self._all_channels[cid].close()
146+
131147
def _create_battery_channels(self) -> Dict[int, Broadcast[BatteryData]]:
132148
"""Create channels for the batteries.
133149

0 commit comments

Comments
 (0)