Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
87e855c
add task_owner as required field in TaskFilter
bisgaard-itis Sep 18, 2025
4462b0e
client_name -> TaskOwner
bisgaard-itis Sep 18, 2025
51ef580
make AsyncJobFilter minimal
bisgaard-itis Sep 18, 2025
26abc99
improve naming
bisgaard-itis Sep 18, 2025
d2cf678
task_owner -> owner
bisgaard-itis Sep 18, 2025
87c63ef
TaskFilter -> OwnerMetadata
bisgaard-itis Sep 18, 2025
a66d944
further fixes to OwnerMetadata
bisgaard-itis Sep 19, 2025
052aa3f
minor fix
bisgaard-itis Sep 19, 2025
b716c64
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
e3033e5
cleanup
bisgaard-itis Sep 19, 2025
07531d4
cleanup task submit in webserver
bisgaard-itis Sep 19, 2025
e0ed541
test fixes
bisgaard-itis Sep 19, 2025
a8e106e
get rid of AsyncJobFilter
bisgaard-itis Sep 19, 2025
e9aa9ba
small fix in api-server
bisgaard-itis Sep 19, 2025
d690ddd
fix tests in celery-library
bisgaard-itis Sep 19, 2025
e57f34e
fix webserver
bisgaard-itis Sep 19, 2025
301fe56
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
2b9765f
fix
bisgaard-itis Sep 19, 2025
6314cdf
test fix in api-server
bisgaard-itis Sep 19, 2025
b59d1ee
fix in mocks
bisgaard-itis Sep 19, 2025
c29ef31
several test fixes
bisgaard-itis Sep 19, 2025
c5042d5
yet another minor fix
bisgaard-itis Sep 19, 2025
0f09f84
fix
bisgaard-itis Sep 19, 2025
2451056
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
5f220b8
make wildcard a basemodel
bisgaard-itis Sep 19, 2025
db10040
test fixes
bisgaard-itis Sep 19, 2025
615891e
test fix
bisgaard-itis Sep 19, 2025
24ef2be
several fixes
bisgaard-itis Sep 19, 2025
7e94b08
improve validation from task_id
bisgaard-itis Sep 19, 2025
c25f45d
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 19, 2025
3f44899
cosmetic change
bisgaard-itis Sep 22, 2025
a85d544
minor change
bisgaard-itis Sep 22, 2025
ca9a457
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 22, 2025
dbaf0fd
add description to owner field
bisgaard-itis Sep 22, 2025
ff51528
@pcrespov @sanderegg use APP_NAME in doc string
bisgaard-itis Sep 22, 2025
41eafbf
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
61b9668
fix type issue
bisgaard-itis Sep 22, 2025
036ae36
rename TaskFilter -> OwnerMetadata
bisgaard-itis Sep 22, 2025
0e1a971
relative imports
bisgaard-itis Sep 22, 2025
4eaab57
@pcrespov move model
bisgaard-itis Sep 22, 2025
d48ed0d
remove model
bisgaard-itis Sep 22, 2025
80a28e1
task_metadata -> execution_metadata
bisgaard-itis Sep 22, 2025
b7fa0bc
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
1f96c61
job_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
7a1c396
name fix @GitHK
bisgaard-itis Sep 22, 2025
ef0b203
name change fixes
bisgaard-itis Sep 22, 2025
5cdaeef
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
3736b88
task_filter -> owner_metadata
bisgaard-itis Sep 22, 2025
4431084
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 22, 2025
4ba288c
ensure OwnerMetadata can be serialized
bisgaard-itis Sep 22, 2025
341e283
convert to OwnerMetadata baseclass before submitting rpc requests
bisgaard-itis Sep 22, 2025
419a23b
test fix
bisgaard-itis Sep 23, 2025
e442965
add extra fields in OwnerMetadata
bisgaard-itis Sep 23, 2025
9f2b002
fix test
bisgaard-itis Sep 23, 2025
cb114e6
further bug fixes
bisgaard-itis Sep 23, 2025
7dea526
add user_id input
bisgaard-itis Sep 23, 2025
2354efd
Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
bisgaard-itis Sep 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions packages/celery-library/src/celery_library/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
from models_library.progress_bar import ProgressReport
from pydantic import ValidationError
from servicelib.celery.models import (
ExecutionMetadata,
OwnerMetadata,
Task,
TaskFilter,
TaskID,
TaskInfoStore,
TaskMetadata,
Wildcard,
)
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types

