33import logging
44
55from celery .exceptions import CeleryError # type: ignore[import-untyped]
6- from celery_library .errors import (
7- TransferrableCeleryError ,
8- decode_celery_transferrable_error ,
9- )
10- from fastapi import FastAPI
116from models_library .api_schemas_rpc_async_jobs .async_jobs import (
127 AsyncJobGet ,
138 AsyncJobId ,
2217 JobSchedulerError ,
2318)
2419from servicelib .celery .models import TaskState
20+ from servicelib .celery .task_manager import TaskManager
2521from servicelib .logging_utils import log_catch
2622from servicelib .rabbitmq import RPCRouter
2723
28- from ...modules .celery import get_task_manager_from_app
24+ from ..errors import (
25+ TransferrableCeleryError ,
26+ decode_celery_transferrable_error ,
27+ )
2928
3029_logger = logging .getLogger (__name__ )
3130router = RPCRouter ()
3231
3332
3433@router .expose (reraise_if_error_type = (JobSchedulerError ,))
35- async def cancel (app : FastAPI , job_id : AsyncJobId , job_id_data : AsyncJobNameData ):
36- assert app # nosec
34+ async def cancel (
35+ task_manager : TaskManager , job_id : AsyncJobId , job_id_data : AsyncJobNameData
36+ ):
37+ assert task_manager # nosec
3738 assert job_id_data # nosec
3839 try :
39- await get_task_manager_from_app ( app ) .cancel_task (
40+ await task_manager .cancel_task (
4041 task_context = job_id_data .model_dump (),
4142 task_uuid = job_id ,
4243 )
@@ -46,13 +47,13 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
4647
4748@router .expose (reraise_if_error_type = (JobSchedulerError ,))
4849async def status (
49- app : FastAPI , job_id : AsyncJobId , job_id_data : AsyncJobNameData
50+ task_manager : TaskManager , job_id : AsyncJobId , job_id_data : AsyncJobNameData
5051) -> AsyncJobStatus :
51- assert app # nosec
52+ assert task_manager # nosec
5253 assert job_id_data # nosec
5354
5455 try :
55- task_status = await get_task_manager_from_app ( app ) .get_task_status (
56+ task_status = await task_manager .get_task_status (
5657 task_context = job_id_data .model_dump (),
5758 task_uuid = job_id ,
5859 )
@@ -75,20 +76,20 @@ async def status(
7576 )
7677)
7778async def result (
78- app : FastAPI , job_id : AsyncJobId , job_id_data : AsyncJobNameData
79+ task_manager : TaskManager , job_id : AsyncJobId , job_id_data : AsyncJobNameData
7980) -> AsyncJobResult :
80- assert app # nosec
81+ assert task_manager # nosec
8182 assert job_id # nosec
8283 assert job_id_data # nosec
8384
8485 try :
85- _status = await get_task_manager_from_app ( app ) .get_task_status (
86+ _status = await task_manager .get_task_status (
8687 task_context = job_id_data .model_dump (),
8788 task_uuid = job_id ,
8889 )
8990 if not _status .is_done :
9091 raise JobNotDoneError (job_id = job_id )
91- _result = await get_task_manager_from_app ( app ) .get_task_result (
92+ _result = await task_manager .get_task_result (
9293 task_context = job_id_data .model_dump (),
9394 task_uuid = job_id ,
9495 )
@@ -122,12 +123,12 @@ async def result(
122123
123124@router .expose (reraise_if_error_type = (JobSchedulerError ,))
124125async def list_jobs (
125- app : FastAPI , filter_ : str , job_id_data : AsyncJobNameData
126+ task_manager : TaskManager , filter_ : str , job_id_data : AsyncJobNameData
126127) -> list [AsyncJobGet ]:
127128 _ = filter_
128- assert app # nosec
129+ assert task_manager # nosec
129130 try :
130- tasks = await get_task_manager_from_app ( app ) .list_tasks (
131+ tasks = await task_manager .list_tasks (
131132 task_context = job_id_data .model_dump (),
132133 )
133134 except CeleryError as exc :
0 commit comments