diff --git a/packages/celery-library/src/celery_library/task.py b/packages/celery-library/src/celery_library/task.py index f14771cf207..075e10036bc 100644 --- a/packages/celery-library/src/celery_library/task.py +++ b/packages/celery-library/src/celery_library/task.py @@ -12,8 +12,8 @@ AbortableTask, ) from celery.exceptions import Ignore # type: ignore[import-untyped] +from common_library.async_tools import cancel_wait_task from pydantic import NonNegativeInt -from servicelib.async_utils import cancel_wait_task from servicelib.celery.models import TaskID from .errors import encode_celery_transferrable_error diff --git a/packages/common-library/src/common_library/async_tools.py b/packages/common-library/src/common_library/async_tools.py index 205de066851..8d4a276deab 100644 --- a/packages/common-library/src/common_library/async_tools.py +++ b/packages/common-library/src/common_library/async_tools.py @@ -1,9 +1,14 @@ import asyncio +import datetime import functools -from collections.abc import Awaitable, Callable +import logging +from collections.abc import Awaitable, Callable, Coroutine from concurrent.futures import Executor +from functools import wraps from inspect import isawaitable -from typing import ParamSpec, TypeVar, overload +from typing import Any, ParamSpec, TypeVar, overload + +_logger = logging.getLogger(__name__) R = TypeVar("R") P = ParamSpec("P") @@ -62,3 +67,69 @@ async def maybe_await( return await obj assert not isawaitable(obj) # nosec return obj + + +async def cancel_wait_task( + task: asyncio.Task, *, max_delay: float | None = None +) -> None: + """Cancels the given task and waits for it to complete + + Arguments: + task -- task to be canceled + + + Keyword Arguments: + max_delay -- duration (in seconds) to wait before giving + up the cancellation. This timeout should be an upper bound to the + time needed for the task to cleanup after being canceled and + avoids that the cancellation takes forever. If None the timeout is not + set. (default: {None}) + + Raises: + TimeoutError: raised if cannot cancel the task. + CancelledError: raised ONLY if owner is being cancelled. + """ + + cancelling = task.cancel() + if not cancelling: + return # task was alredy cancelled + + assert task.cancelling() # nosec + assert not task.cancelled() # nosec + + try: + + await asyncio.shield( + # NOTE shield ensures that cancellation of the caller function won't stop you + # from observing the cancellation/finalization of task. + asyncio.wait_for(task, timeout=max_delay) + ) + + except asyncio.CancelledError: + if not task.cancelled(): + # task owner function is being cancelled -> propagate cancellation + raise + + # else: task cancellation is complete, we can safely ignore it + _logger.debug( + "Task %s cancellation is complete", + task.get_name(), + ) + + +def delayed_start( + delay: datetime.timedelta, +) -> Callable[ + [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] +]: + def _decorator( + func: Callable[P, Coroutine[Any, Any, R]], + ) -> Callable[P, Coroutine[Any, Any, R]]: + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + await asyncio.sleep(delay.total_seconds()) + return await func(*args, **kwargs) + + return _wrapper + + return _decorator diff --git a/packages/common-library/tests/test_async_tools.py b/packages/common-library/tests/test_async_tools.py index 850945d39b2..9e9e081056f 100644 --- a/packages/common-library/tests/test_async_tools.py +++ b/packages/common-library/tests/test_async_tools.py @@ -1,9 +1,16 @@ import asyncio +import time from concurrent.futures import ThreadPoolExecutor +from datetime import timedelta from typing import Any import pytest -from common_library.async_tools import make_async, maybe_await +from common_library.async_tools import ( + cancel_wait_task, + delayed_start, + make_async, + maybe_await, +) @make_async() @@ -13,7 +20,8 @@ def sync_function(x: int, y: int) -> int: @make_async() def sync_function_with_exception() -> None: - raise ValueError("This is an error!") + msg = "This is an error!" + raise ValueError(msg) @pytest.mark.asyncio @@ -93,3 +101,118 @@ def fetchone(self) -> Any: # pylint: disable=no-self-use sync_result = await maybe_await(SyncResultProxy().fetchone()) assert sync_result == {"id": 2, "name": "test2"} + + +async def test_cancel_and_wait(): + state = {"started": False, "cancelled": False, "cleaned_up": False} + SLEEP_TIME = 5 # seconds + + async def coro(): + try: + state["started"] = True + await asyncio.sleep(SLEEP_TIME) + except asyncio.CancelledError: + state["cancelled"] = True + raise + finally: + state["cleaned_up"] = True + + task = asyncio.create_task(coro()) + await asyncio.sleep(0.1) # Let coro start + + start = time.time() + await cancel_wait_task(task) + + elapsed = time.time() - start + assert elapsed < SLEEP_TIME, "Task should be cancelled quickly" + assert task.done() + assert task.cancelled() + assert state["started"] + assert state["cancelled"] + assert state["cleaned_up"] + + +async def test_cancel_and_wait_propagates_external_cancel(): + """ + This test ensures that if the caller of cancel_and_wait is cancelled, + the CancelledError is not swallowed. + """ + + async def coro(): + try: + await asyncio.sleep(4) + except asyncio.CancelledError: + await asyncio.sleep(1) # simulate cleanup + raise + + inner_task = asyncio.create_task(coro()) + + async def outer_coro(): + try: + await cancel_wait_task(inner_task) + except asyncio.CancelledError: + assert ( + not inner_task.cancelled() + ), "Internal Task DOES NOT RAISE CancelledError" + raise + + # Cancel the wrapper after a short delay + outer_task = asyncio.create_task(outer_coro()) + await asyncio.sleep(0.1) + outer_task.cancel() + + with pytest.raises(asyncio.CancelledError): + await outer_task + + # Ensure the task was cancelled + assert inner_task.cancelled() is False, "Task should not be cancelled initially" + + done_event = asyncio.Event() + + def on_done(_): + done_event.set() + + inner_task.add_done_callback(on_done) + await done_event.wait() + + +async def test_cancel_and_wait_timeout_on_slow_cleanup(): + """Test that cancel_and_wait raises TimeoutError when cleanup takes longer than max_delay""" + + CLEANUP_TIME = 2 # seconds + + async def slow_cleanup_coro(): + try: + await asyncio.sleep(10) # Long running task + except asyncio.CancelledError: + # Simulate slow cleanup that exceeds max_delay! + await asyncio.sleep(CLEANUP_TIME) + raise + + task = asyncio.create_task(slow_cleanup_coro()) + await asyncio.sleep(0.1) # Let the task start + + # Cancel with a max_delay shorter than cleanup time + with pytest.raises(TimeoutError): + await cancel_wait_task( + task, max_delay=CLEANUP_TIME / 10 + ) # 0.2 seconds < 2 seconds cleanup + + assert task.cancelled() + + +async def test_with_delay(): + @delayed_start(timedelta(seconds=0.2)) + async def decorated_awaitable() -> int: + return 42 + + assert await decorated_awaitable() == 42 + + async def another_awaitable() -> int: + return 42 + + decorated_another_awaitable = delayed_start(timedelta(seconds=0.2))( + another_awaitable + ) + + assert await decorated_another_awaitable() == 42 diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index c6466df0a70..d2c62ba55ff 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -1,9 +1,7 @@ import asyncio -import contextlib -import datetime import logging from collections import deque -from collections.abc import Awaitable, Callable, Coroutine +from collections.abc import Awaitable, Callable from contextlib import suppress from dataclasses import dataclass from functools import wraps @@ -212,40 +210,3 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None: return wrapper return decorator - - -def delayed_start( - delay: datetime.timedelta, -) -> Callable[ - [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] -]: - def _decorator( - func: Callable[P, Coroutine[Any, Any, R]], - ) -> Callable[P, Coroutine[Any, Any, R]]: - @wraps(func) - async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - await asyncio.sleep(delay.total_seconds()) - return await func(*args, **kwargs) - - return _wrapper - - return _decorator - - -async def cancel_wait_task( - task: asyncio.Task, - *, - max_delay: float | None = None, -) -> None: - """Cancel a asyncio.Task and waits for it to finish. - - :param task: task to be canceled - :param max_delay: duration (in seconds) to wait before giving - up the cancellation. If None it waits forever. - :raises TimeoutError: raised if cannot cancel the task. - """ - - task.cancel() - async with asyncio.timeout(max_delay): - with contextlib.suppress(asyncio.CancelledError): - await task diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index 508f34b99ee..506dd314ea1 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -6,10 +6,10 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine from typing import Any, Final, ParamSpec, TypeVar +from common_library.async_tools import cancel_wait_task, delayed_start from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type from tenacity.wait import wait_fixed -from .async_utils import cancel_wait_task, delayed_start from .logging_utils import log_catch, log_context _logger = logging.getLogger(__name__) @@ -142,4 +142,4 @@ async def periodic_task( if asyncio_task is not None: # NOTE: this stopping is shielded to prevent the cancellation to propagate # into the stopping procedure - await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout)) + await cancel_wait_task(asyncio_task, max_delay=stop_timeout) diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index c89e94ce476..a6007e2059a 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -9,9 +9,9 @@ from typing import Any, Final, Protocol, TypeAlias from uuid import uuid4 +from common_library.async_tools import cancel_wait_task from models_library.api_schemas_long_running_tasks.base import TaskProgress from pydantic import PositiveFloat -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_catch diff --git a/packages/service-library/src/servicelib/redis/_client.py b/packages/service-library/src/servicelib/redis/_client.py index c2a08154110..e961a6a73e9 100644 --- a/packages/service-library/src/servicelib/redis/_client.py +++ b/packages/service-library/src/servicelib/redis/_client.py @@ -8,11 +8,11 @@ import redis.asyncio as aioredis import redis.exceptions +from common_library.async_tools import cancel_wait_task from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry from redis.backoff import ExponentialBackoff -from ..async_utils import cancel_wait_task from ..background_task import periodic from ..logging_utils import log_catch, log_context from ._constants import ( diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_interfaces_async_jobs.py b/packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_interfaces_async_jobs.py index 72ecc9a8aa6..40455ee6d7f 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_interfaces_async_jobs.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_interfaces_async_jobs.py @@ -4,6 +4,7 @@ from dataclasses import dataclass, field import pytest +from common_library.async_tools import cancel_wait_task from faker import Faker from models_library.api_schemas_rpc_async_jobs.async_jobs import ( AsyncJobGet, @@ -16,7 +17,6 @@ from models_library.progress_bar import ProgressReport from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace from pydantic import TypeAdapter -from servicelib.async_utils import cancel_wait_task from servicelib.rabbitmq import RabbitMQRPCClient, RemoteMethodNotRegisteredError from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( list_jobs, diff --git a/packages/service-library/tests/redis/test_project_lock.py b/packages/service-library/tests/redis/test_project_lock.py index aa9d7fd1c74..03fe4f0e462 100644 --- a/packages/service-library/tests/redis/test_project_lock.py +++ b/packages/service-library/tests/redis/test_project_lock.py @@ -10,11 +10,11 @@ from uuid import UUID import pytest +from common_library.async_tools import cancel_wait_task from faker import Faker from models_library.projects import ProjectID from models_library.projects_access import Owner from models_library.projects_state import ProjectLocked, ProjectStatus -from servicelib.async_utils import cancel_wait_task from servicelib.redis import ( ProjectLockError, RedisClientSDK, diff --git a/packages/service-library/tests/test_async_utils.py b/packages/service-library/tests/test_async_utils.py index 9bb1b4fff45..e7164417fc6 100644 --- a/packages/service-library/tests/test_async_utils.py +++ b/packages/service-library/tests/test_async_utils.py @@ -7,7 +7,6 @@ import random from collections import deque from dataclasses import dataclass -from datetime import timedelta from time import time from typing import Any @@ -15,7 +14,6 @@ from faker import Faker from servicelib.async_utils import ( _sequential_jobs_contexts, - delayed_start, run_sequentially_in_context, ) @@ -225,20 +223,3 @@ async def test_multiple_context_calls(context_param: int) -> int: assert i == await test_multiple_context_calls(i) assert len(_sequential_jobs_contexts) == RETRIES - - -async def test_with_delay(): - @delayed_start(timedelta(seconds=0.2)) - async def decorated_awaitable() -> int: - return 42 - - assert await decorated_awaitable() == 42 - - async def another_awaitable() -> int: - return 42 - - decorated_another_awaitable = delayed_start(timedelta(seconds=0.2))( - another_awaitable - ) - - assert await decorated_another_awaitable() == 42 diff --git a/packages/service-library/tests/test_background_task.py b/packages/service-library/tests/test_background_task.py index 8c508bf8979..715aaec92b6 100644 --- a/packages/service-library/tests/test_background_task.py +++ b/packages/service-library/tests/test_background_task.py @@ -13,9 +13,9 @@ from unittest.mock import AsyncMock import pytest +from common_library.async_tools import cancel_wait_task from faker import Faker from pytest_mock.plugin import MockerFixture -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task, periodic, periodic_task pytest_simcore_core_services_selection = [ diff --git a/packages/service-library/tests/test_background_task_utils.py b/packages/service-library/tests/test_background_task_utils.py index 9a03a6c3541..92d0337eca7 100644 --- a/packages/service-library/tests/test_background_task_utils.py +++ b/packages/service-library/tests/test_background_task_utils.py @@ -13,7 +13,7 @@ import arrow import pytest -from servicelib.async_utils import cancel_wait_task +from common_library.async_tools import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.redis import RedisClientSDK from settings_library.redis import RedisDatabase diff --git a/services/agent/src/simcore_service_agent/services/volumes_manager.py b/services/agent/src/simcore_service_agent/services/volumes_manager.py index 860ab86d0e2..1ef6ef1d0cb 100644 --- a/services/agent/src/simcore_service_agent/services/volumes_manager.py +++ b/services/agent/src/simcore_service_agent/services/volumes_manager.py @@ -6,10 +6,10 @@ import arrow from aiodocker.docker import Docker +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.projects_nodes_io import NodeID from pydantic import NonNegativeFloat -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.fastapi.app_state import SingletonInAppStateMixin from servicelib.logging_utils import log_context diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index f19bac34a76..68bef9b369b 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -4,10 +4,10 @@ from datetime import timedelta from typing import Final, cast +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from prometheus_client import CollectorRegistry, Gauge from pydantic import PositiveInt -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.fastapi.monitoring import ( setup_prometheus_instrumentation as setup_rest_instrumentation, diff --git a/services/api-server/src/simcore_service_api_server/core/health_checker.py b/services/api-server/src/simcore_service_api_server/core/health_checker.py index b5a5180b12b..1e882569f39 100644 --- a/services/api-server/src/simcore_service_api_server/core/health_checker.py +++ b/services/api-server/src/simcore_service_api_server/core/health_checker.py @@ -5,11 +5,11 @@ from typing import Annotated, Final, cast from uuid import uuid4 +from common_library.async_tools import cancel_wait_task from fastapi import Depends, FastAPI from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID from pydantic import NonNegativeInt, PositiveFloat, PositiveInt -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.fastapi.dependencies import get_app from servicelib.logging_utils import log_catch @@ -95,7 +95,7 @@ async def _background_task_method(self): self._dummy_queue.get(), timeout=self._timeout_seconds ) self._health_check_failure_count = 0 - except asyncio.TimeoutError: + except TimeoutError: self._increment_health_check_failure_count() diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py index 5ebc6a190f8..b3d5ab8e193 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py @@ -2,8 +2,8 @@ from collections.abc import Awaitable, Callable from typing import Final +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py index 2985e2ffcc4..625c0170476 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py @@ -2,8 +2,8 @@ from collections.abc import Awaitable, Callable from typing import Final +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py index c540d7b160f..70e80c550f8 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py @@ -2,8 +2,8 @@ import logging from collections.abc import Awaitable, Callable +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py index ba4936284d7..ef288fea483 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py @@ -6,7 +6,7 @@ from typing import Final import distributed -from servicelib.async_utils import cancel_wait_task +from common_library.async_tools import cancel_wait_task from servicelib.logging_utils import log_catch, log_context from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive from servicelib.rabbitmq._models import RabbitMessage diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 09478d4d02e..430bb4a871e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -2,10 +2,10 @@ from typing import Final import networkx as nx +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.projects import ProjectID from models_library.users import UserID -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.exception_utils import suppress_exceptions from servicelib.logging_utils import log_context diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index ea36680240d..d6618d6cbfd 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -23,6 +23,7 @@ from typing import Final import arrow +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceCreate, @@ -41,7 +42,6 @@ from models_library.users import UserID from models_library.wallets import WalletID from pydantic import NonNegativeFloat -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress from servicelib.redis import RedisClientsManager, exclusive diff --git a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py index ea685777a0d..25d5dca72f3 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py @@ -4,8 +4,8 @@ from datetime import timedelta from typing import Final, Generic, TypeVar +from common_library.async_tools import cancel_wait_task from pydantic import NonNegativeInt -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_catch, log_context from servicelib.redis import RedisClientSDK diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 56b5d812f8c..177460f915c 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -8,9 +8,9 @@ import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] +from common_library.async_tools import cancel_wait_task from common_library.json_serialization import json_loads from fastapi import FastAPI, status -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.fastapi.client_session import get_client_session from servicelib.logging_utils import log_catch, log_context diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py index 750b0dbdc63..5e05384c990 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py @@ -5,10 +5,10 @@ from typing import Final import arrow +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.projects_nodes_io import NodeID from pydantic import NonNegativeFloat, NonNegativeInt -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.utils import limited_gather from settings_library.redis import RedisDatabase @@ -79,9 +79,9 @@ async def _worker_check_services_require_status_update(self) -> None: # NOTE: this worker runs on only once across all instances of the scheduler - models: dict[ - NodeID, TrackedServiceModel - ] = await service_tracker.get_all_tracked_services(self.app) + models: dict[NodeID, TrackedServiceModel] = ( + await service_tracker.get_all_tracked_services(self.app) + ) to_remove: list[NodeID] = [] to_start: list[NodeID] = [] diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index f29f26358e2..367d5dc224a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -6,12 +6,12 @@ from datetime import timedelta from functools import partial +from common_library.async_tools import cancel_wait_task from common_library.errors_classes import OsparcErrorMixin from fastapi import FastAPI from models_library.rabbitmq_messages import ProgressType from pydantic import PositiveFloat from servicelib import progress_bar -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_catch, log_context from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py index eb7ad93ed9e..7a6c47f6649 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py @@ -6,10 +6,10 @@ from typing import Final import arrow +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI, status from models_library.callbacks_mapping import CallbacksMapping, UserServiceCommand from pydantic import BaseModel, NonNegativeFloat, NonNegativeInt -from servicelib.async_utils import cancel_wait_task from servicelib.logging_utils import log_context from servicelib.sequences_utils import pairwise from simcore_service_dynamic_sidecar.core.errors import ( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index eecbfd2089e..d61402e6d28 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -2,6 +2,7 @@ import logging from typing import Final +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ( @@ -14,7 +15,6 @@ from models_library.services import ServiceType from models_library.services_creation import CreateServiceMetricsAdditionalParams from pydantic import NonNegativeFloat -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_context diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py index d2148842ef5..fc6941d1a54 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -7,6 +7,7 @@ from typing import Final import psutil +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, @@ -14,7 +15,6 @@ ) from models_library.projects_nodes_io import NodeID from models_library.users import UserID -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_context from servicelib.utils import logged_gather diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index e1480f84b20..ae88f0fdb84 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -3,8 +3,8 @@ from collections.abc import Awaitable, Callable from datetime import timedelta +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_catch, log_context diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py index a38411f56a1..5ca03f7bd9e 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py @@ -1,8 +1,8 @@ import logging from collections.abc import Awaitable, Callable +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.logging_utils import log_catch, log_context _logger = logging.getLogger(__name__) diff --git a/services/notifications/src/simcore_service_notifications/clients/postgres/_liveness.py b/services/notifications/src/simcore_service_notifications/clients/postgres/_liveness.py index 57bc7a40076..6d26fd83e93 100644 --- a/services/notifications/src/simcore_service_notifications/clients/postgres/_liveness.py +++ b/services/notifications/src/simcore_service_notifications/clients/postgres/_liveness.py @@ -3,9 +3,9 @@ from datetime import timedelta from typing import Final +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from models_library.healthchecks import IsResponsive, LivenessResult -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.db_asyncpg_utils import check_postgres_liveness from servicelib.fastapi.db_asyncpg_engine import get_engine diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py index abaefe1e9b7..a747cd5d476 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py @@ -3,8 +3,8 @@ from collections.abc import Awaitable, Callable from typing import TypedDict +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_catch, log_context diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/fire_and_forget_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/fire_and_forget_setup.py index 2523a069974..a1e7db5ac30 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/fire_and_forget_setup.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/fire_and_forget_setup.py @@ -1,8 +1,8 @@ import logging from collections.abc import Awaitable, Callable +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.logging_utils import log_catch, log_context _logger = logging.getLogger(__name__) diff --git a/services/storage/src/simcore_service_storage/dsm_cleaner.py b/services/storage/src/simcore_service_storage/dsm_cleaner.py index d09c83e4f5d..6194d61a835 100644 --- a/services/storage/src/simcore_service_storage/dsm_cleaner.py +++ b/services/storage/src/simcore_service_storage/dsm_cleaner.py @@ -23,8 +23,8 @@ from datetime import timedelta from typing import cast +from common_library.async_tools import cancel_wait_task from fastapi import FastAPI -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_context diff --git a/services/storage/src/simcore_service_storage/utils/s3_utils.py b/services/storage/src/simcore_service_storage/utils/s3_utils.py index 3fcb17d0c45..0ebec26a6bb 100644 --- a/services/storage/src/simcore_service_storage/utils/s3_utils.py +++ b/services/storage/src/simcore_service_storage/utils/s3_utils.py @@ -4,8 +4,8 @@ from collections import defaultdict from dataclasses import dataclass, field +from common_library.async_tools import cancel_wait_task from pydantic import ByteSize, TypeAdapter -from servicelib.async_utils import cancel_wait_task from servicelib.background_task import create_periodic_task from servicelib.progress_bar import ProgressBarData diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py index b992d25b387..73c2d570ebf 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_api_keys.py @@ -8,6 +8,7 @@ from collections.abc import AsyncIterator, Callable from aiohttp import web +from common_library.async_tools import cancel_wait_task from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.wait import wait_exponential @@ -67,10 +68,6 @@ async def _cleanup_ctx_fun( yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py index dfb7237d97f..255c9d006dc 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_core.py @@ -1,4 +1,4 @@ -""" Setup and running of periodic background task +"""Setup and running of periodic background task Specifics of the gc implementation should go into garbage_collector_core.py @@ -9,6 +9,7 @@ from collections.abc import AsyncGenerator from aiohttp import web +from common_library.async_tools import cancel_wait_task from servicelib.logging_utils import log_context from ._core import collect_garbage @@ -49,18 +50,14 @@ async def run_background_task(app: web.Application) -> AsyncGenerator: # TEAR-DOWN ----- # controlled cancelation of the gc task - try: - _logger.info("Stopping garbage collector...") + _logger.info("Stopping garbage collector...") - ack = gc_bg_task.cancel() - assert ack # nosec + ack = gc_bg_task.cancel() + assert ack # nosec - app[_GC_TASK_CONFIG]["force_stop"] = True + app[_GC_TASK_CONFIG]["force_stop"] = True - await gc_bg_task - - except asyncio.CancelledError: - assert gc_bg_task.cancelled() # nosec + await cancel_wait_task(gc_bg_task) async def _collect_garbage_periodically(app: web.Application): diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py index 46df72c0a70..121b8b79ee0 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_trash.py @@ -1,5 +1,5 @@ """ - Scheduled tasks addressing users +Scheduled tasks addressing users """ @@ -8,6 +8,7 @@ from collections.abc import AsyncIterator, Callable from aiohttp import web +from common_library.async_tools import cancel_wait_task from servicelib.logging_utils import log_context from tenacity import retry from tenacity.before_sleep import before_sleep_log @@ -55,10 +56,6 @@ async def _cleanup_ctx_fun( yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 3b834c71ab7..26cdb9e053c 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -8,6 +8,7 @@ from collections.abc import AsyncIterator, Callable from aiohttp import web +from common_library.async_tools import cancel_wait_task from models_library.users import UserID from servicelib.logging_utils import get_log_record_extra, log_context from tenacity import retry @@ -107,10 +108,6 @@ async def _cleanup_ctx_fun( yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) return _cleanup_ctx_fun diff --git a/services/web/server/src/simcore_service_webserver/licenses/_itis_vip_syncer_service.py b/services/web/server/src/simcore_service_webserver/licenses/_itis_vip_syncer_service.py index 5f04cea6a3c..7cca49731e0 100644 --- a/services/web/server/src/simcore_service_webserver/licenses/_itis_vip_syncer_service.py +++ b/services/web/server/src/simcore_service_webserver/licenses/_itis_vip_syncer_service.py @@ -4,9 +4,9 @@ from datetime import timedelta from aiohttp import web +from common_library.async_tools import cancel_wait_task from httpx import AsyncClient from models_library.licenses import LicensedResourceType -from servicelib.async_utils import cancel_wait_task from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_catch, log_context diff --git a/services/web/server/src/simcore_service_webserver/payments/_tasks.py b/services/web/server/src/simcore_service_webserver/payments/_tasks.py index b87465f5f3e..5e6cd74c05b 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_tasks.py +++ b/services/web/server/src/simcore_service_webserver/payments/_tasks.py @@ -5,6 +5,7 @@ from typing import Any from aiohttp import web +from common_library.async_tools import cancel_wait_task from models_library.api_schemas_webserver.wallets import PaymentID, PaymentMethodID from pydantic import HttpUrl, TypeAdapter from servicelib.aiohttp.typing_extension import CleanupContextFunc @@ -143,10 +144,6 @@ async def _cleanup_ctx_fun( yield # tear-down - task.cancel() - try: - await task - except asyncio.CancelledError: - assert task.cancelled() # nosec + await cancel_wait_task(task) return _cleanup_ctx_fun