Skip to content
6 changes: 6 additions & 0 deletions packages/celery-library/src/celery_library/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except
return result


class TaskSubmissionError(OsparcErrorMixin, Exception):
msg_template = (
"Unable to submit task {task_name} with id '{task_id}' and params {task_params}"
)


class TaskNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Task with id '{task_id}' was not found"
49 changes: 39 additions & 10 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from uuid import uuid4

from celery import Celery # type: ignore[import-untyped]
from celery.exceptions import CeleryError # type: ignore[import-untyped]
from common_library.async_tools import make_async
from models_library.progress_bar import ProgressReport
from servicelib.celery.models import (
Expand All @@ -18,10 +19,11 @@
TaskUUID,
)
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_errors import create_troubleshootting_log_kwargs
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

from .errors import TaskNotFoundError
from .errors import TaskNotFoundError, TaskSubmissionError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,21 +52,48 @@ async def submit_task(
):
task_uuid = uuid4()
task_id = task_filter.create_task_id(task_uuid=task_uuid)
self._celery_app.send_task(
task_metadata.name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
)

expiry = (
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
if task_metadata.ephemeral
else self._celery_settings.CELERY_RESULT_EXPIRES
)
await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
)

try:
await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
)
self._celery_app.send_task(
task_metadata.name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
)
except CeleryError as exc:
_logger.exception(
**create_troubleshootting_log_kwargs(
user_error_msg="Unable to submit task",
error=exc,
error_context={
"task_id": task_id,
"task_name": task_metadata.name,
},
)
)
try:
await self._task_info_store.remove_task(task_id)
except CeleryError:
_logger.warning(
"Unable to cleanup task '%s' during error handling",
task_id,
exc_info=True,
)
raise TaskSubmissionError(
task_name=task_metadata.name,
task_id=task_id,
task_params=task_params,
) from exc

return task_uuid

async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
Expand Down
Loading