1414from models_library .api_schemas_rpc_async_jobs .exceptions import (
1515 JobAbortedError ,
1616 JobError ,
17- JobMissingError ,
1817 JobNotDoneError ,
1918 JobSchedulerError ,
2019)
2120from servicelib .logging_utils import log_catch
2221from servicelib .rabbitmq import RPCRouter
2322
2423from ...modules .celery import get_celery_client
25- from ...modules .celery .client import CeleryTaskQueueClient
2624from ...modules .celery .models import TaskError , TaskState
2725
2826_logger = logging .getLogger (__name__ )
2927router = RPCRouter ()
3028
3129
32- async def _assert_job_exists (
33- * ,
34- job_id : AsyncJobId ,
35- job_id_data : AsyncJobNameData ,
36- celery_client : CeleryTaskQueueClient ,
37- ) -> None :
38- """Raises JobMissingError if job doesn't exist"""
39- job_ids = await celery_client .get_task_uuids (
40- task_context = job_id_data .model_dump (),
41- )
42- if job_id not in job_ids :
43- raise JobMissingError (job_id = f"{ job_id } " )
44-
45-
46- @router .expose (reraise_if_error_type = (JobSchedulerError , JobMissingError ))
30+ @router .expose (reraise_if_error_type = (JobSchedulerError ,))
4731async def cancel (app : FastAPI , job_id : AsyncJobId , job_id_data : AsyncJobNameData ):
4832 assert app # nosec
4933 assert job_id_data # nosec
5034 try :
51- await _assert_job_exists (
52- job_id = job_id , job_id_data = job_id_data , celery_client = get_celery_client (app )
53- )
5435 await get_celery_client (app ).abort_task (
5536 task_context = job_id_data .model_dump (),
5637 task_uuid = job_id ,
@@ -59,17 +40,14 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
5940 raise JobSchedulerError (exc = f"{ exc } " ) from exc
6041
6142
62- @router .expose (reraise_if_error_type = (JobSchedulerError , JobMissingError ))
43+ @router .expose (reraise_if_error_type = (JobSchedulerError ,))
6344async def status (
6445 app : FastAPI , job_id : AsyncJobId , job_id_data : AsyncJobNameData
6546) -> AsyncJobStatus :
6647 assert app # nosec
6748 assert job_id_data # nosec
6849
6950 try :
70- await _assert_job_exists (
71- job_id = job_id , job_id_data = job_id_data , celery_client = get_celery_client (app )
72- )
7351 task_status = await get_celery_client (app ).get_task_status (
7452 task_context = job_id_data .model_dump (),
7553 task_uuid = job_id ,
@@ -90,7 +68,6 @@ async def status(
9068 JobNotDoneError ,
9169 JobAbortedError ,
9270 JobSchedulerError ,
93- JobMissingError ,
9471 )
9572)
9673async def result (
@@ -101,9 +78,6 @@ async def result(
10178 assert job_id_data # nosec
10279
10380 try :
104- await _assert_job_exists (
105- job_id = job_id , job_id_data = job_id_data , celery_client = get_celery_client (app )
106- )
10781 _status = await get_celery_client (app ).get_task_status (
10882 task_context = job_id_data .model_dump (),
10983 task_uuid = job_id ,
0 commit comments