diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c94a8d5aa..f355a5375 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -18,3 +18,4 @@ ## Bug Fixes - Fixed a typing issue that occurs in some cases when composing formulas with constants. +- Fixed a bug where sending tasks in the data sourcing actor might have not been properly awaited. diff --git a/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py index 9a3ce01b5..02d54b2fb 100644 --- a/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py @@ -398,32 +398,35 @@ async def _handle_data_stream( ) api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] - senders_done: asyncio.Event = asyncio.Event() - pending_messages = 0 - - def process_msg(data: Any) -> None: - tasks = [] - for extractor, senders in stream_senders: - for sender in senders: - tasks.append( - sender.send( - Sample(data.timestamp, Quantity(extractor(data))) - ) + async def process_msg(data: Any) -> None: + async with asyncio.TaskGroup() as tg: + for extractor, senders in stream_senders: + for sender in senders: + sample = Sample(data.timestamp, Quantity(extractor(data))) + name = f"send:ts={sample.timestamp}:cid={comp_id}" + tg.create_task(sender.send(sample), name=name) + + sending_tasks: set[asyncio.Task[None]] = set() + + async def clean_tasks( + sending_tasks: set[asyncio.Task[None]], + ) -> set[asyncio.Task[None]]: + done, pending = await asyncio.wait(sending_tasks, timeout=0) + for task in done: + if error := task.exception(): + _logger.error( + "Error while processing message in task %s", + task.get_name(), + exc_info=error, ) - asyncio.gather(*tasks) - nonlocal pending_messages - pending_messages -= 1 - if pending_messages == 0: - senders_done.set() + return pending async for data in api_data_receiver: - pending_messages += 1 - senders_done.clear() - process_msg(data) - - while pending_messages > 0: - await senders_done.wait() + name = f"process_msg:cid={comp_id}" + sending_tasks.add(asyncio.create_task(process_msg(data), name=name)) + sending_tasks = await clean_tasks(sending_tasks) + await asyncio.gather(*sending_tasks) await asyncio.gather( *[ self._registry.close_and_remove(r.get_channel_name())