Skip to content

Commit 0eaadfd

Browse files
committed
refactor
1 parent 0b59b6f commit 0eaadfd

File tree

4 files changed

+40
-47
lines changed

4 files changed

+40
-47
lines changed

packages/service-library/src/servicelib/async_utils.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import asyncio
2+
import contextlib
23
import datetime
34
import logging
45
from collections import deque
56
from collections.abc import Awaitable, Callable, Coroutine
67
from contextlib import suppress
78
from dataclasses import dataclass
89
from functools import wraps
9-
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
10+
from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypeVar
11+
12+
from tenacity import before_sleep_log, retry, stop_after_attempt
1013

1114
from . import tracing
1215
from .utils_profiling_middleware import dont_profile, is_profiling, profile_context
@@ -229,3 +232,32 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
229232
return wrapper
230233

231234
return decorator
235+
236+
237+
_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3
238+
239+
240+
@retry(
    reraise=True,
    stop=stop_after_attempt(_MAX_TASK_CANCELLATION_ATTEMPTS),
    before_sleep=before_sleep_log(_logger, logging.WARNING, exc_info=True),
)
async def retried_cancel_task(
    task: asyncio.Task,
    *,
    timeout: float | None,  # noqa: ASYNC109
) -> None:
    """Reliable task cancellation. Some libraries will just hang without
    cancelling the task. The operation is retried _MAX_TASK_CANCELLATION_ATTEMPTS times
    in case of timeout.

    :param task: task to be canceled
    :param timeout: duration (in seconds) to wait, on each attempt, for the task
        to finish after requesting its cancellation. If None it waits forever.
        Will be repeated _MAX_TASK_CANCELLATION_ATTEMPTS times.
    :raises TimeoutError: raised if the task is still pending once the timeout
        elapsed. Errors raised by the task itself while it is being cancelled
        are suppressed (backwards compatibility).
    """
    task.cancel()

    # NOTE: the task is deliberately NOT awaited directly inside an
    # `asyncio.timeout()` scope. A timeout scope interrupts its body by
    # cancelling the *current* task, and cancelling a task that is awaiting
    # another task only forwards the cancellation to the awaited one. A task
    # that ignores cancellation would therefore keep this coroutine blocked
    # past the timeout. `asyncio.wait()` returns after `timeout` seconds
    # regardless of what the awaited task does, and never raises on timeout.
    _, pending = await asyncio.wait((task,), timeout=timeout)
    if pending:
        msg = f"Timed out while cancelling task '{task.get_name()}'"
        raise TimeoutError(msg)

    # The task is done. Retrieve its exception (if it finished with one
    # instead of being cancelled) so it is marked as consumed and asyncio does
    # not log "Task exception was never retrieved"; the error itself is
    # intentionally dropped.
    if not task.cancelled():
        task.exception()

packages/service-library/src/servicelib/background_task.py

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,17 @@
55
from collections.abc import AsyncIterator, Awaitable, Callable
66
from typing import Final
77

8-
from common_library.errors_classes import OsparcErrorMixin
98
from tenacity import TryAgain, retry_always, retry_if_exception_type
109
from tenacity.asyncio import AsyncRetrying
11-
from tenacity.stop import stop_after_attempt
1210
from tenacity.wait import wait_fixed
1311

14-
from .async_utils import with_delay
12+
from .async_utils import retried_cancel_task, with_delay
1513
from .logging_utils import log_catch, log_context
1614

1715
_logger = logging.getLogger(__name__)
1816

1917

2018
_DEFAULT_STOP_TIMEOUT_S: Final[int] = 5
21-
_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3
22-
23-
24-
class PeriodicTaskCancellationError(OsparcErrorMixin, Exception):
25-
msg_template: str = "Could not cancel task '{task_name}'"
2619

2720

2821
class SleepUsingAsyncioEvent:
@@ -108,35 +101,6 @@ def start_periodic_task(
108101
)
109102

110103

111-
async def cancel_task(
112-
task: asyncio.Task,
113-
*,
114-
timeout: float | None,
115-
cancellation_attempts: int = _MAX_TASK_CANCELLATION_ATTEMPTS,
116-
) -> None:
117-
"""Reliable task cancellation. Some libraries will just hang without
118-
cancelling the task. It is important to retry the operation to provide
119-
a timeout in that situation to avoid forever pending tasks.
120-
121-
:param task: task to be canceled
122-
:param timeout: total duration (in seconds) to wait before giving
123-
up the cancellation. If None it waits forever.
124-
:raises TryAgain: raised if cannot cancel the task.
125-
"""
126-
async for attempt in AsyncRetrying(
127-
stop=stop_after_attempt(cancellation_attempts), reraise=True
128-
):
129-
with attempt:
130-
task.cancel()
131-
_, pending = await asyncio.wait((task,), timeout=timeout)
132-
if pending:
133-
task_name = task.get_name()
134-
_logger.info(
135-
"tried to cancel '%s' but timed-out! %s", task_name, pending
136-
)
137-
raise PeriodicTaskCancellationError(task_name=task_name)
138-
139-
140104
async def stop_periodic_task(
141105
asyncio_task: asyncio.Task, *, timeout: float | None = None
142106
) -> None:
@@ -145,7 +109,7 @@ async def stop_periodic_task(
145109
logging.DEBUG,
146110
msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
147111
):
148-
await cancel_task(asyncio_task, timeout=timeout)
112+
await retried_cancel_task(asyncio_task, timeout=timeout)
149113

150114

151115
@contextlib.asynccontextmanager

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,8 @@
4141
from models_library.users import UserID
4242
from models_library.wallets import WalletID
4343
from pydantic import NonNegativeFloat
44-
from servicelib.background_task import (
45-
cancel_task,
46-
start_periodic_task,
47-
stop_periodic_task,
48-
)
44+
from servicelib.async_utils import retried_cancel_task
45+
from servicelib.background_task import start_periodic_task, stop_periodic_task
4946
from servicelib.fastapi.long_running_tasks.client import ProgressCallback
5047
from servicelib.fastapi.long_running_tasks.server import TaskProgress
5148
from servicelib.redis import RedisClientsManager, exclusive
@@ -368,7 +365,7 @@ async def mark_service_for_removal(
368365
self._service_observation_task[service_name]
369366
)
370367
if isinstance(service_task, asyncio.Task):
371-
await cancel_task(service_task, timeout=10)
368+
await retried_cancel_task(service_task, timeout=10)
372369

373370
if skip_observation_recreation:
374371
return

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from fastapi import FastAPI, status
1010
from models_library.callbacks_mapping import CallbacksMapping, UserServiceCommand
1111
from pydantic import BaseModel, NonNegativeFloat, NonNegativeInt
12-
from servicelib.background_task import cancel_task
12+
from servicelib.async_utils import retried_cancel_task
1313
from servicelib.logging_utils import log_context
1414
from servicelib.sequences_utils import pairwise
1515
from simcore_service_dynamic_sidecar.core.errors import (
@@ -143,7 +143,7 @@ async def start(self) -> None:
143143
async def stop(self) -> None:
144144
with log_context(_logger, logging.INFO, "shutdown service metrics recovery"):
145145
if self._metrics_recovery_task:
146-
await cancel_task(
146+
await retried_cancel_task(
147147
self._metrics_recovery_task, timeout=_TASK_CANCELLATION_TIMEOUT_S
148148
)
149149

0 commit comments

Comments
 (0)