Expand All @@ -35,7 +34,7 @@ def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
async def create_task(
self,
task_id: TaskID,
task_metadata: TaskMetadata,
task_metadata: ExecutionMetadata,
expiry: timedelta,
) -> None:
task_key = _build_key(task_id)
Expand All @@ -51,7 +50,7 @@ async def create_task(
expiry,
)

async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
raw_result = await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hget(
_build_key(task_id), _CELERY_TASK_METADATA_KEY
Expand All @@ -61,7 +60,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
return None

try:
return TaskMetadata.model_validate_json(raw_result)
return ExecutionMetadata.model_validate_json(raw_result)
except ValidationError as exc:
_logger.debug(
"Failed to deserialize task metadata for task %s: %s", task_id, f"{exc}"
Expand All @@ -85,9 +84,9 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
)
return None

async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
async def list_tasks(self, task_filter: OwnerMetadata) -> list[Task]:
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
task_uuid=Wildcard()
task_uuid="*"
)

keys: list[str] = []
Expand All @@ -112,10 +111,10 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
continue

with contextlib.suppress(ValidationError):
task_metadata = TaskMetadata.model_validate_json(raw_metadata)
task_metadata = ExecutionMetadata.model_validate_json(raw_metadata)
tasks.append(
Task(
uuid=TaskFilter.get_task_uuid(key),
uuid=OwnerMetadata.get_task_uuid(key),
metadata=task_metadata,
)
)
Expand Down
32 changes: 13 additions & 19 deletions packages/celery-library/src/celery_library/rpc/_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobFilter,
AsyncJobGet,
AsyncJobId,
AsyncJobResult,
Expand All @@ -17,7 +16,7 @@
JobNotDoneError,
JobSchedulerError,
)
from servicelib.celery.models import TaskFilter, TaskState
from servicelib.celery.models import OwnerMetadata, TaskState
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_catch
from servicelib.rabbitmq import RPCRouter
Expand All @@ -34,14 +33,13 @@

