Skip to content

Commit 110c727

Browse files
committed
fix cancelling code
1 parent 2d632cc commit 110c727

File tree

8 files changed

+22
-26
lines changed

8 files changed

+22
-26
lines changed

packages/service-library/src/servicelib/async_utils.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import logging
33
from collections import deque
44
from collections.abc import Awaitable, Callable
5-
from contextlib import suppress
65
from dataclasses import dataclass
76
from functools import wraps
87
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
98

9+
from common_library.async_tools import cancel_wait_task
10+
1011
from . import tracing
1112
from .utils_profiling_middleware import dont_profile, is_profiling, profile_context
1213

@@ -54,9 +55,7 @@ async def _safe_cancel(context: Context) -> None:
5455
try:
5556
await context.in_queue.put(None)
5657
if context.task is not None:
57-
context.task.cancel()
58-
with suppress(asyncio.CancelledError):
59-
await context.task
58+
await cancel_wait_task(context.task, max_delay=None)
6059
except RuntimeError as e:
6160
if "Event loop is closed" in f"{e}":
6261
_logger.warning("event loop is closed and could not cancel %s", context)

packages/service-library/src/servicelib/deferred_tasks/_worker_tracker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ async def handle_run(
5757
result_to_return = TaskResultSuccess(value=task_result)
5858
except asyncio.CancelledError:
5959
result_to_return = TaskResultCancelledError()
60+
current_task = asyncio.current_task()
61+
assert current_task is not None # nosec
62+
if current_task.cancelling() > 0:
63+
# owner function is being cancelled -> propagate cancellation
64+
raise
6065
except Exception as e: # pylint:disable=broad-exception-caught
6166
result_to_return = TaskResultError(
6267
error=_format_exception(e),

packages/service-library/src/servicelib/fastapi/requests_decorators.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from functools import wraps
55
from typing import Any, Protocol
66

7+
from common_library.async_tools import cancel_wait_task
78
from fastapi import Request, status
89
from fastapi.exceptions import HTTPException
910

@@ -13,8 +14,7 @@
1314
class _HandlerWithRequestArg(Protocol):
1415
__name__: str
1516

16-
async def __call__(self, request: Request, *args: Any, **kwargs: Any) -> Any:
17-
...
17+
async def __call__(self, request: Request, *args: Any, **kwargs: Any) -> Any: ...
1818

1919

2020
def _validate_signature(handler: _HandlerWithRequestArg):
@@ -75,13 +75,8 @@ async def wrapper(request: Request, *args, **kwargs):
7575

7676
# One has completed, cancel the other
7777
for t in pending:
78-
t.cancel()
79-
8078
try:
81-
await asyncio.wait_for(t, timeout=3)
82-
83-
except asyncio.CancelledError:
84-
pass
79+
await cancel_wait_task(t, max_delay=3)
8580
except Exception: # pylint: disable=broad-except
8681
if t is handler_task:
8782
raise

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,11 @@ async def _tasks_monitor(self) -> None:
335335
result_field = ResultField(
336336
str_error=dumps(TaskCancelledError(task_id=task_id))
337337
)
338+
current_task = asyncio.current_task()
339+
assert current_task is not None # nosec
340+
if current_task.cancelling() > 0:
341+
# owner function is being cancelled -> propagate cancellation
342+
raise
338343
except Exception as e: # pylint:disable=broad-except
339344
allowed_errors = TaskRegistry.get_allowed_errors(
340345
task_data.registered_task_name

packages/service-library/src/servicelib/utils.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" General utils
1+
"""General utils
22
33
IMPORTANT: lowest level module
44
I order to avoid cyclic dependences, please
@@ -245,7 +245,7 @@ async def limited_as_completed(
245245
future.set_name(f"{tasks_group_prefix}-{future.get_name()}")
246246
pending_futures.add(future)
247247

248-
except (StopIteration, StopAsyncIteration): # noqa: PERF203
248+
except (StopIteration, StopAsyncIteration):
249249
completed_all_awaitables = True
250250
if not pending_futures:
251251
return
@@ -294,8 +294,7 @@ async def limited_gather(
294294
log: logging.Logger = _DEFAULT_LOGGER,
295295
limit: int = _DEFAULT_LIMITED_CONCURRENCY,
296296
tasks_group_prefix: str | None = None,
297-
) -> list[T]:
298-
...
297+
) -> list[T]: ...
299298

300299

301300
@overload
@@ -305,8 +304,7 @@ async def limited_gather(
305304
log: logging.Logger = _DEFAULT_LOGGER,
306305
limit: int = _DEFAULT_LIMITED_CONCURRENCY,
307306
tasks_group_prefix: str | None = None,
308-
) -> list[T | BaseException]:
309-
...
307+
) -> list[T | BaseException]: ...
310308

311309

312310
async def limited_gather(

services/catalog/src/simcore_service_catalog/core/background_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async def _sync_services_task(app: FastAPI) -> None:
203203

204204
await asyncio.sleep(app.state.settings.CATALOG_BACKGROUND_TASK_REST_TIME)
205205

206-
except asyncio.CancelledError: # noqa: PERF203
206+
except asyncio.CancelledError:
207207
# task is stopped
208208
_logger.info("registry syncing task cancelled")
209209
raise

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# pylint: disable=relative-beyond-top-level
22

3-
import asyncio
43
import logging
54
from copy import deepcopy
65
from math import floor
@@ -138,8 +137,6 @@ async def observing_single_service(
138137
try:
139138
await _apply_observation_cycle(scheduler, scheduler_data)
140139
logger.debug("completed observation cycle of %s", f"{service_name=}")
141-
except asyncio.CancelledError: # pylint: disable=try-except-raise
142-
raise # pragma: no cover
143140
except Exception as exc: # pylint: disable=broad-except
144141
service_name = scheduler_data.service_name
145142

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
"""
1515

1616
import asyncio
17-
import contextlib
1817
import functools
1918
import logging
2019
import time
@@ -131,9 +130,7 @@ async def shutdown(self) -> None:
131130
if self._trigger_observation_queue_task is not None:
132131
await self._trigger_observation_queue.put(None)
133132

134-
self._trigger_observation_queue_task.cancel()
135-
with contextlib.suppress(asyncio.CancelledError):
136-
await self._trigger_observation_queue_task
133+
await cancel_wait_task(self._trigger_observation_queue_task, max_delay=None)
137134
self._trigger_observation_queue_task = None
138135
self._trigger_observation_queue = Queue()
139136

0 commit comments

Comments
 (0)