Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e4402b4
add log
giancarloromeo Mar 26, 2026
6537791
Merge remote-tracking branch 'upstream/master' into fix/remove-epheme…
giancarloromeo Mar 26, 2026
6943d47
add get_result
giancarloromeo Mar 26, 2026
7826861
add test
giancarloromeo Mar 26, 2026
b132513
use strategy
giancarloromeo Mar 26, 2026
a7232f5
use simple dispatch
giancarloromeo Mar 26, 2026
7b532ab
pylint
giancarloromeo Mar 26, 2026
de717d5
remove log
giancarloromeo Mar 26, 2026
205d536
Merge branch 'master' into fix/remove-ephemeral-group-tasks-on-complete
giancarloromeo Mar 26, 2026
fd4e8f5
fix
giancarloromeo Mar 26, 2026
284256b
Merge branch 'fix/remove-ephemeral-group-tasks-on-complete' of github…
giancarloromeo Mar 26, 2026
57701c3
clean interface
giancarloromeo Mar 27, 2026
dacac92
pylint
giancarloromeo Mar 27, 2026
730aa1d
fix
giancarloromeo Mar 27, 2026
5ffbff2
fix
giancarloromeo Mar 27, 2026
136fbbf
fix
giancarloromeo Mar 27, 2026
e0bed38
fix
giancarloromeo Mar 27, 2026
d7cad2c
Merge branch 'master' into fix/remove-ephemeral-group-tasks-on-complete
giancarloromeo Mar 27, 2026
8f8c1ba
fix
giancarloromeo Mar 27, 2026
4095a88
fix
giancarloromeo Mar 27, 2026
f31f02a
fix
giancarloromeo Mar 27, 2026
c2c99e5
fix
giancarloromeo Mar 27, 2026
9ce02cc
fix
giancarloromeo Mar 27, 2026
519f231
fix
giancarloromeo Mar 27, 2026
0f5bdd5
fix
giancarloromeo Mar 27, 2026
f00a347
fix
giancarloromeo Mar 27, 2026
6d2da42
Merge branch 'master' into fix/remove-ephemeral-group-tasks-on-complete
giancarloromeo Mar 27, 2026
f67f69e
fix
giancarloromeo Mar 27, 2026
db57d43
Merge branch 'fix/remove-ephemeral-group-tasks-on-complete' of github…
giancarloromeo Mar 27, 2026
42cc90f
Merge branch 'master' into fix/remove-ephemeral-group-tasks-on-complete
giancarloromeo Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 88 additions & 61 deletions packages/celery-library/src/celery_library/_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,14 @@ async def submit_task(
return task_uuid

@handle_celery_errors
async def cancel_task(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"task cancellation: {owner_metadata=} {task_uuid=}",
):
task_key = owner_metadata.model_dump_key(task_or_group_uuid=task_uuid)
if not await self.task_or_group_exists(task_key):
raise TaskNotFoundError(task_uuid=task_uuid, owner_metadata=owner_metadata)

await self._task_store.remove_task(task_key)
await self._forget_task(task_key)
async def cancel(self, owner_metadata: OwnerMetadata, task_or_group_uuid: TaskUUID | GroupUUID) -> None:
if await self._is_group(owner_metadata, task_or_group_uuid):
await self._cancel_group(owner_metadata, task_or_group_uuid)
else:
await self._cancel_task(owner_metadata, task_or_group_uuid)

@handle_celery_errors
async def cancel_group(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUID) -> None:
async def _cancel_group(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
Expand All @@ -249,6 +242,20 @@ async def cancel_group(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUI

await self._task_store.remove_task(group_key)

@handle_celery_errors
async def _cancel_task(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"task cancellation: {owner_metadata=} {task_uuid=}",
):
task_key = owner_metadata.model_dump_key(task_or_group_uuid=task_uuid)
if not await self.task_or_group_exists(task_key):
raise TaskNotFoundError(task_uuid=task_uuid, owner_metadata=owner_metadata)

await self._task_store.remove_task(task_key)
await self._forget_task(task_key)

async def task_or_group_exists(self, task_or_group_key: TaskKey | GroupKey) -> bool:
return await self._task_store.task_or_group_exists(task_or_group_key)

Expand All @@ -257,7 +264,7 @@ def _forget_task(self, task_key: TaskKey) -> None:
self._app.AsyncResult(task_key).forget()

@handle_celery_errors
async def get_task_result(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> Any:
async def _get_task_result(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> Any:
with log_context(
_logger,
logging.DEBUG,
Expand Down Expand Up @@ -292,62 +299,30 @@ async def _get_task_progress_report(self, task_key: TaskKey, task_state: TaskSta
def _get_task_celery_state(self, task_key: TaskKey) -> TaskState:
return TaskState(self._app.AsyncResult(task_key).state)

@handle_celery_errors
async def get_task_status(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> TaskStatus:
async def _get_group_result(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUID) -> list[Any]:
with log_context(
_logger,
logging.DEBUG,
msg=f"Getting task status: {owner_metadata=} {task_uuid=}",
msg=f"Get group result: {owner_metadata=} {group_uuid=}",
):
task_key = owner_metadata.model_dump_key(task_or_group_uuid=task_uuid)
if not await self.task_or_group_exists(task_key):
raise TaskNotFoundError(task_uuid=task_uuid, owner_metadata=owner_metadata)

task_state = await self._get_task_celery_state(task_key)
return TaskStatus(
task_uuid=task_uuid,
task_state=task_state,
progress_report=await self._get_task_progress_report(task_key, task_state),
)

@handle_celery_errors
async def cancel(self, owner_metadata: OwnerMetadata, task_or_group_uuid: TaskUUID | GroupUUID) -> None:
task_or_group_key = owner_metadata.model_dump_key(task_or_group_uuid=task_or_group_uuid)
if not await self.task_or_group_exists(task_or_group_key):
raise TaskNotFoundError(task_uuid=task_or_group_uuid, owner_metadata=owner_metadata)

task_metadata = await self._task_store.get_task_metadata(task_or_group_key)
if task_metadata and task_metadata.type == ExecutorType.GROUP:
await self.cancel_group(owner_metadata, task_or_group_uuid)
return

await self.cancel_task(owner_metadata, task_or_group_uuid)
group_key = owner_metadata.model_dump_key(task_or_group_uuid=group_uuid)
if not await self.task_or_group_exists(group_key):
raise GroupNotFoundError(group_uuid=group_uuid, owner_metadata=owner_metadata)

@handle_celery_errors
async def get_status(
self, owner_metadata: OwnerMetadata, task_or_group_uuid: TaskUUID | GroupUUID
) -> TaskStatus | GroupStatus:
task_or_group_key = owner_metadata.model_dump_key(task_or_group_uuid=task_or_group_uuid)
if not await self.task_or_group_exists(task_or_group_key):
raise TaskNotFoundError(task_uuid=task_or_group_uuid, owner_metadata=owner_metadata)
group_result = await self._restore_group_result(group_uuid)
if group_result is None:
raise GroupNotFoundError(group_uuid=group_uuid, owner_metadata=owner_metadata)

task_metadata = await self._task_store.get_task_metadata(task_or_group_key)
if task_metadata and task_metadata.type == ExecutorType.GROUP:
return await self.get_group_status(owner_metadata, task_or_group_uuid) # type: ignore[no-any-return]
results: list[Any] = [async_result.result for async_result in (group_result.results or [])]

return await self.get_task_status(owner_metadata, task_or_group_uuid) # type: ignore[no-any-return]
if group_result.ready():
task_metadata = await self._task_store.get_task_metadata(group_key)
if task_metadata is not None and task_metadata.ephemeral:
await self._cancel_group(owner_metadata, group_uuid)

@make_async()
def _restore_group_result(self, group_uuid: GroupUUID) -> GroupResult | None:
"""Restore a GroupResult from its ID."""
try:
return GroupResult.restore(f"{group_uuid}", app=self._app)
except (KeyError, AttributeError):
# Group not found or invalid
return None
return results

@handle_celery_errors
async def get_group_status(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUID) -> GroupStatus:
async def _get_group_status(self, owner_metadata: OwnerMetadata, group_uuid: GroupUUID) -> GroupStatus:
with log_context(
_logger,
logging.DEBUG,
Expand Down Expand Up @@ -387,6 +362,58 @@ async def get_group_status(self, owner_metadata: OwnerMetadata, group_uuid: Grou
),
)

async def _get_task_status(self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID) -> TaskStatus:
with log_context(
_logger,
logging.DEBUG,
msg=f"Getting task status: {owner_metadata=} {task_uuid=}",
):
task_key = owner_metadata.model_dump_key(task_or_group_uuid=task_uuid)
if not await self.task_or_group_exists(task_key):
raise TaskNotFoundError(task_uuid=task_uuid, owner_metadata=owner_metadata)

task_state = await self._get_task_celery_state(task_key)
return TaskStatus(
task_uuid=task_uuid,
task_state=task_state,
progress_report=await self._get_task_progress_report(task_key, task_state),
)

async def _is_group(
self,
owner_metadata: OwnerMetadata,
task_or_group_uuid: TaskUUID | GroupUUID,
) -> bool:
task_or_group_key = owner_metadata.model_dump_key(task_or_group_uuid=task_or_group_uuid)
if not await self.task_or_group_exists(task_or_group_key):
raise TaskNotFoundError(task_uuid=task_or_group_uuid, owner_metadata=owner_metadata)

task_metadata = await self._task_store.get_task_metadata(task_or_group_key)
return task_metadata is not None and task_metadata.type == ExecutorType.GROUP

@handle_celery_errors
async def get_result(self, owner_metadata: OwnerMetadata, task_or_group_uuid: TaskUUID | GroupUUID) -> Any:
if await self._is_group(owner_metadata, task_or_group_uuid):
return await self._get_group_result(owner_metadata, task_or_group_uuid)
return await self._get_task_result(owner_metadata, task_or_group_uuid)

@handle_celery_errors
async def get_status(
self, owner_metadata: OwnerMetadata, task_or_group_uuid: TaskUUID | GroupUUID
) -> TaskStatus | GroupStatus:
if await self._is_group(owner_metadata, task_or_group_uuid):
return await self._get_group_status(owner_metadata, task_or_group_uuid)
return await self._get_task_status(owner_metadata, task_or_group_uuid)

@make_async()
def _restore_group_result(self, group_uuid: GroupUUID) -> GroupResult | None:
"""Restore a GroupResult from its ID."""
try:
return GroupResult.restore(f"{group_uuid}", app=self._app)
except (KeyError, AttributeError):
# Group not found or invalid
return None

@handle_celery_errors
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
with log_context(_logger, logging.DEBUG, "Listing tasks: owner_metadata=%s", owner_metadata):
Expand Down
16 changes: 8 additions & 8 deletions packages/celery-library/src/celery_library/async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
JobNotDoneError,
JobSchedulerError,
)
from models_library.celery import OwnerMetadata, TaskExecutionMetadata, TaskState
from models_library.celery import OwnerMetadata, TaskExecutionMetadata, TaskState, TaskStatus
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_catch
from tenacity import (
Expand Down Expand Up @@ -50,9 +50,9 @@ async def cancel_job(
job_id: AsyncJobId,
) -> None:
try:
await task_manager.cancel_task(
await task_manager.cancel(
owner_metadata=owner_metadata,
task_uuid=job_id,
task_or_group_uuid=job_id,
)
except TaskNotFoundError as exc:
raise JobMissingError(job_id=job_id) from exc
Expand All @@ -71,22 +71,22 @@ async def get_job_result(
assert owner_metadata # nosec

try:
task_status = await task_manager.get_task_status(
task_status = await task_manager.get_status(
owner_metadata=owner_metadata,
task_uuid=job_id,
task_or_group_uuid=job_id,
)
if not task_status.is_done:
raise JobNotDoneError(job_id=job_id)
task_result = await task_manager.get_task_result(
task_result = await task_manager.get_result(
owner_metadata=owner_metadata,
task_uuid=job_id,
task_or_group_uuid=job_id,
)
except TaskNotFoundError as exc:
raise JobMissingError(job_id=job_id) from exc
except TaskManagerError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc

if task_status.task_state == TaskState.FAILURE:
if isinstance(task_status, TaskStatus) and task_status.task_state == TaskState.FAILURE:
# fallback exception to report
exc_type = type(task_result).__name__
exc_msg = f"{task_result}"
Expand Down
9 changes: 7 additions & 2 deletions packages/celery-library/src/celery_library/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import base64
import pickle
from collections.abc import Callable
from functools import wraps
from types import CoroutineType
from typing import Any

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from common_library.errors_classes import OsparcErrorMixin
Expand Down Expand Up @@ -48,9 +51,11 @@ class TaskManagerError(OsparcErrorMixin, Exception):
msg_template = "An internal error occurred"


def handle_celery_errors(func):
def handle_celery_errors[**P, R](
func: Callable[P, CoroutineType[Any, Any, R]],
) -> Callable[P, CoroutineType[Any, Any, R]]:
@wraps(func)
async def wrapper(*args, **kwargs):
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
return await func(*args, **kwargs)
except CeleryError as exc:
Expand Down
Loading
Loading