Skip to content

Commit 4600d77

Browse files
🐛 Fix Celery task submission (#8371)
1 parent 5a5b417 commit 4600d77

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

packages/celery-library/src/celery_library/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,11 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except
2626
return result
2727

2828

29+
class TaskSubmissionError(OsparcErrorMixin, Exception):
30+
msg_template = (
31+
"Unable to submit task {task_name} with id '{task_id}' and params {task_params}"
32+
)
33+
34+
2935
class TaskNotFoundError(OsparcErrorMixin, Exception):
3036
msg_template = "Task with id '{task_id}' was not found"

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from uuid import uuid4
55

66
from celery import Celery # type: ignore[import-untyped]
7+
from celery.exceptions import CeleryError # type: ignore[import-untyped]
78
from common_library.async_tools import make_async
89
from models_library.progress_bar import ProgressReport
910
from servicelib.celery.models import (
@@ -21,7 +22,7 @@
2122
from servicelib.logging_utils import log_context
2223
from settings_library.celery import CelerySettings
2324

24-
from .errors import TaskNotFoundError
25+
from .errors import TaskNotFoundError, TaskSubmissionError
2526

2627
_logger = logging.getLogger(__name__)
2728

@@ -50,21 +51,38 @@ async def submit_task(
5051
):
5152
task_uuid = uuid4()
5253
task_id = task_filter.create_task_id(task_uuid=task_uuid)
53-
self._celery_app.send_task(
54-
task_metadata.name,
55-
task_id=task_id,
56-
kwargs={"task_id": task_id} | task_params,
57-
queue=task_metadata.queue.value,
58-
)
5954

6055
expiry = (
6156
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
6257
if task_metadata.ephemeral
6358
else self._celery_settings.CELERY_RESULT_EXPIRES
6459
)
65-
await self._task_info_store.create_task(
66-
task_id, task_metadata, expiry=expiry
67-
)
60+
61+
try:
62+
await self._task_info_store.create_task(
63+
task_id, task_metadata, expiry=expiry
64+
)
65+
self._celery_app.send_task(
66+
task_metadata.name,
67+
task_id=task_id,
68+
kwargs={"task_id": task_id} | task_params,
69+
queue=task_metadata.queue.value,
70+
)
71+
except CeleryError as exc:
72+
try:
73+
await self._task_info_store.remove_task(task_id)
74+
except CeleryError:
75+
_logger.warning(
76+
"Unable to cleanup task '%s' during error handling",
77+
task_id,
78+
exc_info=True,
79+
)
80+
raise TaskSubmissionError(
81+
task_name=task_metadata.name,
82+
task_id=task_id,
83+
task_params=task_params,
84+
) from exc
85+
6886
return task_uuid
6987

7088
async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:

0 commit comments

Comments
 (0)