Skip to content
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
87e855c
add task_owner as required field in TaskFilter
bisgaard-itis Sep 18, 2025
4462b0e
client_name -> TaskOwner
bisgaard-itis Sep 18, 2025
51ef580
make AsyncJobFilter minimal
bisgaard-itis Sep 18, 2025
26abc99
improve naming
bisgaard-itis Sep 18, 2025
d2cf678
task_owner -> owner
bisgaard-itis Sep 18, 2025
87c63ef
TaskFilter -> OwnerMetadata
bisgaard-itis Sep 18, 2025
a66d944
further fixes to OwnerMetadata
bisgaard-itis Sep 19, 2025
052aa3f
minor fix
bisgaard-itis Sep 19, 2025
b716c64
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
e3033e5
cleanup
bisgaard-itis Sep 19, 2025
07531d4
cleanup task submit in webserver
bisgaard-itis Sep 19, 2025
e0ed541
test fixes
bisgaard-itis Sep 19, 2025
a8e106e
get rid of AsyncJobFilter
bisgaard-itis Sep 19, 2025
e9aa9ba
small fix in api-server
bisgaard-itis Sep 19, 2025
d690ddd
fix tests in celery-library
bisgaard-itis Sep 19, 2025
e57f34e
fix webserver
bisgaard-itis Sep 19, 2025
301fe56
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
2b9765f
fix
bisgaard-itis Sep 19, 2025
6314cdf
test fix in api-server
bisgaard-itis Sep 19, 2025
b59d1ee
fix in mocks
bisgaard-itis Sep 19, 2025
c29ef31
several test fixes
bisgaard-itis Sep 19, 2025
c5042d5
yet another minor fix
bisgaard-itis Sep 19, 2025
0f09f84
fix
bisgaard-itis Sep 19, 2025
2451056
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
5f220b8
make wildcard a basemodel
bisgaard-itis Sep 19, 2025
db10040
test fixes
bisgaard-itis Sep 19, 2025
615891e
test fix
bisgaard-itis Sep 19, 2025
24ef2be
several fixes
bisgaard-itis Sep 19, 2025
7e94b08
improve validation from task_id
bisgaard-itis Sep 19, 2025
c25f45d
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
3f44899
cosmetic change
bisgaard-itis Sep 22, 2025
a85d544
minor change
bisgaard-itis Sep 22, 2025
ca9a457
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 22, 2025
dbaf0fd
add description to owner field
bisgaard-itis Sep 22, 2025
ff51528
@pcrespov @sanderegg use APP_NAME in doc string
bisgaard-itis Sep 22, 2025
41eafbf
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
61b9668
fix type issue
bisgaard-itis Sep 22, 2025
036ae36
rename TaskFilter -> OwnerMetadata
bisgaard-itis Sep 22, 2025
0e1a971
relative imports
bisgaard-itis Sep 22, 2025
4eaab57
@pcrespov move model
bisgaard-itis Sep 22, 2025
d48ed0d
remove model
bisgaard-itis Sep 22, 2025
80a28e1
task_metadata -> execution_metadata
bisgaard-itis Sep 22, 2025
b7fa0bc
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
1f96c61
job_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
7a1c396
name fix @GitHK
bisgaard-itis Sep 22, 2025
ef0b203
name change fixes
bisgaard-itis Sep 22, 2025
5cdaeef
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
3736b88
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
4431084
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 22, 2025
4ba288c
ensure OwnerMetadata can be serialized
bisgaard-itis Sep 22, 2025
341e283
convert to OwnerMetadata baseclass before submitting rpc requests
bisgaard-itis Sep 22, 2025
419a23b
test fix
bisgaard-itis Sep 23, 2025
e442965
add extra fields in OwnerMetadata
bisgaard-itis Sep 23, 2025
9f2b002
fix test
bisgaard-itis Sep 23, 2025
cb114e6
further bug fixes
bisgaard-itis Sep 23, 2025
7dea526
add user_id input
bisgaard-itis Sep 23, 2025
a9bab0e
TaskID -> TaskKey
bisgaard-itis Sep 23, 2025
5303b6f
adapt OwnerMetaData from id to key
bisgaard-itis Sep 23, 2025
86cd121
small fix in model validator
bisgaard-itis Sep 23, 2025
6f74d33
cosmetic fix
bisgaard-itis Sep 23, 2025
d1eaa36
start changing inputs task_id -> task_uuid
bisgaard-itis Sep 23, 2025
4300f50
convert inputs names in RedisTaskInfoStore
bisgaard-itis Sep 23, 2025
66463b7
convert CeleryTaskManager task_id -> task_key
bisgaard-itis Sep 23, 2025
a023c45
convert task_id to task_key in exception models
bisgaard-itis Sep 23, 2025
2ea9853
fix
bisgaard-itis Sep 23, 2025
514bfa6
fix error messages in error models
bisgaard-itis Sep 23, 2025
cb00c7c
fix error model
bisgaard-itis Sep 23, 2025
5e3e77f
update api-server api
bisgaard-itis Sep 23, 2025
32ac020
handle task not found errors properly in api-server
bisgaard-itis Sep 23, 2025
f020bc3
task_id -> task_key in all celery tasks
bisgaard-itis Sep 23, 2025
f5f72c4
minor change
bisgaard-itis Sep 23, 2025
8da86df
futher bugfixes
bisgaard-itis Sep 23, 2025
3f4cfb9
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 26, 2025
7c8cf89
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 26, 2025
c30f67a
@giancarloromeo task_id -> task_key
bisgaard-itis Sep 26, 2025
0c9695e
task_id -> task_key @giancarloromeo
bisgaard-itis Sep 26, 2025
de46286
task_id -> task_key @giancarloromeo
bisgaard-itis Sep 26, 2025
1e91d03
tiask_id -> task_key @giancarloromeo
bisgaard-itis Sep 26, 2025
7674b34
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 26, 2025
3523d63
@giancarloromeo task_id -> task_key
bisgaard-itis Sep 26, 2025
0233f75
fix input name
bisgaard-itis Sep 26, 2025
78d5a42
task_id -> task_key
bisgaard-itis Sep 26, 2025
39ac01d
pylint: remove unused import
bisgaard-itis Sep 26, 2025
490d945
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 29, 2025
bc6ee86
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 29, 2025
6077fb8
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Sep 30, 2025
67993c5
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Oct 2, 2025
1180c06
Merge branch 'master' into improve-celery-task-error-messages-and-imp…
bisgaard-itis Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions packages/celery-library/src/celery_library/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from models_library.progress_bar import ProgressReport
from pydantic import ValidationError
from servicelib.celery.models import (
WILDCARD,
ExecutionMetadata,
OwnerMetadata,
Task,
TaskFilter,
TaskID,
TaskInfoStore,
TaskMetadata,
Wildcard,
TaskKey,
)
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types

