diff --git a/packages/celery-library/src/celery_library/errors.py b/packages/celery-library/src/celery_library/errors.py index e4ba148b881..4575bbdeaa2 100644 --- a/packages/celery-library/src/celery_library/errors.py +++ b/packages/celery-library/src/celery_library/errors.py @@ -26,5 +26,11 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except return result +class TaskSubmissionError(OsparcErrorMixin, Exception): + msg_template = ( + "Unable to submit task {task_name} with id '{task_id}' and params {task_params}" + ) + + class TaskNotFoundError(OsparcErrorMixin, Exception): msg_template = "Task with id '{task_id}' was not found" diff --git a/packages/celery-library/src/celery_library/task_manager.py b/packages/celery-library/src/celery_library/task_manager.py index 182ae53e66d..24c04ca95f3 100644 --- a/packages/celery-library/src/celery_library/task_manager.py +++ b/packages/celery-library/src/celery_library/task_manager.py @@ -4,6 +4,7 @@ from uuid import uuid4 from celery import Celery # type: ignore[import-untyped] +from celery.exceptions import CeleryError # type: ignore[import-untyped] from common_library.async_tools import make_async from models_library.progress_bar import ProgressReport from servicelib.celery.models import ( @@ -21,7 +22,7 @@ from servicelib.logging_utils import log_context from settings_library.celery import CelerySettings -from .errors import TaskNotFoundError +from .errors import TaskNotFoundError, TaskSubmissionError _logger = logging.getLogger(__name__) @@ -50,21 +51,38 @@ async def submit_task( ): task_uuid = uuid4() task_id = task_filter.create_task_id(task_uuid=task_uuid) - self._celery_app.send_task( - task_metadata.name, - task_id=task_id, - kwargs={"task_id": task_id} | task_params, - queue=task_metadata.queue.value, - ) expiry = ( self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES if task_metadata.ephemeral else self._celery_settings.CELERY_RESULT_EXPIRES ) - await self._task_info_store.create_task( - task_id, task_metadata, expiry=expiry - ) + + try: + await self._task_info_store.create_task( + task_id, task_metadata, expiry=expiry + ) + self._celery_app.send_task( + task_metadata.name, + task_id=task_id, + kwargs={"task_id": task_id} | task_params, + queue=task_metadata.queue.value, + ) + except CeleryError as exc: + try: + await self._task_info_store.remove_task(task_id) + except CeleryError: + _logger.warning( + "Unable to cleanup task '%s' during error handling", + task_id, + exc_info=True, + ) + raise TaskSubmissionError( + task_name=task_metadata.name, + task_id=task_id, + task_params=task_params, + ) from exc + return task_uuid async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None: