Skip to content

Commit 70c353e

Browse files
committed
@sanderegg review: replace cancel_wait_task by cancel_and_wait
1 parent f10be68 commit 70c353e

File tree

34 files changed

+85
-91
lines changed

34 files changed

+85
-91
lines changed

packages/celery-library/src/celery_library/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
AbortableTask,
1313
)
1414
from celery.exceptions import Ignore # type: ignore[import-untyped]
15+
from common_library.async_tools import cancel_and_wait
1516
from pydantic import NonNegativeInt
16-
from servicelib.async_utils import cancel_wait_task
1717
from servicelib.celery.models import TaskID
1818

1919
from .errors import encode_celery_transferrable_error
@@ -62,7 +62,7 @@ async def abort_monitor():
6262
abortable_result = AbortableAsyncResult(task_id, app=app)
6363
while not main_task.done():
6464
if abortable_result.is_aborted():
65-
await cancel_wait_task(
65+
await cancel_and_wait(
6666
main_task,
6767
max_delay=_DEFAULT_CANCEL_TASK_TIMEOUT.total_seconds(),
6868
)

packages/common-library/src/common_library/async_tools.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,29 @@ async def maybe_await(
6767
return obj
6868

6969

70-
async def cancel_and_wait(task: asyncio.Task) -> None:
70+
async def cancel_and_wait(
71+
task: asyncio.Task, *, max_delay: float | None = None
72+
) -> None:
7173
"""Cancels the given task and waits for it to complete.
7274
75+
76+
:param task: task to be canceled
77+
:param max_delay: duration (in seconds) to wait before giving
78+
up the cancellation. If None it waits forever.
79+
:raises TimeoutError: raised if cannot cancel the task.
80+
7381
Accounts for the case where the tasks's owner function is being cancelled.
7482
In that case, it propagates the cancellation exception upstream.
83+
7584
"""
85+
7686
task.cancel()
87+
7788
try:
78-
# NOTE shield ensures that cancellation of the caller function won’t stop you
89+
90+
# NOTE shield ensures that cancellation of the caller function won't stop you
7991
# from observing the cancellation/finalization of task.
80-
await asyncio.shield(task)
92+
await asyncio.shield(asyncio.wait_for(task, timeout=max_delay))
8193

8294
except asyncio.CancelledError:
8395
if not task.cancelled():

packages/service-library/src/servicelib/async_utils.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
from functools import wraps
99
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
1010

11-
from common_library.async_tools import cancel_and_wait
12-
1311
from . import tracing
1412
from .utils_profiling_middleware import dont_profile, is_profiling, profile_context
1513

@@ -231,19 +229,3 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
231229
return _wrapper
232230

233231
return _decorator
234-
235-
236-
async def cancel_wait_task(
237-
task: asyncio.Task,
238-
*,
239-
max_delay: float | None = None,
240-
) -> None:
241-
"""Cancel a asyncio.Task and waits for it to finish.
242-
243-
:param task: task to be canceled
244-
:param max_delay: duration (in seconds) to wait before giving
245-
up the cancellation. If None it waits forever.
246-
:raises TimeoutError: raised if cannot cancel the task.
247-
"""
248-
async with asyncio.timeout(max_delay):
249-
await cancel_and_wait(task)

packages/service-library/src/servicelib/background_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
1010
from tenacity.wait import wait_fixed
1111

12-
from .async_utils import cancel_wait_task, delayed_start
12+
from .async_utils import cancel_and_wait, delayed_start
1313
from .logging_utils import log_catch, log_context
1414

1515
_logger = logging.getLogger(__name__)
@@ -142,4 +142,4 @@ async def periodic_task(
142142
if asyncio_task is not None:
143143
# NOTE: this stopping is shielded to prevent the cancellation to propagate
144144
# into the stopping procedure
145-
await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
145+
await asyncio.shield(cancel_and_wait(asyncio_task, max_delay=stop_timeout))

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
from typing import Any, Final, Protocol, TypeAlias
1010
from uuid import uuid4
1111

12+
from common_library.async_tools import cancel_and_wait
1213
from models_library.api_schemas_long_running_tasks.base import TaskProgress
1314
from pydantic import PositiveFloat
14-
from servicelib.async_utils import cancel_wait_task
1515
from servicelib.background_task import create_periodic_task
1616
from servicelib.logging_utils import log_catch
1717

@@ -105,7 +105,7 @@ async def teardown(self) -> None:
105105

106106
if self._stale_tasks_monitor_task:
107107
with log_catch(_logger, reraise=False):
108-
await cancel_wait_task(
108+
await cancel_and_wait(
109109
self._stale_tasks_monitor_task, max_delay=_CANCEL_TASK_TIMEOUT
110110
)
111111

packages/service-library/src/servicelib/redis/_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from redis.asyncio.retry import Retry
1313
from redis.backoff import ExponentialBackoff
1414

15-
from ..async_utils import cancel_wait_task
15+
from ..async_utils import cancel_and_wait
1616
from ..background_task import periodic
1717
from ..logging_utils import log_catch, log_context
1818
from ._constants import (
@@ -88,7 +88,7 @@ async def shutdown(self) -> None:
8888
assert self._health_check_task_started_event # nosec
8989
# NOTE: wait for the health check task to have started once before we can cancel it
9090
await self._health_check_task_started_event.wait()
91-
await cancel_wait_task(
91+
await cancel_and_wait(
9292
self._health_check_task, max_delay=_HEALTHCHECK_TASK_TIMEOUT_S
9393
)
9494

packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_interfaces_async_jobs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass, field
55

66
import pytest
7+
from common_library.async_tools import cancel_and_wait
78
from faker import Faker
89
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
910
AsyncJobGet,
@@ -16,7 +17,6 @@
1617
from models_library.progress_bar import ProgressReport
1718
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
1819
from pydantic import TypeAdapter
19-
from servicelib.async_utils import cancel_wait_task
2020
from servicelib.rabbitmq import RabbitMQRPCClient, RemoteMethodNotRegisteredError
2121
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
2222
list_jobs,
@@ -137,7 +137,7 @@ async def setup(self) -> None:
137137
yield
138138

139139
for task in fake_server.tasks:
140-
await cancel_wait_task(task)
140+
await cancel_and_wait(task)
141141

142142

143143
@pytest.mark.parametrize("method", ["result", "status", "cancel"])

packages/service-library/tests/redis/test_project_lock.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
from uuid import UUID
1111

1212
import pytest
13+
from common_library.async_tools import cancel_and_wait
1314
from faker import Faker
1415
from models_library.projects import ProjectID
1516
from models_library.projects_access import Owner
1617
from models_library.projects_state import ProjectLocked, ProjectStatus
17-
from servicelib.async_utils import cancel_wait_task
1818
from servicelib.redis import (
1919
ProjectLockError,
2020
RedisClientSDK,
@@ -141,4 +141,4 @@ async def _locked_fct() -> None:
141141
with pytest.raises(ProjectLockError):
142142
await _locked_fct()
143143

144-
await cancel_wait_task(task1)
144+
await cancel_and_wait(task1)

packages/service-library/tests/test_background_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
from unittest.mock import AsyncMock
1414

1515
import pytest
16+
from common_library.async_tools import cancel_and_wait
1617
from faker import Faker
1718
from pytest_mock.plugin import MockerFixture
18-
from servicelib.async_utils import cancel_wait_task
1919
from servicelib.background_task import create_periodic_task, periodic, periodic_task
2020

2121
pytest_simcore_core_services_selection = [
@@ -78,7 +78,7 @@ async def _creator(
7878
yield _creator
7979
# cleanup
8080
await asyncio.gather(
81-
*(cancel_wait_task(t, max_delay=stop_task_timeout) for t in created_tasks)
81+
*(cancel_and_wait(t, max_delay=stop_task_timeout) for t in created_tasks)
8282
)
8383

8484

packages/service-library/tests/test_background_task_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import arrow
1515
import pytest
16-
from servicelib.async_utils import cancel_wait_task
16+
from common_library.async_tools import cancel_and_wait
1717
from servicelib.background_task_utils import exclusive_periodic
1818
from servicelib.redis import RedisClientSDK
1919
from settings_library.redis import RedisDatabase
@@ -71,7 +71,7 @@ async def _sleep_task(sleep_interval: float, on_sleep_events: mock.Mock) -> None
7171

7272
await _assert_on_sleep_done(sleep_events, stop_after=stop_after)
7373

74-
await cancel_wait_task(task, max_delay=5)
74+
await cancel_and_wait(task, max_delay=5)
7575

7676
events_timestamps: tuple[float, ...] = tuple(
7777
x.args[0].timestamp() for x in sleep_events.call_args_list

0 commit comments

Comments
 (0)