diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index fc99db45b89..1c6e033b867 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -65,6 +65,7 @@ def cancel_async_job( @router.get( "/tasks/{task_id}/result", + response_model=Any, name="get_task_result", description="Retrieves the result of a task", responses=_export_data_responses, diff --git a/api/specs/web-server/_long_running_tasks_legacy.py b/api/specs/web-server/_long_running_tasks_legacy.py index 89c85b0ff93..d17b9cceeed 100644 --- a/api/specs/web-server/_long_running_tasks_legacy.py +++ b/api/specs/web-server/_long_running_tasks_legacy.py @@ -4,7 +4,7 @@ # pylint: disable=too-many-arguments -from typing import Annotated +from typing import Annotated, Any from fastapi import APIRouter, Depends, status from models_library.generics import Envelope @@ -54,6 +54,7 @@ def cancel_and_delete_task( @router.get( "/{task_id}/result", name="get_task_result", + response_model=Any, description="Retrieves the result of a task", ) def get_task_result( diff --git a/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py b/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py index acd73831b22..392d8841451 100644 --- a/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py +++ b/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py @@ -18,9 +18,12 @@ class TaskResult(BaseModel): error: Any | None -class TaskGet(BaseModel): +class TaskBase(BaseModel): task_id: TaskId task_name: str + + +class TaskGet(TaskBase): status_href: str result_href: str abort_href: str diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py index 79594cb18b8..fe38782e9ff 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py @@ -3,9 +3,9 @@ from pydantic import PositiveFloat MINUTE: Final[PositiveFloat] = 60 -APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[ - str -] = f"{__name__ }.long_running_tasks.tasks_manager" -RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[ - str -] = f"{__name__}.long_running_tasks.context" +APP_LONG_RUNNING_MANAGER_KEY: Final[str] = ( + f"{__name__ }.long_running_tasks.tasks_manager" +) +RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[str] = ( + f"{__name__}.long_running_tasks.context" +) diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_dependencies.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_dependencies.py deleted file mode 100644 index 4a8bc455cd8..00000000000 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_dependencies.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Any - -from aiohttp import web - -from ...long_running_tasks.task import TasksManager -from ._constants import ( - APP_LONG_RUNNING_TASKS_MANAGER_KEY, - RQT_LONG_RUNNING_TASKS_CONTEXT_KEY, -) - - -def get_tasks_manager(app: web.Application) -> TasksManager: - output: TasksManager = app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] - return output - - -def get_task_context(request: web.Request) -> dict[str, Any]: - output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] - return output - - -def create_task_name_from_request(request: web.Request) 
-> str: - return f"{request.method} {request.rel_url}" diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py new file mode 100644 index 00000000000..aa1fc4ea09e --- /dev/null +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py @@ -0,0 +1,41 @@ +import datetime + +from aiohttp import web + +from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager +from ...long_running_tasks.task import TaskContext, TasksManager +from ._constants import APP_LONG_RUNNING_MANAGER_KEY +from ._request import get_task_context + + +class AiohttpLongRunningManager(BaseLongRunningManager): + def __init__( + self, + app: web.Application, + stale_task_check_interval: datetime.timedelta, + stale_task_detect_timeout: datetime.timedelta, + ): + self._app = app + self._tasks_manager = TasksManager( + stale_task_check_interval=stale_task_check_interval, + stale_task_detect_timeout=stale_task_detect_timeout, + ) + + @property + def tasks_manager(self) -> TasksManager: + return self._tasks_manager + + async def setup(self) -> None: + await self._tasks_manager.setup() + + async def teardown(self) -> None: + await self._tasks_manager.teardown() + + @staticmethod + def get_task_context(request: web.Request) -> TaskContext: + return get_task_context(request) + + +def get_long_running_manager(app: web.Application) -> AiohttpLongRunningManager: + output: AiohttpLongRunningManager = app[APP_LONG_RUNNING_MANAGER_KEY] + return output diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_request.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_request.py new file mode 100644 index 00000000000..0ccfd3c6a40 --- /dev/null +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_request.py @@ -0,0 +1,10 @@ +from typing import Any + +from aiohttp import web + +from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY + + +def get_task_context(request: web.Request) -> dict[str, Any]: + output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] + return output diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py index 2603b229f68..513203f6a1e 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py @@ -1,18 +1,15 @@ -import logging from typing import Any from aiohttp import web -from common_library.json_serialization import json_dumps from pydantic import BaseModel from servicelib.aiohttp import status -from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError +from ...long_running_tasks import http_endpoint_responses from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus -from ...long_running_tasks.task import TrackedTask from ..requests_validation import parse_request_path_parameters_as -from ._dependencies import get_task_context, get_tasks_manager +from ..rest_responses import create_data_response +from ._manager import get_long_running_manager -_logger = logging.getLogger(__name__) routes = web.RouteTableDef() @@ -22,79 +19,58 @@ class _PathParam(BaseModel): @routes.get("", name="list_tasks") async def list_tasks(request: web.Request) -> web.Response: - tasks_manager = get_tasks_manager(request.app) - task_context = 
get_task_context(request) - tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks( - with_task_context=task_context - ) - - return web.json_response( - { - "data": [ - TaskGet( - task_id=t.task_id, - task_name=t.task_name, - status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}", - result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}", - abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}", - ) - for t in tracked_tasks - ] - }, - dumps=json_dumps, + long_running_manager = get_long_running_manager(request.app) + return create_data_response( + [ + TaskGet( + task_id=t.task_id, + task_name=t.task_name, + status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}", + result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}", + abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}", + ) + for t in http_endpoint_responses.list_tasks( + long_running_manager.tasks_manager, + long_running_manager.get_task_context(request), + ) + ] ) @routes.get("/{task_id}", name="get_task_status") async def get_task_status(request: web.Request) -> web.Response: path_params = parse_request_path_parameters_as(_PathParam, request) - tasks_manager = get_tasks_manager(request.app) - task_context = get_task_context(request) + long_running_manager = get_long_running_manager(request.app) - task_status: TaskStatus = tasks_manager.get_task_status( - task_id=path_params.task_id, with_task_context=task_context + task_status: TaskStatus = http_endpoint_responses.get_task_status( + long_running_manager.tasks_manager, + long_running_manager.get_task_context(request), + path_params.task_id, ) - return web.json_response({"data": task_status}, dumps=json_dumps) + return create_data_response(task_status) @routes.get("/{task_id}/result", name="get_task_result") async def get_task_result(request: web.Request) -> web.Response | Any: path_params = parse_request_path_parameters_as(_PathParam, request) - tasks_manager = get_tasks_manager(request.app) - task_context = get_task_context(request) + long_running_manager = get_long_running_manager(request.app) # NOTE: this might raise an exception that will be catched by the _error_handlers - try: - task_result = tasks_manager.get_task_result( - task_id=path_params.task_id, with_task_context=task_context - ) - # NOTE: this will fail if the task failed for some reason.... 
- await tasks_manager.remove_task( - path_params.task_id, with_task_context=task_context, reraise_errors=False - ) - return task_result - except (TaskNotFoundError, TaskNotCompletedError): - raise - except Exception: - # the task shall be removed in this case - await tasks_manager.remove_task( - path_params.task_id, with_task_context=task_context, reraise_errors=False - ) - raise + return await http_endpoint_responses.get_task_result( + long_running_manager.tasks_manager, + long_running_manager.get_task_context(request), + path_params.task_id, + ) @routes.delete("/{task_id}", name="cancel_and_delete_task") async def cancel_and_delete_task(request: web.Request) -> web.Response: path_params = parse_request_path_parameters_as(_PathParam, request) - tasks_manager = get_tasks_manager(request.app) - task_context = get_task_context(request) - await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context) - return web.json_response(status=status.HTTP_204_NO_CONTENT) + long_running_manager = get_long_running_manager(request.app) - -__all__: tuple[str, ...] = ( - "get_tasks_manager", - "TaskId", - "TaskGet", - "TaskStatus", -) + await http_endpoint_responses.remove_task( + long_running_manager.tasks_manager, + long_running_manager.get_task_context(request), + path_params.task_id, + ) + return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py index 40777376815..a68bb4a67b0 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py @@ -1,32 +1,28 @@ import asyncio -import logging +import datetime from collections.abc import AsyncGenerator, Callable from functools import wraps from typing import Any from aiohttp import web from common_library.json_serialization import json_dumps -from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter +from pydantic import AnyHttpUrl, TypeAdapter from ...aiohttp import status -from ...long_running_tasks.models import TaskGet -from ...long_running_tasks.task import ( - TaskContext, - TaskProtocol, - TasksManager, - start_task, +from ...long_running_tasks.constants import ( + DEFAULT_STALE_TASK_CHECK_INTERVAL, + DEFAULT_STALE_TASK_DETECT_TIMEOUT, ) +from ...long_running_tasks.models import TaskGet +from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task from ..typing_extension import Handler from . 
import _routes from ._constants import ( - APP_LONG_RUNNING_TASKS_MANAGER_KEY, - MINUTE, + APP_LONG_RUNNING_MANAGER_KEY, RQT_LONG_RUNNING_TASKS_CONTEXT_KEY, ) -from ._dependencies import create_task_name_from_request, get_tasks_manager from ._error_handlers import base_long_running_error_handler - -_logger = logging.getLogger(__name__) +from ._manager import AiohttpLongRunningManager, get_long_running_manager def no_ops_decorator(handler: Handler): @@ -42,6 +38,10 @@ async def _wrap(request: web.Request): return _wrap +def _create_task_name_from_request(request: web.Request) -> str: + return f"{request.method} {request.rel_url}" + + async def start_long_running_task( # NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys request_: web.Request, @@ -51,12 +51,12 @@ async def start_long_running_task( task_context: TaskContext, **task_kwargs: Any, ) -> web.Response: - task_manager = get_tasks_manager(request_.app) - task_name = create_task_name_from_request(request_) + long_running_manager = get_long_running_manager(request_.app) + task_name = _create_task_name_from_request(request_) task_id = None try: task_id = start_task( - task_manager, + long_running_manager.tasks_manager, task_, fire_and_forget=fire_and_forget, task_context=task_context, @@ -91,8 +91,10 @@ async def start_long_running_task( except asyncio.CancelledError: # cancel the task, the client has disconnected if task_id: - task_manager = get_tasks_manager(request_.app) - await task_manager.cancel_task(task_id, with_task_context=None) + long_running_manager = get_long_running_manager(request_.app) + await long_running_manager.tasks_manager.cancel_task( + task_id, with_task_context=None + ) raise @@ -121,8 +123,8 @@ def setup( router_prefix: str, handler_check_decorator: Callable = no_ops_decorator, task_request_context_decorator: Callable = no_task_context_decorator, - stale_task_check_interval_s: PositiveFloat = 1 * MINUTE, - stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE, + stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL, + stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT, ) -> None: """ - `router_prefix` APIs are mounted on `/...`, this @@ -135,21 +137,24 @@ def setup( """ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]: + # add error handlers + app.middlewares.append(base_long_running_error_handler) + # add components to state - app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = ( - TasksManager( - stale_task_check_interval_s=stale_task_check_interval_s, - stale_task_detect_timeout_s=stale_task_detect_timeout_s, + app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = ( + AiohttpLongRunningManager( + app=app, + stale_task_check_interval=stale_task_check_interval, + stale_task_detect_timeout=stale_task_detect_timeout, ) ) - # add error handlers - app.middlewares.append(base_long_running_error_handler) + await long_running_manager.setup() yield # cleanup - await long_running_task_manager.close() + await long_running_manager.teardown() # add routing (done at setup-time) _wrap_and_add_routes( diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py index 81c48ed017b..ed5675a457d 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py @@ 
-1,9 +1,11 @@ import asyncio import logging from collections.abc import AsyncGenerator -from typing import Any +from datetime import timedelta +from typing import Any, Final from aiohttp import ClientConnectionError, ClientSession +from pydantic import PositiveFloat from tenacity import TryAgain, retry from tenacity.asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log @@ -12,7 +14,7 @@ from tenacity.wait import wait_random_exponential from yarl import URL -from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR +from ...long_running_tasks.constants import DEFAULT_POLL_INTERVAL_S from ...long_running_tasks.models import ( LRTask, RequestBody, @@ -26,6 +28,7 @@ _logger = logging.getLogger(__name__) +_DEFAULT_CLIENT_TIMEOUT_S: Final[PositiveFloat] = timedelta(hours=1).total_seconds() _DEFAULT_AIOHTTP_RETRY_POLICY: dict[str, Any] = { "retry": retry_if_exception_type(ClientConnectionError), @@ -49,7 +52,7 @@ async def _wait_for_completion( session: ClientSession, task_id: TaskId, status_url: URL, - client_timeout: int, + client_timeout: PositiveFloat, ) -> AsyncGenerator[TaskProgress, None]: try: async for attempt in AsyncRetrying( @@ -98,7 +101,7 @@ async def long_running_task_request( session: ClientSession, url: URL, json: RequestBody | None = None, - client_timeout: int = 1 * HOUR, + client_timeout: PositiveFloat = _DEFAULT_CLIENT_TIMEOUT_S, ) -> AsyncGenerator[LRTask, None]: """Will use the passed `ClientSession` to call an oSparc long running task `url` passing `json` as request body. diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py index 218fb4640f7..c5de3f1d5e1 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py @@ -6,17 +6,11 @@ running task. """ -from ._dependencies import ( - create_task_name_from_request, - get_task_context, - get_tasks_manager, -) +from ._manager import get_long_running_manager from ._server import setup, start_long_running_task __all__: tuple[str, ...] 
= ( - "create_task_name_from_request", - "get_task_context", - "get_tasks_manager", + "get_long_running_manager", "setup", "start_long_running_task", ) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py index c2b93594ce1..a344e6b09a1 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py @@ -1,7 +1,6 @@ import asyncio import functools import logging -import warnings from collections.abc import Awaitable, Callable from typing import Any, Final @@ -181,16 +180,6 @@ async def cancel_and_delete_task( timeout=timeout, ) - if result.status_code == status.HTTP_200_OK: - warnings.warn( - "returning a 200 when cancelling a task has been deprecated with PR#3236" - "and will be removed after 11.2022" - "please do close your studies at least once before that date, so that the dy-sidecar" - "get replaced", - category=DeprecationWarning, - ) - return - if result.status_code not in ( status.HTTP_204_NO_CONTENT, status.HTTP_404_NOT_FOUND, diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_dependencies.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_dependencies.py index e2c2fdc4b00..ced9efa4f16 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_dependencies.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_dependencies.py @@ -1,8 +1,10 @@ from fastapi import Request -from ...long_running_tasks.task import TasksManager +from ._manager import FastAPILongRunningManager -def get_tasks_manager(request: Request) -> TasksManager: - output: TasksManager = request.app.state.long_running_task_manager - return output +def get_long_running_manager(request: Request) -> FastAPILongRunningManager: + assert isinstance( + request.app.state.long_running_manager, FastAPILongRunningManager + ) # nosec + return request.app.state.long_running_manager diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_error_handlers.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_error_handlers.py index b39eff575bc..0214e009217 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_error_handlers.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_error_handlers.py @@ -18,10 +18,10 @@ async def base_long_running_error_handler( _: Request, exception: BaseLongRunningError ) -> JSONResponse: _logger.debug("%s", exception, stack_info=True) - error_fields = dict(code=exception.code, message=f"{exception}") + error_fields = {"code": exception.code, "message": f"{exception}"} status_code = ( status.HTTP_404_NOT_FOUND - if isinstance(exception, (TaskNotFoundError, TaskNotCompletedError)) + if isinstance(exception, TaskNotFoundError | TaskNotCompletedError) else status.HTTP_400_BAD_REQUEST ) return JSONResponse(content=jsonable_encoder(error_fields), status_code=status_code) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_manager.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_manager.py new file mode 100644 index 00000000000..bbc3e098a7e --- /dev/null +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_manager.py @@ -0,0 +1,30 @@ +import datetime + +from fastapi import FastAPI + +from 
...long_running_tasks.base_long_running_manager import BaseLongRunningManager +from ...long_running_tasks.task import TasksManager + + +class FastAPILongRunningManager(BaseLongRunningManager): + def __init__( + self, + app: FastAPI, + stale_task_check_interval: datetime.timedelta, + stale_task_detect_timeout: datetime.timedelta, + ): + self._app = app + self._tasks_manager = TasksManager( + stale_task_check_interval=stale_task_check_interval, + stale_task_detect_timeout=stale_task_detect_timeout, + ) + + @property + def tasks_manager(self) -> TasksManager: + return self._tasks_manager + + async def setup(self) -> None: + await self._tasks_manager.setup() + + async def teardown(self) -> None: + await self._tasks_manager.teardown() diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py index fa11e506012..8b474c8add9 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py @@ -2,11 +2,11 @@ from fastapi import APIRouter, Depends, Request, status -from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError +from ...long_running_tasks import http_endpoint_responses from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus -from ...long_running_tasks.task import TasksManager from ..requests_decorators import cancel_on_disconnect -from ._dependencies import get_tasks_manager +from ._dependencies import get_long_running_manager +from ._manager import FastAPILongRunningManager router = APIRouter(prefix="/task") @@ -14,18 +14,25 @@ @router.get("", response_model=list[TaskGet]) @cancel_on_disconnect async def list_tasks( - request: Request, tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)] + request: Request, + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> list[TaskGet]: assert request # nosec return [ TaskGet( task_id=t.task_id, task_name=t.task_name, - status_href="", - result_href="", - abort_href="", + status_href=str(request.url_for("get_task_status", task_id=t.task_id)), + result_href=str(request.url_for("get_task_result", task_id=t.task_id)), + abort_href=str( + request.url_for("cancel_and_delete_task", task_id=t.task_id) + ), + ) + for t in http_endpoint_responses.list_tasks( + long_running_manager.tasks_manager, task_context=None ) - for t in tasks_manager.list_tasks(with_task_context=None) ] @@ -40,10 +47,14 @@ async def list_tasks( async def get_task_status( request: Request, task_id: TaskId, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> TaskStatus: assert request # nosec - return tasks_manager.get_task_status(task_id=task_id, with_task_context=None) + return http_endpoint_responses.get_task_status( + long_running_manager.tasks_manager, task_context=None, task_id=task_id + ) @router.get( @@ -59,23 +70,14 @@ async def get_task_status( async def get_task_result( request: Request, task_id: TaskId, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> TaskResult | Any: assert request # nosec - try: - task_result = tasks_manager.get_task_result(task_id, with_task_context=None) - await 
tasks_manager.remove_task( - task_id, with_task_context=None, reraise_errors=False - ) - return task_result - except (TaskNotFoundError, TaskNotCompletedError): - raise - except Exception: - # the task shall be removed in this case - await tasks_manager.remove_task( - task_id, with_task_context=None, reraise_errors=False - ) - raise + return await http_endpoint_responses.get_task_result( + long_running_manager.tasks_manager, task_context=None, task_id=task_id + ) @router.delete( @@ -91,7 +93,11 @@ async def get_task_result( async def cancel_and_delete_task( request: Request, task_id: TaskId, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> None: assert request # nosec - await tasks_manager.remove_task(task_id, with_task_context=None) + await http_endpoint_responses.remove_task( + long_running_manager.tasks_manager, task_context=None, task_id=task_id + ) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py index 26c26d10bc3..272250ae258 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py @@ -1,22 +1,23 @@ -from typing import Final +import datetime from fastapi import APIRouter, FastAPI -from pydantic import PositiveFloat +from ...long_running_tasks.constants import ( + DEFAULT_STALE_TASK_CHECK_INTERVAL, + DEFAULT_STALE_TASK_DETECT_TIMEOUT, +) from ...long_running_tasks.errors import BaseLongRunningError -from ...long_running_tasks.task import TasksManager from ._error_handlers import base_long_running_error_handler +from ._manager import FastAPILongRunningManager from ._routes import router -_MINUTE: Final[PositiveFloat] = 60 - def setup( app: FastAPI, *, router_prefix: str = "", - stale_task_check_interval_s: PositiveFloat = 1 * _MINUTE, - stale_task_detect_timeout_s: PositiveFloat = 5 * _MINUTE, + stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL, + stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT, ) -> None: """ - `router_prefix` APIs are mounted on `/task/...`, this @@ -35,15 +36,19 @@ async def on_startup() -> None: app.include_router(main_router) # add components to state - app.state.long_running_task_manager = TasksManager( - stale_task_check_interval_s=stale_task_check_interval_s, - stale_task_detect_timeout_s=stale_task_detect_timeout_s, + app.state.long_running_manager = long_running_manager = ( + FastAPILongRunningManager( + app=app, + stale_task_check_interval=stale_task_check_interval, + stale_task_detect_timeout=stale_task_detect_timeout, + ) ) + await long_running_manager.setup() async def on_shutdown() -> None: - if app.state.long_running_task_manager: - task_manager: TasksManager = app.state.long_running_task_manager - await task_manager.close() + if app.state.long_running_manager: + task_manager: FastAPILongRunningManager = app.state.long_running_manager + await task_manager.teardown() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py index e0eafea2257..ab2ecb1a73f 100644 --- 
a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py @@ -2,146 +2,8 @@ Provides a convenient way to return the result given a TaskId. """ -import asyncio -import logging -from collections.abc import AsyncGenerator -from typing import Any - -import httpx -from fastapi import status -from tenacity import ( - AsyncRetrying, - TryAgain, - before_sleep_log, - retry, - retry_if_exception_type, - stop_after_delay, - wait_random_exponential, -) -from yarl import URL - -from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR -from ...long_running_tasks.models import ( - LRTask, - RequestBody, - TaskGet, - TaskProgress, - TaskStatus, -) -from ...long_running_tasks.task import TaskId -from ...rest_responses import unwrap_envelope_if_required from ._client import Client, setup -from ._context_manager import periodic_task_result - -_logger = logging.getLogger(__name__) - - -_DEFAULT_FASTAPI_RETRY_POLICY: dict[str, Any] = { - "retry": retry_if_exception_type(httpx.RequestError), - "wait": wait_random_exponential(max=20), - "stop": stop_after_delay(60), - "reraise": True, - "before_sleep": before_sleep_log(_logger, logging.INFO), -} - - -@retry(**_DEFAULT_FASTAPI_RETRY_POLICY) -async def _start( - session: httpx.AsyncClient, url: URL, json: RequestBody | None -) -> TaskGet: - response = await session.post(f"{url}", json=json) - response.raise_for_status() - data = unwrap_envelope_if_required(response.json()) - return TaskGet.model_validate(data) - - -@retry(**_DEFAULT_FASTAPI_RETRY_POLICY) -async def _wait_for_completion( - session: httpx.AsyncClient, - task_id: TaskId, - status_url: URL, - client_timeout: int, -) -> AsyncGenerator[TaskProgress, None]: - try: - async for attempt in AsyncRetrying( - stop=stop_after_delay(client_timeout), - reraise=True, - retry=retry_if_exception_type(TryAgain), - before_sleep=before_sleep_log(_logger, logging.DEBUG), - ): - with attempt: - response = await session.get(f"{status_url}") - response.raise_for_status() - data = unwrap_envelope_if_required(response.json()) - task_status = TaskStatus.model_validate(data) - - yield task_status.task_progress - if not task_status.done: - await asyncio.sleep( - float( - response.headers.get("retry-after", DEFAULT_POLL_INTERVAL_S) - ) - ) - msg = f"{task_id=}, {task_status.started=} has status: '{task_status.task_progress.message}' {task_status.task_progress.percent}%" - raise TryAgain(msg) # noqa: TRY301 - - except TryAgain as exc: - # this is a timeout - msg = f"Long running task {task_id}, calling to {status_url} timed-out after {client_timeout} seconds" - raise TimeoutError(msg) from exc - - -@retry(**_DEFAULT_FASTAPI_RETRY_POLICY) -async def _task_result(session: httpx.AsyncClient, result_url: URL) -> Any: - response = await session.get(f"{result_url}") - response.raise_for_status() - if response.status_code != status.HTTP_204_NO_CONTENT: - return unwrap_envelope_if_required(response.json()) - return None - - -@retry(**_DEFAULT_FASTAPI_RETRY_POLICY) -async def _abort_task(session: httpx.AsyncClient, abort_url: URL) -> None: - response = await session.delete(f"{abort_url}") - response.raise_for_status() - - -async def long_running_task_request( - session: httpx.AsyncClient, - url: URL, - json: RequestBody | None = None, - client_timeout: int = 1 * HOUR, -) -> AsyncGenerator[LRTask, None]: - """Will use the passed `httpx.AsyncClient` to call an oSparc long - running task `url` passing `json` as request 
body. - NOTE: this follows the usual aiohttp client syntax, and will raise the same errors - - Raises: - [https://docs.aiohttp.org/en/stable/client_reference.html#hierarchy-of-exceptions] - """ - task = None - try: - task = await _start(session, url, json) - last_progress = None - async for task_progress in _wait_for_completion( - session, - task.task_id, - URL(task.status_href), - client_timeout, - ): - last_progress = task_progress - yield LRTask(progress=task_progress) - assert last_progress # nosec - yield LRTask( - progress=last_progress, - _result=_task_result(session, URL(task.result_href)), - ) - - except (TimeoutError, asyncio.CancelledError): - if task: - await _abort_task(session, URL(task.abort_href)) - raise - +from ._context_manager import periodic_task_result # attach to the same object! __all__: tuple[str, ...] = ( "Client", diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/server.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/server.py index 51240434a77..b7cf0fba60a 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/server.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/server.py @@ -6,11 +6,11 @@ running task. The client will take care of recovering the result from it. """ -from ._dependencies import get_tasks_manager +from ._dependencies import get_long_running_manager from ._server import setup __all__: tuple[str, ...] = ( - "get_tasks_manager", + "get_long_running_manager", "setup", ) diff --git a/packages/service-library/src/servicelib/long_running_tasks/_constants.py b/packages/service-library/src/servicelib/long_running_tasks/_constants.py deleted file mode 100644 index 5cc87208a36..00000000000 --- a/packages/service-library/src/servicelib/long_running_tasks/_constants.py +++ /dev/null @@ -1,5 +0,0 @@ -from typing import Final - -MINUTE: Final[int] = 60 # in secs -HOUR: Final[int] = 60 * MINUTE # in secs -DEFAULT_POLL_INTERVAL_S: Final[float] = 1 diff --git a/packages/service-library/src/servicelib/long_running_tasks/base_long_running_manager.py b/packages/service-library/src/servicelib/long_running_tasks/base_long_running_manager.py new file mode 100644 index 00000000000..d09428d3aa2 --- /dev/null +++ b/packages/service-library/src/servicelib/long_running_tasks/base_long_running_manager.py @@ -0,0 +1,22 @@ +from abc import ABC, abstractmethod + +from .task import TasksManager + + +class BaseLongRunningManager(ABC): + """ + Provides a commond inteface for aiohttp and fastapi services + """ + + @property + @abstractmethod + def tasks_manager(self) -> TasksManager: + pass + + @abstractmethod + async def setup(self) -> None: + pass + + @abstractmethod + async def teardown(self) -> None: + pass diff --git a/packages/service-library/src/servicelib/long_running_tasks/constants.py b/packages/service-library/src/servicelib/long_running_tasks/constants.py new file mode 100644 index 00000000000..b5e729665cc --- /dev/null +++ b/packages/service-library/src/servicelib/long_running_tasks/constants.py @@ -0,0 +1,7 @@ +from datetime import timedelta +from typing import Final + +DEFAULT_POLL_INTERVAL_S: Final[float] = 1 + +DEFAULT_STALE_TASK_CHECK_INTERVAL: Final[timedelta] = timedelta(minutes=1) +DEFAULT_STALE_TASK_DETECT_TIMEOUT: Final[timedelta] = timedelta(minutes=5) diff --git a/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py b/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py new file mode 
100644 index 00000000000..be873f1a1a2 --- /dev/null +++ b/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py @@ -0,0 +1,41 @@ +from typing import Any + +from .models import TaskBase, TaskId, TaskStatus +from .task import TaskContext, TasksManager, TrackedTask + + +def list_tasks( + tasks_manager: TasksManager, task_context: TaskContext | None +) -> list[TaskBase]: + tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks( + with_task_context=task_context + ) + return [TaskBase(task_id=t.task_id, task_name=t.task_name) for t in tracked_tasks] + + +def get_task_status( + tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId +) -> TaskStatus: + return tasks_manager.get_task_status( + task_id=task_id, with_task_context=task_context + ) + + +async def get_task_result( + tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId +) -> Any: + try: + return tasks_manager.get_task_result( + task_id=task_id, with_task_context=task_context + ) + finally: + # the task is always removed even if an error occurs + await tasks_manager.remove_task( + task_id, with_task_context=task_context, reraise_errors=False + ) + + +async def remove_task( + tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId +) -> None: + await tasks_manager.remove_task(task_id, with_task_context=task_context) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 89fb8b1b399..37fc968568d 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -2,7 +2,7 @@ from asyncio import Task from collections.abc import Awaitable, Callable, Coroutine from dataclasses import dataclass -from datetime import datetime +from datetime import UTC, datetime from typing import Any, TypeAlias from models_library.api_schemas_long_running_tasks.base import ( @@ -12,6 +12,7 @@ TaskProgress, ) from models_library.api_schemas_long_running_tasks.tasks import ( + TaskBase, TaskGet, TaskResult, TaskStatus, @@ -41,7 +42,7 @@ class TrackedTask(BaseModel): description="if True then the task will not be auto-cancelled if no one enquires of its status", ) - started: datetime = Field(default_factory=datetime.utcnow) + started: datetime = Field(default_factory=lambda: datetime.now(UTC)) last_status_check: datetime | None = Field( default=None, description=( @@ -74,18 +75,15 @@ async def result(self) -> Any: return await self._result -# explicit export of models for api-schemas - -assert TaskResult # nosec -assert TaskGet # nosec -assert TaskStatus # nosec - __all__: tuple[str, ...] 
= ( "ProgressMessage", "ProgressPercent", "TaskGet", + "TaskBase", "TaskId", "TaskProgress", "TaskResult", "TaskStatus", ) + +# nopycln: file diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 1e2d331bd6a..c89e94ce476 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -1,19 +1,19 @@ import asyncio +import datetime import inspect import logging import traceback import urllib.parse from collections import deque from contextlib import suppress -from datetime import datetime -from typing import Any, Protocol, TypeAlias +from typing import Any, Final, Protocol, TypeAlias from uuid import uuid4 -from models_library.api_schemas_long_running_tasks.base import ( - ProgressPercent, - TaskProgress, -) +from models_library.api_schemas_long_running_tasks.base import TaskProgress from pydantic import PositiveFloat +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task +from servicelib.logging_utils import log_catch from .errors import ( TaskAlreadyRunningError, @@ -24,37 +24,45 @@ ) from .models import TaskId, TaskName, TaskStatus, TrackedTask -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) + +_CANCEL_TASK_TIMEOUT: Final[PositiveFloat] = datetime.timedelta( + seconds=1 +).total_seconds() + +TrackedTaskGroupDict: TypeAlias = dict[TaskId, TrackedTask] +TaskContext: TypeAlias = dict[str, Any] async def _await_task(task: asyncio.Task) -> None: await task -def _mark_task_to_remove_if_required( - task_id: TaskId, - tasks_to_remove: list[TaskId], - tracked_task: TrackedTask, - utc_now: datetime, - stale_timeout_s: float, -) -> None: - if tracked_task.fire_and_forget: - return - - if tracked_task.last_status_check is None: - # the task just added or never received a poll request - elapsed_from_start = (utc_now - tracked_task.started).seconds - if elapsed_from_start > stale_timeout_s: - tasks_to_remove.append(task_id) - else: - # the task status was already queried by the client - elapsed_from_last_poll = (utc_now - tracked_task.last_status_check).seconds - if elapsed_from_last_poll > stale_timeout_s: - tasks_to_remove.append(task_id) +def _get_tasks_to_remove( + tasks_groups: dict[TaskName, TrackedTaskGroupDict], + stale_task_detect_timeout_s: PositiveFloat, +) -> list[TaskId]: + utc_now = datetime.datetime.now(tz=datetime.UTC) + tasks_to_remove: list[TaskId] = [] + for tasks in tasks_groups.values(): + for task_id, tracked_task in tasks.items(): + if tracked_task.fire_and_forget: + continue -TrackedTaskGroupDict: TypeAlias = dict[TaskId, TrackedTask] -TaskContext: TypeAlias = dict[str, Any] + if tracked_task.last_status_check is None: + # the task just added or never received a poll request + elapsed_from_start = (utc_now - tracked_task.started).seconds + if elapsed_from_start > stale_task_detect_timeout_s: + tasks_to_remove.append(task_id) + else: + # the task status was already queried by the client + elapsed_from_last_poll = ( + utc_now - tracked_task.last_status_check + ).seconds + if elapsed_from_last_poll > stale_task_detect_timeout_s: + tasks_to_remove.append(task_id) + return tasks_to_remove class TasksManager: @@ -64,21 +72,43 @@ class TasksManager: def __init__( self, - stale_task_check_interval_s: PositiveFloat, - stale_task_detect_timeout_s: PositiveFloat, + stale_task_check_interval: datetime.timedelta, + 
stale_task_detect_timeout: datetime.timedelta, ): # Task groups: Every taskname maps to multiple asyncio.Task within TrackedTask model self._tasks_groups: dict[TaskName, TrackedTaskGroupDict] = {} - self._cancel_task_timeout_s: PositiveFloat = 1.0 + self.stale_task_check_interval = stale_task_check_interval + self.stale_task_detect_timeout_s: PositiveFloat = ( + stale_task_detect_timeout.total_seconds() + ) + + self._stale_tasks_monitor_task: asyncio.Task | None = None - self.stale_task_check_interval_s = stale_task_check_interval_s - self.stale_task_detect_timeout_s = stale_task_detect_timeout_s - self._stale_tasks_monitor_task: asyncio.Task = asyncio.create_task( - self._stale_tasks_monitor_worker(), - name=f"{__name__}.stale_task_monitor_worker", + async def setup(self) -> None: + self._stale_tasks_monitor_task = create_periodic_task( + task=self._stale_tasks_monitor_worker, + interval=self.stale_task_check_interval, + task_name=f"{__name__}.{self._stale_tasks_monitor_worker.__name__}", ) + async def teardown(self) -> None: + task_ids_to_remove: deque[TaskId] = deque() + + for tasks_dict in self._tasks_groups.values(): + for tracked_task in tasks_dict.values(): + task_ids_to_remove.append(tracked_task.task_id) + + for task_id in task_ids_to_remove: + # when closing we do not care about pending errors + await self.remove_task(task_id, None, reraise_errors=False) + + if self._stale_tasks_monitor_task: + with log_catch(_logger, reraise=False): + await cancel_wait_task( + self._stale_tasks_monitor_task, max_delay=_CANCEL_TASK_TIMEOUT + ) + def get_task_group(self, task_name: TaskName) -> TrackedTaskGroupDict: return self._tasks_groups[task_name] @@ -99,37 +129,25 @@ async def _stale_tasks_monitor_worker(self) -> None: # Since we own the client, we assume (for now) this # will not be the case. 
- while await asyncio.sleep(self.stale_task_check_interval_s, result=True): - utc_now = datetime.utcnow() - - tasks_to_remove: list[TaskId] = [] - for tasks in self._tasks_groups.values(): - for task_id, tracked_task in tasks.items(): - _mark_task_to_remove_if_required( - task_id, - tasks_to_remove, - tracked_task, - utc_now, - self.stale_task_detect_timeout_s, - ) + tasks_to_remove = _get_tasks_to_remove( + self._tasks_groups, self.stale_task_detect_timeout_s + ) - # finally remove tasks and warn - for task_id in tasks_to_remove: - # NOTE: task can be in the following cases: - # - still ongoing - # - finished with a result - # - finished with errors - # we just print the status from where one can infer the above - logger.warning( - "Removing stale task '%s' with status '%s'", - task_id, - self.get_task_status( - task_id, with_task_context=None - ).model_dump_json(), - ) - await self.remove_task( - task_id, with_task_context=None, reraise_errors=False - ) + # finally remove tasks and warn + for task_id in tasks_to_remove: + # NOTE: task can be in the following cases: + # - still ongoing + # - finished with a result + # - finished with errors + # we just print the status from where one can infer the above + _logger.warning( + "Removing stale task '%s' with status '%s'", + task_id, + self.get_task_status(task_id, with_task_context=None).model_dump_json(), + ) + await self.remove_task( + task_id, with_task_context=None, reraise_errors=False + ) @staticmethod def create_task_id(task_name: TaskName) -> str: @@ -207,7 +225,7 @@ def get_task_status( raises TaskNotFoundError if the task cannot be found """ tracked_task: TrackedTask = self._get_tracked_task(task_id, with_task_context) - tracked_task.last_status_check = datetime.utcnow() + tracked_task.last_status_check = datetime.datetime.now(tz=datetime.UTC) task = tracked_task.task done = task.done() @@ -252,18 +270,19 @@ async def cancel_task( tracked_task = self._get_tracked_task(task_id, with_task_context) await self._cancel_tracked_task(tracked_task.task, task_id, reraise_errors=True) + @staticmethod async def _cancel_asyncio_task( - self, task: asyncio.Task, reference: str, *, reraise_errors: bool + task: asyncio.Task, reference: str, *, reraise_errors: bool ) -> None: task.cancel() with suppress(asyncio.CancelledError): try: try: await asyncio.wait_for( - _await_task(task), timeout=self._cancel_task_timeout_s + _await_task(task), timeout=_CANCEL_TASK_TIMEOUT ) except TimeoutError: - logger.warning( + _logger.warning( "Timed out while awaiting for cancellation of '%s'", reference ) except Exception: # pylint:disable=broad-except @@ -304,24 +323,6 @@ async def remove_task( finally: del self._tasks_groups[tracked_task.task_name][task_id] - async def close(self) -> None: - """ - cancels all pending tasks and removes them before closing - """ - task_ids_to_remove: deque[TaskId] = deque() - - for tasks_dict in self._tasks_groups.values(): - for tracked_task in tasks_dict.values(): - task_ids_to_remove.append(tracked_task.task_id) - - for task_id in task_ids_to_remove: - # when closing we do not care about pending errors - await self.remove_task(task_id, None, reraise_errors=False) - - await self._cancel_asyncio_task( - self._stale_tasks_monitor_task, "stale_monitor", reraise_errors=False - ) - class TaskProtocol(Protocol): async def __call__( @@ -389,11 +390,11 @@ def start_task( # bind the task with progress 0 and 1 async def _progress_task(progress: TaskProgress, handler: TaskProtocol): - progress.update(message="starting", 
percent=ProgressPercent(0)) + progress.update(message="starting", percent=0) try: return await handler(progress, **task_kwargs) finally: - progress.update(message="finished", percent=ProgressPercent(1)) + progress.update(message="finished", percent=1) async_task = asyncio.create_task( _progress_task(task_progress, task), name=f"{task_name}" @@ -415,9 +416,9 @@ async def _progress_task(progress: TaskProgress, handler: TaskProtocol): "TaskAlreadyRunningError", "TaskCancelledError", "TaskId", - "TasksManager", "TaskProgress", "TaskProtocol", "TaskStatus", + "TasksManager", "TrackedTask", ) diff --git a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py index 7a5b919c34e..07809f928b0 100644 --- a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py +++ b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py @@ -19,9 +19,10 @@ from fastapi import APIRouter, Depends, FastAPI, status from httpx import AsyncClient from pydantic import TypeAdapter +from servicelib.fastapi.long_running_tasks._manager import FastAPILongRunningManager from servicelib.fastapi.long_running_tasks.client import setup as setup_client from servicelib.fastapi.long_running_tasks.server import ( - get_tasks_manager, + get_long_running_manager, ) from servicelib.fastapi.long_running_tasks.server import setup as setup_server from servicelib.long_running_tasks.models import ( @@ -30,7 +31,7 @@ TaskProgress, TaskStatus, ) -from servicelib.long_running_tasks.task import TaskContext, TasksManager, start_task +from servicelib.long_running_tasks.task import TaskContext, start_task from tenacity.asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay @@ -69,10 +70,12 @@ async def create_string_list_task( num_strings: int, sleep_time: float, fail: bool = False, - task_manager: TasksManager = Depends(get_tasks_manager), + long_running_manager: FastAPILongRunningManager = Depends( + get_long_running_manager + ), ) -> TaskId: task_id = start_task( - task_manager, + long_running_manager.tasks_manager, _string_list_task, num_strings=num_strings, sleep_time=sleep_time, diff --git a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py index cf9890a0ed4..9072a70ddc0 100644 --- a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py +++ b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py @@ -3,7 +3,7 @@ import asyncio from collections.abc import AsyncIterable -from typing import Final +from typing import Annotated, Final import pytest from asgi_lifespan import LifespanManager @@ -11,9 +11,10 @@ from httpx import AsyncClient from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter from servicelib.fastapi.long_running_tasks._context_manager import _ProgressManager +from servicelib.fastapi.long_running_tasks._manager import FastAPILongRunningManager from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result from servicelib.fastapi.long_running_tasks.client import setup as setup_client -from servicelib.fastapi.long_running_tasks.server import get_tasks_manager +from servicelib.fastapi.long_running_tasks.server import 
get_long_running_manager from servicelib.fastapi.long_running_tasks.server import setup as setup_server from servicelib.long_running_tasks.errors import ( TaskClientTimeoutError, @@ -24,7 +25,7 @@ TaskId, TaskProgress, ) -from servicelib.long_running_tasks.task import TasksManager, start_task +from servicelib.long_running_tasks.task import start_task TASK_SLEEP_INTERVAL: Final[PositiveFloat] = 0.1 @@ -55,17 +56,19 @@ def user_routes() -> APIRouter: @router.get("/api/success", status_code=status.HTTP_200_OK) async def create_task_user_defined_route( - tasks_manager: TasksManager = Depends(get_tasks_manager), + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> TaskId: - task_id = start_task(tasks_manager, task=a_test_task) - return task_id + return start_task(long_running_manager.tasks_manager, task=a_test_task) @router.get("/api/failing", status_code=status.HTTP_200_OK) async def create_task_which_fails( - task_manager: TasksManager = Depends(get_tasks_manager), + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], ) -> TaskId: - task_id = start_task(task_manager, task=a_failing_test_task) - return task_id + return start_task(long_running_manager.tasks_manager, task=a_failing_test_task) return router diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py index 63be0eb1262..6e3ec1522e2 100644 --- a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py @@ -7,7 +7,7 @@ import asyncio import urllib.parse from collections.abc import AsyncIterator -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Final import pytest @@ -71,11 +71,12 @@ async def failing_background_task(task_progress: TaskProgress): @pytest.fixture async def tasks_manager() -> AsyncIterator[TasksManager]: tasks_manager = TasksManager( - stale_task_check_interval_s=TEST_CHECK_STALE_INTERVAL_S, - stale_task_detect_timeout_s=TEST_CHECK_STALE_INTERVAL_S, + stale_task_check_interval=timedelta(seconds=TEST_CHECK_STALE_INTERVAL_S), + stale_task_detect_timeout=timedelta(seconds=TEST_CHECK_STALE_INTERVAL_S), ) + await tasks_manager.setup() yield tasks_manager - await tasks_manager.close() + await tasks_manager.teardown() @pytest.mark.parametrize("check_task_presence_before", [True, False]) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_scheduler.py b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_scheduler.py index 4f01e803dd0..322ee491051 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_scheduler.py @@ -4,7 +4,8 @@ from fastapi import APIRouter, Depends, HTTPException, status from models_library.projects_nodes_io import NodeID from pydantic import BaseModel, PositiveInt -from servicelib.fastapi.long_running_tasks.server import get_tasks_manager +from servicelib.fastapi.long_running_tasks._manager import FastAPILongRunningManager +from servicelib.fastapi.long_running_tasks.server import get_long_running_manager from servicelib.long_running_tasks.errors import TaskAlreadyRunningError from servicelib.long_running_tasks.models import ( ProgressMessage, @@ -12,7 +13,7 @@ 
TaskId, TaskProgress, ) -from servicelib.long_running_tasks.task import TasksManager, start_task +from servicelib.long_running_tasks.task import start_task from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_result @@ -91,7 +92,9 @@ async def update_service_observation( ) async def delete_service_containers( node_uuid: NodeID, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], dynamic_sidecars_scheduler: Annotated[ DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler) ], @@ -110,7 +113,7 @@ async def _progress_callback( try: return start_task( - tasks_manager, + long_running_manager.tasks_manager, task=_task_remove_service_containers, # type: ignore[arg-type] unique=True, node_uuid=node_uuid, @@ -149,7 +152,9 @@ async def get_service_state( ) async def save_service_state( node_uuid: NodeID, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], dynamic_sidecars_scheduler: Annotated[ DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler) ], @@ -169,7 +174,7 @@ async def _progress_callback( try: return start_task( - tasks_manager, + long_running_manager.tasks_manager, task=_task_save_service_state, # type: ignore[arg-type] unique=True, node_uuid=node_uuid, @@ -191,7 +196,9 @@ async def _progress_callback( ) async def push_service_outputs( node_uuid: NodeID, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], dynamic_sidecars_scheduler: Annotated[ DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler) ], @@ -210,7 +217,7 @@ async def _progress_callback( try: return start_task( - tasks_manager, + long_running_manager.tasks_manager, task=_task_push_service_outputs, # type: ignore[arg-type] unique=True, node_uuid=node_uuid, @@ -232,7 +239,9 @@ async def _progress_callback( ) async def delete_service_docker_resources( node_uuid: NodeID, - tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], + long_running_manager: Annotated[ + FastAPILongRunningManager, Depends(get_long_running_manager) + ], dynamic_sidecars_scheduler: Annotated[ DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler) ], @@ -246,7 +255,7 @@ async def _task_cleanup_service_docker_resources( try: return start_task( - tasks_manager, + long_running_manager.tasks_manager, task=_task_cleanup_service_docker_resources, # type: ignore[arg-type] unique=True, node_uuid=node_uuid, diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 3469914a677..ce9c08b5b5d 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -50,12 +50,12 @@ from pytest_simcore.helpers.host import get_localhost_ip from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict -from servicelib.fastapi.long_running_tasks.client import ( - Client, +from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result +from 
diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
index 3469914a677..ce9c08b5b5d 100644
--- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
+++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
@@ -50,12 +50,12 @@
 from pytest_simcore.helpers.host import get_localhost_ip
 from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
 from pytest_simcore.helpers.typing_env import EnvVarsDict
-from servicelib.fastapi.long_running_tasks.client import (
-    Client,
+from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
+from servicelib.long_running_tasks.models import (
+    ProgressMessage,
+    ProgressPercent,
     TaskId,
-    periodic_task_result,
 )
-from servicelib.long_running_tasks.models import ProgressMessage, ProgressPercent
 from servicelib.progress_bar import ProgressBarData
 from servicelib.sequences_utils import pairwise
 from settings_library.rabbit import RabbitSettings
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py
index da487ce392d..17ae8d9187f 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py
@@ -2,11 +2,12 @@
 from typing import Annotated, cast

 from fastapi import APIRouter, Depends, FastAPI, Request, status
-from servicelib.fastapi.long_running_tasks.server import get_tasks_manager
+from servicelib.fastapi.long_running_tasks._manager import FastAPILongRunningManager
+from servicelib.fastapi.long_running_tasks.server import get_long_running_manager
 from servicelib.fastapi.requests_decorators import cancel_on_disconnect
 from servicelib.long_running_tasks.errors import TaskAlreadyRunningError
 from servicelib.long_running_tasks.models import TaskId
-from servicelib.long_running_tasks.task import TasksManager, start_task
+from servicelib.long_running_tasks.task import start_task

 from ...core.settings import ApplicationSettings
 from ...models.schemas.application_health import ApplicationHealth
@@ -48,7 +49,9 @@
 @cancel_on_disconnect
 async def pull_user_servcices_docker_images(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     shared_store: Annotated[SharedStore, Depends(get_shared_store)],
     app: Annotated[FastAPI, Depends(get_application)],
 ) -> TaskId:
@@ -56,7 +59,7 @@ async def pull_user_servcices_docker_images(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_pull_user_servcices_docker_images,
             unique=True,
             app=app,
@@ -85,7 +88,9 @@ async def pull_user_servcices_docker_images(
 async def create_service_containers_task(  # pylint: disable=too-many-arguments
     request: Request,
     containers_create: ContainersCreate,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
     shared_store: Annotated[SharedStore, Depends(get_shared_store)],
     app: Annotated[FastAPI, Depends(get_application)],
@@ -95,7 +100,7 @@ async def create_service_containers_task(  # pylint: disable=too-many-arguments

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_create_service_containers,
             unique=True,
             settings=settings,
@@ -117,7 +122,9 @@ async def create_service_containers_task(  # pylint: disable=too-many-arguments
 @cancel_on_disconnect
 async def runs_docker_compose_down_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
     shared_store: Annotated[SharedStore, Depends(get_shared_store)],
     app: Annotated[FastAPI, Depends(get_application)],
@@ -127,7 +134,7 @@ async def runs_docker_compose_down_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_runs_docker_compose_down,
             unique=True,
             app=app,
@@ -148,7 +155,9 @@ async def runs_docker_compose_down_task(
 @cancel_on_disconnect
 async def state_restore_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
     mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
     app: Annotated[FastAPI, Depends(get_application)],
@@ -157,7 +166,7 @@ async def state_restore_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_restore_state,
             unique=True,
             settings=settings,
@@ -177,7 +186,9 @@ async def state_restore_task(
 @cancel_on_disconnect
 async def state_save_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     app: Annotated[FastAPI, Depends(get_application)],
     mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
@@ -186,7 +197,7 @@ async def state_save_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_save_state,
             unique=True,
             settings=settings,
@@ -206,7 +217,9 @@ async def state_save_task(
 @cancel_on_disconnect
 async def ports_inputs_pull_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     app: Annotated[FastAPI, Depends(get_application)],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
     mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
@@ -217,7 +230,7 @@ async def ports_inputs_pull_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_ports_inputs_pull,
             unique=True,
             port_keys=port_keys,
@@ -239,7 +252,9 @@ async def ports_inputs_pull_task(
 @cancel_on_disconnect
 async def ports_outputs_pull_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     app: Annotated[FastAPI, Depends(get_application)],
     mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
     port_keys: list[str] | None = None,
@@ -248,7 +263,7 @@ async def ports_outputs_pull_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_ports_outputs_pull,
             unique=True,
             port_keys=port_keys,
@@ -268,7 +283,9 @@ async def ports_outputs_pull_task(
 @cancel_on_disconnect
 async def ports_outputs_push_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     outputs_manager: Annotated[OutputsManager, Depends(get_outputs_manager)],
     app: Annotated[FastAPI, Depends(get_application)],
 ) -> TaskId:
@@ -276,7 +293,7 @@ async def ports_outputs_push_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_ports_outputs_push,
             unique=True,
             outputs_manager=outputs_manager,
@@ -295,7 +312,9 @@ async def ports_outputs_push_task(
 @cancel_on_disconnect
 async def containers_restart_task(
     request: Request,
-    tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
+    long_running_manager: Annotated[
+        FastAPILongRunningManager, Depends(get_long_running_manager)
+    ],
     app: Annotated[FastAPI, Depends(get_application)],
     settings: Annotated[ApplicationSettings, Depends(get_settings)],
     shared_store: Annotated[SharedStore, Depends(get_shared_store)],
@@ -304,7 +323,7 @@ async def containers_restart_task(

     try:
         return start_task(
-            tasks_manager,
+            long_running_manager.tasks_manager,
             task=task_containers_restart,
             unique=True,
             app=app,
diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py
index 0731009a380..970f8aeb67e 100644
--- a/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py
+++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py
@@ -28,7 +28,7 @@
 from pytest_mock.plugin import MockerFixture
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
 from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME
-from servicelib.fastapi.long_running_tasks.client import TaskId
+from servicelib.long_running_tasks.models import TaskId
 from simcore_service_dynamic_sidecar._meta import API_VTAG
 from simcore_service_dynamic_sidecar.api.rest.containers import _INACTIVE_FOR_LONG_TIME
 from simcore_service_dynamic_sidecar.core.application import AppState
diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py
index 75677e1d7f7..b00491ce764 100644
--- a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py
+++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py
@@ -28,12 +28,9 @@
 from pydantic import AnyHttpUrl, TypeAdapter
 from pytest_mock.plugin import MockerFixture
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict
-from servicelib.fastapi.long_running_tasks.client import (
-    Client,
-    TaskId,
-    periodic_task_result,
-)
+from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
 from servicelib.fastapi.long_running_tasks.client import setup as client_setup
+from servicelib.long_running_tasks.models import TaskId
 from simcore_sdk.node_ports_common.exceptions import NodeNotFound
 from simcore_service_dynamic_sidecar._meta import API_VTAG
 from simcore_service_dynamic_sidecar.api.rest import containers_long_running_tasks
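The test import moves above all follow the same split: `TaskId` now lives in `servicelib.long_running_tasks.models`, while `Client`, `periodic_task_result` and the client `setup` remain in `servicelib.fastapi.long_running_tasks.client`. A rough sketch of how these pieces are typically combined on the client side; the `Client` constructor and the `periodic_task_result` keyword arguments (`task_timeout`, `status_poll_interval`) are assumptions based on existing usage in these tests, not something this diff defines:

```python
# Rough client-side sketch, not part of the change set; signatures of Client and
# periodic_task_result are assumed from the tests touched above and may differ.
import httpx
from fastapi import FastAPI
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from servicelib.long_running_tasks.models import TaskId

app = FastAPI()
client_setup(app)  # done once, at application setup time


async def wait_for_task(base_url: str, task_id: TaskId):
    async with httpx.AsyncClient(base_url=base_url) as async_client:
        client = Client(app=app, async_client=async_client, base_url=base_url)
        async with periodic_task_result(
            client,
            task_id,
            task_timeout=60,  # assumed keyword: seconds to wait for completion
            status_poll_interval=1,  # assumed keyword: seconds between status polls
        ) as result:
            # the task's return value, once the server reports it finished
            return result
```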
diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py
index 7d4454b7e0a..78e5b22046e 100644
--- a/services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py
+++ b/services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py
@@ -17,12 +17,9 @@
 from models_library.services_creation import CreateServiceMetricsAdditionalParams
 from pydantic import AnyHttpUrl, TypeAdapter
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
-from servicelib.fastapi.long_running_tasks.client import (
-    Client,
-    TaskId,
-    periodic_task_result,
-)
+from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
 from servicelib.fastapi.long_running_tasks.client import setup as client_setup
+from servicelib.long_running_tasks.models import TaskId
 from simcore_service_dynamic_sidecar._meta import API_VTAG
 from simcore_service_dynamic_sidecar.models.schemas.containers import (
     ContainersComposeSpec,
diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py
index b276f00cb2c..ee3ab015a1c 100644
--- a/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py
+++ b/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py
@@ -32,12 +32,9 @@
 from pydantic import AnyHttpUrl, TypeAdapter
 from pytest_mock import MockerFixture
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
-from servicelib.fastapi.long_running_tasks.client import (
-    Client,
-    TaskId,
-    periodic_task_result,
-)
+from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
 from servicelib.fastapi.long_running_tasks.client import setup as client_setup
+from servicelib.long_running_tasks.models import TaskId
 from simcore_service_dynamic_sidecar._meta import API_VTAG
 from simcore_service_dynamic_sidecar.core.docker_utils import get_container_states
 from simcore_service_dynamic_sidecar.models.schemas.containers import (
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index 5c7584f699b..73e8b2d4d7e 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -3391,7 +3391,8 @@ paths:
           description: Successful Response
           content:
             application/json:
-              schema: {}
+              schema:
+                title: Response Get Async Job Result
         '404':
           content:
             application/json:
@@ -3486,7 +3487,8 @@
           description: Successful Response
           content:
             application/json:
-              schema: {}
+              schema:
+                title: Response Get Task Result
   /v0/catalog/licensed-items:
     get:
       tags:
diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py
index 6592af29784..9b6ce8411d8 100644
--- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py
+++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py
@@ -22,13 +22,13 @@
 from pydantic import BaseModel
 from servicelib.aiohttp import status
 from servicelib.aiohttp.long_running_tasks.server import (
-    get_task_context,
-    get_tasks_manager,
+    get_long_running_manager,
 )
 from servicelib.aiohttp.requests_validation import (
     parse_request_path_parameters_as,
 )
 from servicelib.aiohttp.rest_responses import create_data_response
+from servicelib.long_running_tasks import http_endpoint_responses
 from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs

 from .._meta import API_VTAG
@@ -56,9 +56,11 @@
 @handle_export_data_exceptions
 @webserver_request_context_decorator
 async def get_async_jobs(request: web.Request) -> web.Response:
-    inprocess_task_manager = get_tasks_manager(request.app)
-    inprocess_task_context = get_task_context(request)
-    inprocess_tracked_tasks = inprocess_task_manager.list_tasks(inprocess_task_context)
+    inprocess_long_running_manager = get_long_running_manager(request.app)
+    inprocess_tracked_tasks = http_endpoint_responses.list_tasks(
+        inprocess_long_running_manager.tasks_manager,
+        inprocess_long_running_manager.get_task_context(request),
+    )

     _req_ctx = AuthenticatedRequestContext.model_validate(request)
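On the aiohttp (webserver) side, the same consolidation applies: the standalone `get_task_context`/`get_tasks_manager` getters are replaced by a single `get_long_running_manager(app)`, and task listing goes through `http_endpoint_responses.list_tasks`. A minimal sketch of that pattern in an isolated handler, using only the calls shown in the hunk above; the route path and the JSON response shape are illustrative assumptions:

```python
# Sketch only: the servicelib calls mirror the _rest.py hunk above; route and
# response serialization are placeholders.
from aiohttp import web
from servicelib.aiohttp.long_running_tasks.server import get_long_running_manager
from servicelib.long_running_tasks import http_endpoint_responses

routes = web.RouteTableDef()


@routes.get("/v0/tasks")
async def list_long_running_tasks(request: web.Request) -> web.Response:
    long_running_manager = get_long_running_manager(request.app)
    # list_tasks takes the manager's TasksManager plus the per-request task context
    tracked_tasks = http_endpoint_responses.list_tasks(
        long_running_manager.tasks_manager,
        long_running_manager.get_task_context(request),
    )
    # serialization of the tracked tasks is application-specific
    return web.json_response({"count": len(tracked_tasks)})
```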
diff --git a/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py b/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py
index c6f58f29ee1..0b5f601d8f0 100644
--- a/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py
+++ b/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py
@@ -6,6 +6,7 @@
 # pylint: disable=no-self-argument

 from typing import Any
+from unittest.mock import Mock

 import pytest
 from aiohttp.test_utils import TestClient
@@ -77,11 +78,14 @@ async def test_listing_tasks_with_list_inprocess_tasks_error(

     class _DummyTaskManager:
         def list_tasks(self, *args, **kwargs):
-            raise Exception()  # pylint: disable=broad-exception-raised
+            raise Exception  # pylint: disable=broad-exception-raised # noqa: TRY002
+
+    mock = Mock()
+    mock.tasks_manager = _DummyTaskManager()

     mocker.patch(
-        "servicelib.aiohttp.long_running_tasks._routes.get_tasks_manager",
-        return_value=_DummyTaskManager(),
+        "servicelib.aiohttp.long_running_tasks._routes.get_long_running_manager",
+        return_value=mock,
     )

     _async_jobs_listing_path = client.app.router["get_async_jobs"].url_for()