Skip to content

Commit 5c98eb9

Browse files
committed
Update datasourcing actor to close channels.
When the senders closed the channels, the datasourcing actor will also close its sending channels. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 133c735 commit 5c98eb9

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,18 +343,39 @@ async def _handle_data_stream(
343343
stream_senders = self._get_metric_senders(
344344
category, self._req_streaming_metrics[comp_id]
345345
)
346-
api_data_receiver = self.comp_data_receivers[comp_id]
346+
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]
347+
348+
senders_done: asyncio.Event = asyncio.Event()
349+
pending_messages = 0
347350

348351
def process_msg(data: Any) -> None:
349352
tasks = []
350353
for extractor, senders in stream_senders:
351354
for sender in senders:
352355
tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
353356
asyncio.gather(*tasks)
357+
nonlocal pending_messages
358+
pending_messages -= 1
359+
if pending_messages == 0:
360+
senders_done.set()
354361

355362
async for data in api_data_receiver:
363+
pending_messages += 1
364+
senders_done.clear()
356365
process_msg(data)
357366

367+
while pending_messages > 0:
368+
await senders_done.wait()
369+
370+
await asyncio.gather(
371+
*[
372+
# pylint: disable=protected-access
373+
self._registry._close_channel(r.get_channel_name())
374+
for requests in self._req_streaming_metrics[comp_id].values()
375+
for r in requests
376+
]
377+
)
378+
358379
async def _update_streams(
359380
self,
360381
comp_id: int,

0 commit comments

Comments
 (0)