1515 TaskResult ,
1616 TaskStatus ,
1717)
18- from models_library .api_schemas_storage import STORAGE_RPC_NAMESPACE
1918from pydantic import BaseModel
2019from servicelib .aiohttp import status
2120from servicelib .aiohttp .long_running_tasks .server import (
3130)
3231from servicelib .celery .models import TaskFilter
3332from servicelib .long_running_tasks import lrt_api
34- from servicelib .rabbitmq .rpc_interfaces .async_jobs import async_jobs
3533from servicelib .sse .models import SSEEvent , SSEHeaders
3634
3735from .._meta import API_VTAG
3836from ..celery import get_task_manager
3937from ..login .decorators import login_required
4038from ..long_running_tasks .plugin import webserver_request_context_decorator
4139from ..models import AuthenticatedRequestContext
42- from ..rabbitmq import get_rabbitmq_rpc_client
4340from ..security .decorators import permission_required
4441from ..utils import get_job_filter
4542from ._exception_handlers import handle_exceptions
@@ -74,26 +71,25 @@ async def get_async_jobs(request: web.Request) -> web.Response:
7471
7572 _req_ctx = AuthenticatedRequestContext .model_validate (request )
7673
77- rabbitmq_rpc_client = get_rabbitmq_rpc_client (request .app )
78-
79- user_async_jobs = await async_jobs .list_jobs (
80- rabbitmq_rpc_client = rabbitmq_rpc_client ,
81- rpc_namespace = STORAGE_RPC_NAMESPACE ,
82- job_filter = get_job_filter (
83- user_id = _req_ctx .user_id ,
84- product_name = _req_ctx .product_name ,
85- ),
74+ tasks = await get_task_manager (request .app ).list_tasks (
75+ task_filter = TaskFilter .model_validate (
76+ get_job_filter (
77+ user_id = _req_ctx .user_id ,
78+ product_name = _req_ctx .product_name ,
79+ )
80+ )
8681 )
82+
8783 return create_data_response (
8884 [
8985 TaskGet (
90- task_id = f"{ job . job_id } " ,
91- task_name = job . job_name ,
92- status_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_status' ].url_for (task_id = str (job . job_id ))))} " ,
93- abort_href = f"{ request .url .with_path (str (request .app .router ['cancel_async_job' ].url_for (task_id = str (job . job_id ))))} " ,
94- result_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_result' ].url_for (task_id = str (job . job_id ))))} " ,
86+ task_id = f"{ task . uuid } " ,
87+ task_name = task . metadata . name ,
88+ status_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_status' ].url_for (task_id = str (task . uuid ))))} " ,
89+ abort_href = f"{ request .url .with_path (str (request .app .router ['cancel_async_job' ].url_for (task_id = str (task . uuid ))))} " ,
90+ result_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_result' ].url_for (task_id = str (task . uuid ))))} " ,
9591 )
96- for job in user_async_jobs
92+ for task in tasks
9793 ]
9894 + [
9995 TaskGet (
@@ -115,17 +111,15 @@ async def get_async_jobs(request: web.Request) -> web.Response:
115111@login_required
116112@handle_exceptions
117113async def get_async_job_status (request : web .Request ) -> web .Response :
118-
119114 _req_ctx = AuthenticatedRequestContext .model_validate (request )
120-
121115 path_params = parse_request_path_parameters_as (_PathParams , request )
122- task_manager = get_task_manager ( request . app )
116+
123117 task_filter = get_job_filter (
124118 user_id = _req_ctx .user_id ,
125119 product_name = _req_ctx .product_name ,
126120 )
127- task_status = await task_manager .get_task_status (
128- task_filter = TaskFilter .model_validate (task_filter . model_dump () ),
121+ task_status = await get_task_manager ( request . app ) .get_task_status (
122+ task_filter = TaskFilter .model_validate (task_filter ),
129123 task_uuid = path_params .task_id ,
130124 )
131125
@@ -159,7 +153,7 @@ async def cancel_async_job(request: web.Request) -> web.Response:
159153 product_name = _req_ctx .product_name ,
160154 )
161155 await task_manager .cancel_task (
162- task_filter = TaskFilter .model_validate (task_filter . model_dump () ),
156+ task_filter = TaskFilter .model_validate (task_filter ),
163157 task_uuid = path_params .task_id ,
164158 )
165159
@@ -177,13 +171,12 @@ async def get_async_job_result(request: web.Request) -> web.Response:
177171 _req_ctx = AuthenticatedRequestContext .model_validate (request )
178172 path_params = parse_request_path_parameters_as (_PathParams , request )
179173
180- task_manager = get_task_manager (request .app )
181174 task_filter = get_job_filter (
182175 user_id = _req_ctx .user_id ,
183176 product_name = _req_ctx .product_name ,
184177 )
185- task_result = await task_manager .get_task_result (
186- task_filter = TaskFilter .model_validate (task_filter . model_dump () ),
178+ task_result = await get_task_manager ( request . app ) .get_task_result (
179+ task_filter = TaskFilter .model_validate (task_filter ),
187180 task_uuid = path_params .task_id ,
188181 )
189182
@@ -205,14 +198,13 @@ async def get_async_job_stream(request: web.Request) -> web.Response:
205198 path_params = parse_request_path_parameters_as (_PathParams , request )
206199 header_params = parse_request_headers_as (SSEHeaders , request )
207200
208- task_manager = get_task_manager (request .app )
209201 task_filter = get_job_filter (
210202 user_id = _req_ctx .user_id ,
211203 product_name = _req_ctx .product_name ,
212204 )
213205
214206 async def event_generator ():
215- async for event_id , event in task_manager .consume_task_events (
207+ async for event_id , event in get_task_manager ( request . app ) .consume_task_events (
216208 task_filter = TaskFilter .model_validate (task_filter .model_dump ()),
217209 task_uuid = path_params .task_id ,
218210 last_id = header_params .last_event_id ,
0 commit comments