-
Couldn't load subscription status.
- Fork 20
Stop raising CancelledError when actor is cancelled #1166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop raising CancelledError when actor is cancelled #1166
Conversation
7901673 to
b9caa91
Compare
b9caa91 to
4579627
Compare
|
Looks like the release notes were not cleared earlier after the previous release. Could you base on top of this: #1167 |
4579627 to
fa0d7f7
Compare
|
Done, thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think wait() is working as it should. wait() is used to implement __await__ and __await__ doesn't swallow CancelledErrors in a Task, so I think we should be consistent with that.
In BackgroundService there is stop() though, which is basically:
self.cancel()
try:
await self.wait()
except CancelledError:
passSo there is already a convenience method to stop a background service without caring about cancellation. With wait() we can't assume the user doesn't want to be notified about CancelledError, as they might be awaiting on the service without actually ever calling cancel(), so getting a CancelledError might be an actual unexpected situation/error.
fa0d7f7 to
34cabac
Compare
|
Right! Thanks for checking. |
src/frequenz/sdk/actor/_run_utils.py
Outdated
|
|
||
|
|
||
| def _was_cancelled(task: asyncio.Task[Any]) -> bool: | ||
| if task.cancelled(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, interesting, so a task that stops because a cancellation error was raised in its code is not considered cancelled? So I guess cancelled will only be true if we call .cancel() to the internal task created only to await the actor finalization. If this is the case, task.cancelled() should never be true, as these tasks are internal to this function and we never call .cancel(). Maybe it is worth adding a note about this.
[...after doing some experiments...]
No, that doesn't seem to be the case:
import asyncio
async def cancelled_task() -> None:
await asyncio.sleep(10)
async def wrapper_task() -> None:
task = asyncio.create_task(cancelled_task())
await asyncio.sleep(1)
task.cancel()
await task
async def main() -> None:
pending = {asyncio.create_task(wrapper_task())}
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
if task.cancelled():
print("Wrapper task was cancelled")
elif exception := task.exception():
match exception:
case asyncio.CancelledError:
print("Sub-task was cancelled")
case _:
print(f"Wrapper task raised an exception: {exception}")
asyncio.run(main())This prints:
Wrapper task was cancelled
So the issue is the BaseExceptionGroup (or exception groups in general). When a CancelledError is wrapped in a group, then this exception is not properly translated to a wrapper task cancellation, and instead the task re-raises the whole group.
import asyncio
async def cancelled_task() -> None:
try:
await asyncio.sleep(10)
except asyncio.CancelledError as exc:
raise BaseExceptionGroup("Sub-task was cancelled", [exc])
async def wrapper_task() -> None:
task = asyncio.create_task(cancelled_task())
await asyncio.sleep(1)
task.cancel()
await task
async def main() -> None:
pending = {asyncio.create_task(wrapper_task())}
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
if task.cancelled():
print("Wrapper task was cancelled")
elif exception := task.exception():
match exception:
case asyncio.CancelledError:
print("Sub-task was cancelled")
case _:
print(f"Wrapper task raised an exception: {exception!r}")
asyncio.run(main())This prints:
Wrapper task raised an exception: BaseExceptionGroup('Sub-task was cancelled', [CancelledError()])
So to handle this properly, we need to use except* clauses, so we can split of the exception we want to catch as subgroups.
import asyncio
from collections.abc import Coroutine
async def cancelled_task() -> None:
await asyncio.sleep(10)
async def cancelled_tasks() -> None:
try:
await asyncio.sleep(10)
except asyncio.CancelledError as exc:
raise BaseExceptionGroup(
"Sub-tasks have errors",
[
exc,
Exception("Some other exception"),
asyncio.CancelledError("another CancelledError"),
],
)
async def wrapper_task(coro: Coroutine[None, None, None]) -> None:
task = asyncio.create_task(coro)
await asyncio.sleep(1)
task.cancel()
await task
async def main() -> None:
pending = {
asyncio.create_task(wrapper_task(cancelled_task()), name="cancelled_task"),
asyncio.create_task(wrapper_task(cancelled_tasks()), name="cancelled_tasks"),
}
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
if task.cancelled():
print(f"{task.get_name()} was cancelled")
else:
try:
await task
except* asyncio.CancelledError as exc:
print(f"{task.get_name()} has some cancellations: {exc!r}")
except* Exception as exc:
print(
f"Wrapper task for {task.get_name()} raised an exception: {exc!r}"
)
asyncio.run(main())This prints:
cancelled_task was cancelled
cancelled_tasks has some cancellations: BaseExceptionGroup('Sub-tasks have errors', [CancelledError(), CancelledError('another CancelledError')])
Wrapper task for cancelled_tasks raised an exception: ExceptionGroup('Sub-tasks have errors', [Exception('Some other exception')])
34cabac to
f9e2d04
Compare
|
Wow... this was tricky. I didn't know about |
f9e2d04 to
aa5e6a0
Compare
src/frequenz/sdk/actor/_run_utils.py
Outdated
| _logger.error( | ||
| "Actor %s: Raised an exception while running.", | ||
| task.get_name(), | ||
| exc_info=exception, | ||
| exc_info=err, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could change this with _logger.exception() now, as it is inside a try/except.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/frequenz/sdk/actor/_run_utils.py
Outdated
| except* asyncio.CancelledError: | ||
| _logger.info("Actor %s: Cancelled while running.", task.get_name()) | ||
| elif exception := task.exception(): | ||
| except* BaseException as err: # pylint: disable=broad-exception-caught |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a change of behaviour, but I think we should only catch Exception here, we probably don't want to catch stuff like KeyboardInterrupt, GeneratorExit or SystemExit. CancelledError is probably the only BaseException we ever want to catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so,too. But we had dedicated tests for BaseException.
Removed in last commit.
5d8ec3d to
d02067a
Compare
BacgroundService raises BaseExceptionGroup. We can check if task was cancelled by checking if there are CancelledError in the list of exception. Signed-off-by: Elzbieta Kotulska <[email protected]>
CancellError is the only BaseException that needs to be catched. Signed-off-by: Elzbieta Kotulska <[email protected]>
d02067a to
eff167b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, damn, so BaseException was not actually handled, just logged and bubbled up. That is fine too, but also not that important, as a base exception, if it is not only used for flow control (like exiting from the program), will probably be printed with a backtrace, so we should know it came from an actor.
So I think both approaches are OK (logging or not logging BaseExceptions), I'm approving and leaving it up to you.
Thanks! This was a very tricky bug.
|
(disabled auto-merge in case you want to keep logging base exceptions, but after disabling it, I think the current approach is more correct, logging them will probably just be noise anyway) |
|
No no, If I understand correctly: And I think new approach is correct, because we stop as soon as something raise BaseException. |
Cancelling BackgroundService should not raise exception. But we see exception when we stop actor, after calling
actor.wait().