Description
What's needed?
Currently the PowerDistributingActor uses asyncio primitives to manage tasks and power requests. This approach makes the code complex and prone to errors because it mixes the actor's logic with task synchronization and error handling.
Proposed solution
Once PersistentTaskGroup is available in https://github.com/frequenz-floss/frequenz-core-python, it can be used in the PowerDistributingActor. That should make the code less convoluted and less error-prone, and let the actor focus on its own logic without having to take care of task management.
This is the comment with the proposal:
I actually stayed on this issue thinking about it again...
[.... one hour later due to some of the changes you did with the request ID :D ...]
It was a good exercise to see how to apply the new class. The result is pretty different from what I had in mind before, and in retrospect it doesn't look particularly simple either, but I think it is more efficient, because we only keep one task per req_id for as long as we have requests queued up, instead of starting and finishing a task for every pending request.

    async def _run(self) -> None:
        await self._component_manager.start()
        async with PersistentTaskGroup() as group:
            async for request in self._requests_receiver:
                self._handle_request(request, group)
                # Collect tasks that already finished, without blocking.
                async for task in group.as_completed(timeout=0):
                    try:
                        task.result()
                    except Exception:  # pylint: disable=broad-except
                        _logger.exception(
                            "Failed power request task: %s", task.get_name()
                        )

    def _handle_request(self, request: ..., group: ...) -> None:
        req_id = self._get_request_id(request)
        if pending_request := self._pending_requests.get(req_id):
            _logger.debug(
                "Pending request: %s, overwritten with request: %s",
                pending_request,
                request,
            )
            # A task is already running for this req_id and will pick this up.
            self._pending_requests[req_id] = request
            return
        self._pending_requests[req_id] = request
        group.create_task(
            self._distribute_power(req_id),
            name=f"{type(self).__name__}:{request}",
        )

    async def _distribute_power(self, req_id: ...) -> None:
        while request := self._pending_requests.get(req_id):
            await self._component_manager.distribute_power(request)
            # Finish once no newer request replaced the one just handled.
            if self._pending_requests.get(req_id) is request:
                del self._pending_requests[req_id]

At some point I want to add a way to get as_completed() as a Receiver, so we can put it in a select(), like:

    async with PersistentTaskGroup() as group:
        completed_tasks = CompletedTaskReceiver(group)  # This needs to be added to channels
        async for selected in select(self._requests_receiver, completed_tasks):
            if selected_from(selected, self._requests_receiver):
                self._handle_request(selected.message, group)
            elif selected_from(selected, completed_tasks):
                task = selected.message
                try:
                    task.result()
                except Exception:  # pylint: disable=broad-except
                    _logger.exception(
                        "Failed power request task: %s", task.get_name()
                    )

But for now we can live with using it with timeout=0, so it doesn't block and we only ACK finished tasks after we handle a power request. That should be OK; we should receive requests often enough, I guess.
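The idea in the comment (one task per req_id that keeps picking up the latest queued request, plus a non-blocking check for finished tasks after each request) can be tried out without the SDK. The following is only a minimal sketch using plain asyncio: `Request`, `CoalescingDistributor`, `reap_finished()` and `_apply()` are hypothetical names invented for illustration, a plain `set` of tasks stands in for PersistentTaskGroup, and `reap_finished()` plays the role that `as_completed(timeout=0)` has in the draft.

```python
import asyncio
import logging
from dataclasses import dataclass

_logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Request:
    """Hypothetical stand-in for a power request."""

    req_id: str
    power: float


class CoalescingDistributor:
    """Run at most one task per request ID; newer requests replace queued ones."""

    def __init__(self) -> None:
        self.pending: dict[str, Request] = {}  # latest request per request ID
        self._tasks: set[asyncio.Task[None]] = set()  # stand-in for the task group
        self.applied: list[float] = []
        self.failed: list[str] = []

    def handle_request(self, request: Request) -> None:
        """Store the request and start a task only if none is active for its ID."""
        already_active = request.req_id in self.pending
        self.pending[request.req_id] = request  # the latest request always wins
        if not already_active:
            task = asyncio.create_task(
                self._distribute(request.req_id), name=request.req_id
            )
            self._tasks.add(task)

    async def _distribute(self, req_id: str) -> None:
        """Apply requests for req_id until no newer one arrived in the meantime."""
        try:
            while True:
                request = self.pending[req_id]
                await self._apply(request)
                if self.pending[req_id] is request:
                    return
        finally:
            # Also runs on failure, so a later request can start a fresh task.
            del self.pending[req_id]

    async def _apply(self, request: Request) -> None:
        """Placeholder for the real power distribution."""
        await asyncio.sleep(0)
        if request.power < 0:
            raise ValueError(f"cannot apply {request}")
        self.applied.append(request.power)

    def reap_finished(self) -> None:
        """Check finished tasks without blocking and log their failures."""
        for task in [t for t in self._tasks if t.done()]:
            self._tasks.discard(task)
            try:
                task.result()
            except Exception:  # pylint: disable=broad-except
                _logger.exception("Failed power request task: %s", task.get_name())
                self.failed.append(task.get_name())


async def demo() -> CoalescingDistributor:
    dist = CoalescingDistributor()
    # Three requests for "a" arrive before its task gets to run.
    for power in (100.0, 200.0, 300.0):
        dist.handle_request(Request("a", power))
        dist.reap_finished()
    dist.handle_request(Request("b", -1.0))  # fails in _apply() in this sketch
    await asyncio.sleep(0.05)  # give the tasks time to finish
    dist.reap_finished()
    return dist


if __name__ == "__main__":
    result = asyncio.run(demo())
    print(result.applied, result.failed)
```

In this sketch only the last of the three requests for "a" is applied, because the earlier ones are replaced before the task runs. A failure is noticed only the next time `reap_finished()` is called, which mirrors the trade-off mentioned above: finished tasks are acknowledged only after another request is handled.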
Additional context
See the original post and draft example (not tested) in #1023 (comment)