Expand All @@ -24,7 +24,7 @@
_logger = logging.getLogger(__name__)


def _build_key(task_id: TaskID) -> str:
def _build_key(task_id: TaskKey) -> str:
return _CELERY_TASK_INFO_PREFIX + task_id


Expand All @@ -34,44 +34,46 @@ def __init__(self, redis_client_sdk: RedisClientSDK) -> None:

async def create_task(
self,
task_id: TaskID,
task_metadata: TaskMetadata,
task_id: TaskKey,
execution_metadata: ExecutionMetadata,
expiry: timedelta,
) -> None:
task_key = _build_key(task_id)
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hset(
name=task_key,
key=_CELERY_TASK_METADATA_KEY,
value=task_metadata.model_dump_json(),
value=execution_metadata.model_dump_json(),
)
)
await self._redis_client_sdk.redis.expire(
task_key,
expiry,
)

async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
async def get_task_metadata(self, task_key: TaskKey) -> ExecutionMetadata | None:
raw_result = await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hget(
_build_key(task_id), _CELERY_TASK_METADATA_KEY
_build_key(task_key), _CELERY_TASK_METADATA_KEY
)
)
if not raw_result:
return None

try:
return TaskMetadata.model_validate_json(raw_result)
return ExecutionMetadata.model_validate_json(raw_result)
except ValidationError as exc:
_logger.debug(
"Failed to deserialize task metadata for task %s: %s", task_id, f"{exc}"
"Failed to deserialize task metadata for task %s: %s",
task_key,
f"{exc}",
)
return None

async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
async def get_task_progress(self, task_key: TaskKey) -> ProgressReport | None:
raw_result = await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hget(
_build_key(task_id), _CELERY_TASK_PROGRESS_KEY
_build_key(task_key), _CELERY_TASK_PROGRESS_KEY
)
)
if not raw_result:
Expand All @@ -81,13 +83,15 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
return ProgressReport.model_validate_json(raw_result)
except ValidationError as exc:
_logger.debug(
"Failed to deserialize task progress for task %s: %s", task_id, f"{exc}"
"Failed to deserialize task progress for task %s: %s",
task_key,
f"{exc}",
)
return None

async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
task_uuid=Wildcard()
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
search_key = _CELERY_TASK_INFO_PREFIX + owner_metadata.model_dump_task_key(
task_uuid=WILDCARD
)

keys: list[str] = []
Expand All @@ -112,30 +116,32 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
continue

with contextlib.suppress(ValidationError):
task_metadata = TaskMetadata.model_validate_json(raw_metadata)
execution_metadata = ExecutionMetadata.model_validate_json(raw_metadata)
tasks.append(
Task(
uuid=TaskFilter.get_task_uuid(key),
metadata=task_metadata,
uuid=OwnerMetadata.get_task_uuid(key),
metadata=execution_metadata,
)
)

return tasks

async def remove_task(self, task_id: TaskID) -> None:
await self._redis_client_sdk.redis.delete(_build_key(task_id))
async def remove_task(self, task_key: TaskKey) -> None:
await self._redis_client_sdk.redis.delete(_build_key(task_key))