@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
async def cancel(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
):
assert task_manager # nosec
assert job_filter # nosec
task_filter = TaskFilter.model_validate(job_filter.model_dump())
assert owner_metadata # nosec
try:
await task_manager.cancel_task(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand All @@ -52,15 +50,14 @@ async def cancel(

@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
async def status(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
) -> AsyncJobStatus:
assert task_manager # nosec
assert job_filter # nosec
assert owner_metadata # nosec

task_filter = TaskFilter.model_validate(job_filter.model_dump())
try:
task_status = await task_manager.get_task_status(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand All @@ -85,23 +82,21 @@ async def status(
)
)
async def result(
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata
) -> AsyncJobResult:
assert task_manager # nosec
assert job_id # nosec
assert job_filter # nosec

task_filter = TaskFilter.model_validate(job_filter.model_dump())
assert owner_metadata # nosec

try:
_status = await task_manager.get_task_status(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
if not _status.is_done:
raise JobNotDoneError(job_id=job_id)
_result = await task_manager.get_task_result(
task_filter=task_filter,
owner_metadata=owner_metadata,
task_uuid=job_id,
)
except TaskNotFoundError as exc:
Expand Down Expand Up @@ -134,13 +129,12 @@ async def result(

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def list_jobs(
task_manager: TaskManager, job_filter: AsyncJobFilter
task_manager: TaskManager, owner_metadata: OwnerMetadata
) -> list[AsyncJobGet]:
assert task_manager # nosec
task_filter = TaskFilter.model_validate(job_filter.model_dump())
try:
tasks = await task_manager.list_tasks(
task_filter=task_filter,
owner_metadata=owner_metadata,
)
except CeleryError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc
Expand Down
48 changes: 25 additions & 23 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from models_library.progress_bar import ProgressReport
from servicelib.celery.models import (
TASK_DONE_STATES,
ExecutionMetadata,
OwnerMetadata,
Task,
TaskFilter,
TaskID,
TaskInfoStore,
TaskMetadata,
TaskState,
TaskStatus,
TaskUUID,
Expand All @@ -39,34 +39,34 @@ class CeleryTaskManager:

async def submit_task(
self,
task_metadata: TaskMetadata,
execution_metadata: ExecutionMetadata,
*,
task_filter: TaskFilter,
owner_metadata: OwnerMetadata,
**task_params,
) -> TaskUUID:
with log_context(
_logger,
logging.DEBUG,
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
msg=f"Submit {execution_metadata.name=}: {owner_metadata=} {task_params=}",
):
task_uuid = uuid4()
task_id = task_filter.create_task_id(task_uuid=task_uuid)
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)

expiry = (
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
if task_metadata.ephemeral
if execution_metadata.ephemeral
else self._celery_settings.CELERY_RESULT_EXPIRES
)

try:
await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
task_id, execution_metadata, expiry=expiry
)
self._celery_app.send_task(
task_metadata.name,
execution_metadata.name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
queue=execution_metadata.queue.value,
)
except CeleryError as exc:
try:
Expand All @@ -78,20 +78,22 @@ async def submit_task(
exc_info=True,
)
raise TaskSubmissionError(
task_name=task_metadata.name,
task_name=execution_metadata.name,
task_id=task_id,
task_params=task_params,
) from exc

return task_uuid

async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
async def cancel_task(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"task cancellation: {task_filter=} {task_uuid=}",
msg=f"task cancellation: {owner_metadata=} {task_uuid=}",
):
task_id = task_filter.create_task_id(task_uuid=task_uuid)
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
if not await self.task_exists(task_id):
raise TaskNotFoundError(task_id=task_id)

Expand All @@ -106,14 +108,14 @@ def _forget_task(self, task_id: TaskID) -> None:
self._celery_app.AsyncResult(task_id).forget()

async def get_task_result(
self, task_filter: TaskFilter, task_uuid: TaskUUID
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> Any:
with log_context(
_logger,
logging.DEBUG,
msg=f"Get task result: {task_filter=} {task_uuid=}",
msg=f"Get task result: {owner_metadata=} {task_uuid=}",
):
task_id = task_filter.create_task_id(task_uuid=task_uuid)
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
if not await self.task_exists(task_id):
raise TaskNotFoundError(task_id=task_id)

Expand Down Expand Up @@ -149,14 +151,14 @@ def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
return TaskState(self._celery_app.AsyncResult(task_id).state)

async def get_task_status(
self, task_filter: TaskFilter, task_uuid: TaskUUID
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> TaskStatus:
with log_context(
_logger,
logging.DEBUG,
msg=f"Getting task status: {task_filter=} {task_uuid=}",
msg=f"Getting task status: {owner_metadata=} {task_uuid=}",
):
task_id = task_filter.create_task_id(task_uuid=task_uuid)
task_id = owner_metadata.create_task_id(task_uuid=task_uuid)
if not await self.task_exists(task_id):
raise TaskNotFoundError(task_id=task_id)

Expand All @@ -169,13 +171,13 @@ async def get_task_status(
),
)

async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
with log_context(
_logger,
logging.DEBUG,
msg=f"Listing tasks: {task_filter=}",
msg=f"Listing tasks: {owner_metadata=}",
):
return await self._task_info_store.list_tasks(task_filter)
return await self._task_info_store.list_tasks(owner_metadata)

async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
await self._task_info_store.set_task_progress(
Expand Down
Loading
Loading