Skip to content
19 changes: 13 additions & 6 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ async def submit_task(
):
task_uuid = uuid4()
task_id = task_filter.create_task_id(task_uuid=task_uuid)
self._celery_app.send_task(
task_metadata.name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
)

expiry = (
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
Expand All @@ -65,6 +59,19 @@ async def submit_task(
await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
)

try:
self._celery_app.send_task(
task_metadata.name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
)
except Exception:
_logger.exception("Task '%s' submission failed", task_metadata.name)
await self._task_info_store.remove_task(task_id)
raise

return task_uuid

async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
Expand Down
Loading