11import asyncio
2+ import concurrent .futures
23import inspect
34import logging
45from collections .abc import Callable , Coroutine
1112 AbortableAsyncResult ,
1213 AbortableTask ,
1314)
15+ from celery .exceptions import Ignore # type: ignore[import-untyped]
1416from pydantic import NonNegativeInt
1517from servicelib .async_utils import cancel_wait_task
1618
3234R = TypeVar ("R" )
3335
3436
37+ class TaskAbortedError (Exception ): ...
38+
39+
3540def _async_task_wrapper (
3641 app : Celery ,
3742) -> Callable [
@@ -44,7 +49,6 @@ def decorator(
4449 @wraps (coro )
4550 def wrapper (task : AbortableTask , * args : P .args , ** kwargs : P .kwargs ) -> R :
4651 fastapi_app = get_fastapi_app (app )
47- _logger .debug ("task id: %s" , task .request .id )
4852 # NOTE: task.request is a thread local object, so we need to pass the id explicitly
4953 assert task .request .id is not None # nosec
5054
@@ -56,7 +60,6 @@ async def run_task(task_id: TaskID) -> R:
5660 if AbortableAsyncResult (task_id ).is_aborted ():
5761 _logger .warning ("Task %s was aborted by user." , task_id )
5862 await cancel_wait_task (task_coro , max_delay = 5 ) # to constant
59- raise asyncio .CancelledError
6063 if task_coro .done ():
6164 break
6265
@@ -89,6 +92,9 @@ def decorator(
8992 def wrapper (task : AbortableTask , * args : P .args , ** kwargs : P .kwargs ) -> R :
9093 try :
9194 return func (task , * args , ** kwargs )
95+ except concurrent .futures .CancelledError as exc :
96+ _logger .warning ("Task %s was cancelled" , task .request .id )
97+ raise Ignore from exc
9298 except Exception as exc :
9399 if isinstance (exc , dont_autoretry_for ):
94100 _logger .debug ("Not retrying for exception %s" , type (exc ).__name__ )
0 commit comments