async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
async def set_task_progress(
self, task_key: TaskKey, report: ProgressReport
) -> None:
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hset(
name=_build_key(task_id),
name=_build_key(task_key),
key=_CELERY_TASK_PROGRESS_KEY,
value=report.model_dump_json(),
)
)

async def task_exists(self, task_id: TaskID) -> bool:
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
async def task_exists(self, task_key: TaskKey) -> bool:
n = await self._redis_client_sdk.redis.exists(_build_key(task_key))
assert isinstance(n, int) # nosec
return n > 0

Expand Down
6 changes: 2 additions & 4 deletions packages/celery-library/src/celery_library/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except


class TaskSubmissionError(OsparcErrorMixin, Exception):
msg_template = (
"Unable to submit task {task_name} with id '{task_id}' and params {task_params}"
)
msg_template = "Unable to submit task {task_name} with key '{task_key}' and params {task_params}"


class TaskNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Task with id '{task_id}' was not found"
msg_template = "Task with uuid '{task_uuid}' and owner_metadata '{owner_metadata}' was not found"
32 changes: 13 additions & 19 deletions packages/celery-library/src/celery_library/rpc/_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobFilter,
AsyncJobGet,
AsyncJobId,
AsyncJobResult,
Expand All @@ -17,7 +16,7 @@
JobNotDoneError,
JobSchedulerError,
)
from servicelib.celery.models import TaskFilter, TaskState
from servicelib.celery.models import OwnerMetadata, TaskState
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_catch
from servicelib.rabbitmq import RPCRouter
Expand All @@ -34,14 +33,13 @@

@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
async def cancel(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
):
assert task_manager # nosec
assert job_filter # nosec
task_filter = TaskFilter.model_validate(job_filter.model_dump())
assert owner_metadata # nosec
try:
await task_manager.cancel_task(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand All @@ -52,15 +50,14 @@ async def cancel(

@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
async def status(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
) -> AsyncJobStatus:
assert task_manager # nosec
assert job_filter # nosec
assert owner_metadata # nosec

task_filter = TaskFilter.model_validate(job_filter.model_dump())
try:
task_status = await task_manager.get_task_status(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand All @@ -85,23 +82,21 @@ async def status(
)
)
async def result(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
) -> AsyncJobResult:
assert task_manager # nosec
assert job_id # nosec
assert job_filter # nosec

task_filter = TaskFilter.model_validate(job_filter.model_dump())
assert owner_metadata # nosec

try:
_status = await task_manager.get_task_status(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
if not _status.is_done:
raise JobNotDoneError(job_id=job_id)
_result = await task_manager.get_task_result(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand Down Expand Up @@ -134,13 +129,12 @@ async def result(

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def list_jobs(
task_manager: TaskManager, job_filter: AsyncJobFilter
task_manager: TaskManager, owner_metadata: OwnerMetadata
) -> list[AsyncJobGet]:
assert task_manager # nosec
task_filter = TaskFilter.model_validate(job_filter.model_dump())
try:
tasks = await task_manager.list_tasks(
task_filter=task_filter,
owner_metadata=owner_metadata,
)
except CeleryError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc
Expand Down
8 changes: 4 additions & 4 deletions packages/celery-library/src/celery_library/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from celery.exceptions import Ignore # type: ignore[import-untyped]
from common_library.async_tools import cancel_wait_task
from pydantic import NonNegativeInt
from servicelib.celery.models import TaskID
from servicelib.celery.models import TaskKey

from .errors import encode_celery_transferrable_error
from .utils import get_app_server
Expand Down Expand Up @@ -47,7 +47,7 @@ def wrapper(task: Task, *args: P.args, **kwargs: P.kwargs) -> R:
# NOTE: task.request is a thread local object, so we need to pass the id explicitly
assert task.request.id is not None # nosec

async def _run_task(task_id: TaskID) -> R:
async def _run_task(task_id: TaskKey) -> R:
try:
async with asyncio.TaskGroup() as tg:
async_io_task = tg.create_task(
Expand Down Expand Up @@ -140,7 +140,7 @@ def wrapper(task: Task, *args: P.args, **kwargs: P.kwargs) -> R:
@overload
def register_task(
app: Celery,
fn: Callable[Concatenate[Task, TaskID, P], Coroutine[Any, Any, R]],
fn: Callable[Concatenate[Task, TaskKey, P], Coroutine[Any, Any, R]],
task_name: str | None = None,
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
Expand All @@ -164,7 +164,7 @@ def register_task(
def register_task( # type: ignore[misc]
app: Celery,
fn: (
Callable[Concatenate[Task, TaskID, P], Coroutine[Any, Any, R]]
Callable[Concatenate[Task, TaskKey, P], Coroutine[Any, Any, R]]
| Callable[Concatenate[Task, P], R]
),
task_name: str | None = None,
Expand Down
Loading
Loading