Skip to content

Commit f418d29

Browse files
fix: job missing error
1 parent 9336de9 commit f418d29

File tree

4 files changed

+40
-17
lines changed

4 files changed

+40
-17
lines changed

packages/celery-library/src/celery_library/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import base64
22
import pickle
33

4+
from common_library.errors_classes import OsparcErrorMixin
5+
46

57
class TransferrableCeleryError(Exception):
68
def __repr__(self) -> str:
@@ -22,3 +24,7 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except
2224
assert isinstance(error, TransferrableCeleryError) # nosec
2325
result: Exception = pickle.loads(base64.b64decode(error.args[0])) # noqa: S301
2426
return result
27+
28+
29+
class TaskNotFoundError(OsparcErrorMixin, Exception):
30+
msg_template = "Task with id '{task_id}' not found"

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from models_library.api_schemas_rpc_async_jobs.exceptions import (
1414
JobAbortedError,
1515
JobError,
16+
JobMissingError,
1617
JobNotDoneError,
1718
JobSchedulerError,
1819
)
@@ -22,6 +23,7 @@
2223
from servicelib.rabbitmq import RPCRouter
2324

2425
from ..errors import (
26+
TaskNotFoundError,
2527
TransferrableCeleryError,
2628
decode_celery_transferrable_error,
2729
)
@@ -30,7 +32,7 @@
3032
router = RPCRouter()
3133

3234

33-
@router.expose(reraise_if_error_type=(JobSchedulerError,))
35+
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
3436
async def cancel(
3537
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
3638
):
@@ -42,11 +44,13 @@ async def cancel(
4244
task_filter=task_filter,
4345
task_uuid=job_id,
4446
)
47+
except TaskNotFoundError as exc:
48+
raise JobMissingError(exc=f"{exc}") from exc
4549
except CeleryError as exc:
4650
raise JobSchedulerError(exc=f"{exc}") from exc
4751

4852

49-
@router.expose(reraise_if_error_type=(JobSchedulerError,))
53+
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
5054
async def status(
5155
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
5256
) -> AsyncJobStatus:
@@ -59,6 +63,8 @@ async def status(
5963
task_filter=task_filter,
6064
task_uuid=job_id,
6165
)
66+
except TaskNotFoundError as exc:
67+
raise JobMissingError(exc=f"{exc}") from exc
6268
except CeleryError as exc:
6369
raise JobSchedulerError(exc=f"{exc}") from exc
6470

@@ -71,9 +77,10 @@ async def status(
7177

7278
@router.expose(
7379
reraise_if_error_type=(
80+
JobAbortedError,
7481
JobError,
82+
JobMissingError,
7583
JobNotDoneError,
76-
JobAbortedError,
7784
JobSchedulerError,
7885
)
7986
)
@@ -97,6 +104,8 @@ async def result(
97104
task_filter=task_filter,
98105
task_uuid=job_id,
99106
)
107+
except TaskNotFoundError as exc:
108+
raise JobMissingError(exc=f"{exc}") from exc
100109
except CeleryError as exc:
101110
raise JobSchedulerError(exc=f"{exc}") from exc
102111

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from servicelib.logging_utils import log_context
2222
from settings_library.celery import CelerySettings
2323

24+
from .errors import TaskNotFoundError
2425
from .utils import build_task_id
2526

2627
_logger = logging.getLogger(__name__)
@@ -74,6 +75,9 @@ async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> Non
7475
msg=f"task cancellation: {task_filter=} {task_uuid=}",
7576
):
7677
task_id = build_task_id(task_filter, task_uuid)
78+
if not await self.exists_task(task_id):
79+
raise TaskNotFoundError(task_id=task_id)
80+
7781
await self._task_info_store.remove_task(task_id)
7882
await self._forget_task(task_id)
7983

@@ -93,6 +97,9 @@ async def get_task_result(
9397
msg=f"Get task result: {task_filter=} {task_uuid=}",
9498
):
9599
task_id = build_task_id(task_filter, task_uuid)
100+
if not await self.exists_task(task_id):
101+
raise TaskNotFoundError(task_id=task_id)
102+
96103
async_result = self._celery_app.AsyncResult(task_id)
97104
result = async_result.result
98105
if async_result.ready():
@@ -103,10 +110,9 @@ async def get_task_result(
103110
return result
104111

105112
async def _get_task_progress_report(
106-
self, task_filter: TaskFilter, task_uuid: TaskUUID, task_state: TaskState
113+
self, task_id: TaskID, task_state: TaskState
107114
) -> ProgressReport:
108115
if task_state in (TaskState.STARTED, TaskState.RETRY):
109-
task_id = build_task_id(task_filter, task_uuid)
110116
progress = await self._task_info_store.get_task_progress(task_id)
111117
if progress is not None:
112118
return progress
@@ -135,14 +141,14 @@ async def get_task_status(
135141
):
136142
task_id = build_task_id(task_filter, task_uuid)
137143
if not await self.exists_task(task_id):
138-
task_state = TaskState.ABORTED
139-
else:
140-
task_state = await self._get_task_celery_state(task_id)
144+
raise TaskNotFoundError(task_id=task_id)
145+
146+
task_state = await self._get_task_celery_state(task_id)
141147
return TaskStatus(
142148
task_uuid=task_uuid,
143149
task_state=task_state,
144150
progress_report=await self._get_task_progress_report(
145-
task_filter, task_uuid, task_state
151+
task_id, task_state
146152
),
147153
)
148154

packages/celery-library/tests/unit/test_async_jobs.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
AsyncJobGet,
2121
)
2222
from models_library.api_schemas_rpc_async_jobs.exceptions import (
23-
JobAbortedError,
2423
JobError,
24+
JobMissingError,
2525
)
2626
from models_library.products import ProductName
2727
from models_library.rabbitmq_basic_types import RPCNamespace
@@ -308,12 +308,6 @@ async def test_async_jobs_cancel(
308308
job_filter=job_filter,
309309
)
310310

311-
await _wait_for_job(
312-
async_jobs_rabbitmq_rpc_client,
313-
async_job_get=async_job_get,
314-
job_filter=job_filter,
315-
)
316-
317311
jobs = await async_jobs.list_jobs(
318312
async_jobs_rabbitmq_rpc_client,
319313
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
@@ -322,7 +316,15 @@ async def test_async_jobs_cancel(
322316
)
323317
assert async_job_get.job_id not in [job.job_id for job in jobs]
324318

325-
with pytest.raises(JobAbortedError):
319+
with pytest.raises(JobMissingError):
320+
await async_jobs.status(
321+
async_jobs_rabbitmq_rpc_client,
322+
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,
323+
job_id=async_job_get.job_id,
324+
job_filter=job_filter,
325+
)
326+
327+
with pytest.raises(JobMissingError):
326328
await async_jobs.result(
327329
async_jobs_rabbitmq_rpc_client,
328330
rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE,

0 commit comments

Comments
 (0)