From 17b79c99ea5b7d808474bcb07938e78197ad11cd Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 12 May 2025 10:29:18 +0200 Subject: [PATCH] call long running tasks module directly --- .../aiohttp/long_running_tasks/server.py | 8 +++- .../long_running_tasks.py | 4 +- .../simcore_service_webserver/tasks/_rest.py | 38 +++++++++---------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py index b1766f8035fb..55d1295c1977 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py @@ -5,6 +5,7 @@ The server only has to return a `TaskId` in the handler creating the long running task. """ + from ...long_running_tasks._errors import TaskAlreadyRunningError, TaskCancelledError from ...long_running_tasks._models import ProgressMessage, ProgressPercent from ...long_running_tasks._task import ( @@ -14,12 +15,17 @@ TasksManager, TaskStatus, ) -from ._dependencies import create_task_name_from_request, get_tasks_manager +from ._dependencies import ( + create_task_name_from_request, + get_task_context, + get_tasks_manager, +) from ._routes import TaskGet from ._server import setup, start_long_running_task __all__: tuple[str, ...] = ( "create_task_name_from_request", + "get_task_context", "get_tasks_manager", "ProgressMessage", "ProgressPercent", diff --git a/services/web/server/src/simcore_service_webserver/long_running_tasks.py b/services/web/server/src/simcore_service_webserver/long_running_tasks.py index c2f842eab7a4..b98bf7ed7036 100644 --- a/services/web/server/src/simcore_service_webserver/long_running_tasks.py +++ b/services/web/server/src/simcore_service_webserver/long_running_tasks.py @@ -13,7 +13,7 @@ from .models import RequestContext -def _webserver_request_context_decorator(handler: Handler): +def webserver_request_context_decorator(handler: Handler): @wraps(handler) async def _test_task_context_decorator( request: web.Request, @@ -31,5 +31,5 @@ def setup_long_running_tasks(app: web.Application) -> None: app, router_prefix=f"/{API_VTAG}/tasks-legacy", handler_check_decorator=login_required, - task_request_context_decorator=_webserver_request_context_decorator, + task_request_context_decorator=webserver_request_context_decorator, ) diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index a4c95a6e1ccd..039b746e4892 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -19,10 +19,12 @@ AsyncJobNameData, ) from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE -from models_library.generics import Envelope from pydantic import BaseModel from servicelib.aiohttp import status -from servicelib.aiohttp.client_session import get_client_session +from servicelib.aiohttp.long_running_tasks.server import ( + get_task_context, + get_tasks_manager, +) from servicelib.aiohttp.requests_validation import ( parse_request_path_parameters_as, ) @@ -31,6 +33,7 @@ from .._meta import API_VTAG from ..login.decorators import login_required +from ..long_running_tasks import webserver_request_context_decorator from ..models import RequestContext from ..rabbitmq import get_rabbitmq_rpc_client from ..security.decorators import permission_required @@ -51,23 +54,11 @@ @login_required @permission_required("storage.files.*") @handle_export_data_exceptions +@webserver_request_context_decorator async def get_async_jobs(request: web.Request) -> web.Response: - session = get_client_session(request.app) - async with session.request( - "GET", - request.url.with_path(str(request.app.router["list_tasks"].url_for())), - cookies=request.cookies, - ) as resp: - if resp.status != status.HTTP_200_OK: - return web.Response( - status=resp.status, - body=await resp.read(), - content_type=resp.content_type, - ) - inprocess_tasks = ( - Envelope[list[TaskGet]].model_validate_json(await resp.text()).data - ) - assert inprocess_tasks is not None # nosec + inprocess_task_manager = get_tasks_manager(request.app) + inprocess_task_context = get_task_context(request) + inprocess_tracked_tasks = inprocess_task_manager.list_tasks(inprocess_task_context) _req_ctx = RequestContext.model_validate(request) @@ -92,7 +83,16 @@ async def get_async_jobs(request: web.Request) -> web.Response: ) for job in user_async_jobs ] - + inprocess_tasks, + + [ + TaskGet( + task_id=f"{task.task_id}", + task_name=task.task_name, + status_href=f"{request.app.router['get_task_status'].url_for(task_id=task.task_id)}", + abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=task.task_id)}", + result_href=f"{request.app.router['get_task_result'].url_for(task_id=task.task_id)}", + ) + for task in inprocess_tracked_tasks + ], status=status.HTTP_200_OK, )