Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d8e9e10
introduce TaslkFilterBase model
bisgaard-itis Jul 8, 2025
94bb1ae
use base model for job filter
bisgaard-itis Jul 8, 2025
58c8ac1
further corrections
bisgaard-itis Jul 8, 2025
92115e2
add client name in api-server
bisgaard-itis Jul 8, 2025
ac9cf2b
fixes in storage
bisgaard-itis Jul 8, 2025
564e0e1
fixes in webserver
bisgaard-itis Jul 8, 2025
34c5f3f
handle storage client
bisgaard-itis Jul 8, 2025
b488f9c
minor fix
bisgaard-itis Jul 8, 2025
2eb1918
cleanup
bisgaard-itis Jul 8, 2025
c4c4fe8
cleanup
bisgaard-itis Jul 8, 2025
1fbd307
cleanup celery rpc interface
bisgaard-itis Jul 8, 2025
9a461d5
further refinements
bisgaard-itis Jul 8, 2025
8a3623f
fixes in storage
bisgaard-itis Jul 8, 2025
d723594
cleanup in storage
bisgaard-itis Jul 8, 2025
a9e2742
small cleanup
bisgaard-itis Jul 8, 2025
9d6cfc6
add missing return statement
bisgaard-itis Jul 8, 2025
820ea99
fix
bisgaard-itis Jul 8, 2025
ebb9ee6
test fixes
bisgaard-itis Jul 8, 2025
8c6e407
fix
bisgaard-itis Jul 8, 2025
eebad5d
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 9, 2025
4d8511a
@GitHK _dict -> filter_dict
bisgaard-itis Jul 9, 2025
4492c2e
@GitHK use Self
bisgaard-itis Jul 9, 2025
8ba1080
use Final[str]
bisgaard-itis Jul 9, 2025
a80e293
improve function naming
bisgaard-itis Jul 9, 2025
55cfa00
decouple models @sanderegg
bisgaard-itis Jul 9, 2025
96573d7
fix test
bisgaard-itis Jul 9, 2025
96a29e8
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 9, 2025
06d0f1a
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 9, 2025
4cf59e8
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 10, 2025
d3af22b
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 10, 2025
ef33a50
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
giancarloromeo Jul 10, 2025
ceba30e
Merge branch 'master' into 8071-introduce-task-filter-class-in-celery
bisgaard-itis Jul 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import ValidationError
from servicelib.celery.models import (
Task,
TaskContext,
TaskFilter,
TaskID,
TaskMetadata,
TaskUUID,
Expand Down Expand Up @@ -82,10 +82,10 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
)
return None

async def list_tasks(self, task_context: TaskContext) -> list[Task]:
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
search_key = (
_CELERY_TASK_INFO_PREFIX
+ build_task_id_prefix(task_context)
+ build_task_id_prefix(task_filter)
+ _CELERY_TASK_ID_KEY_SEPARATOR
)
search_key_len = len(search_key)
Expand Down
33 changes: 19 additions & 14 deletions packages/celery-library/src/celery_library/rpc/_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobFilter,
AsyncJobGet,
AsyncJobId,
AsyncJobNameData,
AsyncJobResult,
AsyncJobStatus,
)
Expand All @@ -16,7 +16,7 @@
JobNotDoneError,
JobSchedulerError,
)
from servicelib.celery.models import TaskState
from servicelib.celery.models import TaskFilter, TaskState
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_catch
from servicelib.rabbitmq import RPCRouter
Expand All @@ -32,13 +32,14 @@

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def cancel(
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
):
assert task_manager # nosec
assert job_id_data # nosec
assert job_filter # nosec
task_filter = TaskFilter.from_async_job_filter(job_filter)
try:
await task_manager.cancel_task(
task_context=job_id_data.model_dump(),
task_filter=task_filter,
task_uuid=job_id,
)
except CeleryError as exc:
Expand All @@ -47,14 +48,15 @@ async def cancel(

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def status(
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
) -> AsyncJobStatus:
assert task_manager # nosec
assert job_id_data # nosec
assert job_filter # nosec

task_filter = TaskFilter.from_async_job_filter(job_filter)
try:
task_status = await task_manager.get_task_status(
task_context=job_id_data.model_dump(),
task_filter=task_filter,
task_uuid=job_id,
)
except CeleryError as exc:
Expand All @@ -76,21 +78,23 @@ async def status(
)
)
async def result(
task_manager: TaskManager, job_id: AsyncJobId, job_id_data: AsyncJobNameData
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
) -> AsyncJobResult:
assert task_manager # nosec
assert job_id # nosec
assert job_id_data # nosec
assert job_filter # nosec

task_filter = TaskFilter.from_async_job_filter(job_filter)

try:
_status = await task_manager.get_task_status(
task_context=job_id_data.model_dump(),
task_filter=task_filter,
task_uuid=job_id,
)
if not _status.is_done:
raise JobNotDoneError(job_id=job_id)
_result = await task_manager.get_task_result(
task_context=job_id_data.model_dump(),
task_filter=task_filter,
task_uuid=job_id,
)
except CeleryError as exc:
Expand Down Expand Up @@ -123,13 +127,14 @@ async def result(

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def list_jobs(
task_manager: TaskManager, filter_: str, job_id_data: AsyncJobNameData
task_manager: TaskManager, filter_: str, job_filter: AsyncJobFilter
) -> list[AsyncJobGet]:
_ = filter_
assert task_manager # nosec
task_filter = TaskFilter.from_async_job_filter(job_filter)
try:
tasks = await task_manager.list_tasks(
task_context=job_id_data.model_dump(),
task_filter=task_filter,
)
except CeleryError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc
Expand Down
47 changes: 26 additions & 21 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Any
from typing import TYPE_CHECKING, Any
from uuid import uuid4

from celery import Celery # type: ignore[import-untyped]
Expand All @@ -11,14 +11,15 @@
from models_library.progress_bar import ProgressReport
from servicelib.celery.models import (
Task,
TaskContext,
TaskFilter,
TaskID,
TaskInfoStore,
TaskMetadata,
TaskState,
TaskStatus,
TaskUUID,
)
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

Expand All @@ -41,16 +42,16 @@ async def submit_task(
self,
task_metadata: TaskMetadata,
*,
task_context: TaskContext,
task_filter: TaskFilter,
**task_params,
) -> TaskUUID:
with log_context(
_logger,
logging.DEBUG,
msg=f"Submit {task_metadata.name=}: {task_context=} {task_params=}",
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
):
task_uuid = uuid4()
task_id = build_task_id(task_context, task_uuid)
task_id = build_task_id(task_filter, task_uuid)
self._celery_app.send_task(
task_metadata.name,
task_id=task_id,
Expand All @@ -72,14 +73,14 @@ async def submit_task(
def _abort_task(self, task_id: TaskID) -> None:
AbortableAsyncResult(task_id, app=self._celery_app).abort()

async def cancel_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None:
async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"task cancellation: {task_context=} {task_uuid=}",
msg=f"task cancellation: {task_filter=} {task_uuid=}",
):
task_id = build_task_id(task_context, task_uuid)
if not (await self.get_task_status(task_context, task_uuid)).is_done:
task_id = build_task_id(task_filter, task_uuid)
if not (await self.get_task_status(task_filter, task_uuid)).is_done:
await self._abort_task(task_id)
await self._task_info_store.remove_task(task_id)

Expand All @@ -88,14 +89,14 @@ def _forget_task(self, task_id: TaskID) -> None:
AbortableAsyncResult(task_id, app=self._celery_app).forget()

async def get_task_result(
self, task_context: TaskContext, task_uuid: TaskUUID
self, task_filter: TaskFilter, task_uuid: TaskUUID
) -> Any:
with log_context(
_logger,
logging.DEBUG,
msg=f"Get task result: {task_context=} {task_uuid=}",
msg=f"Get task result: {task_filter=} {task_uuid=}",
):
task_id = build_task_id(task_context, task_uuid)
task_id = build_task_id(task_filter, task_uuid)
async_result = self._celery_app.AsyncResult(task_id)
result = async_result.result
if async_result.ready():
Expand All @@ -106,10 +107,10 @@ async def get_task_result(
return result

async def _get_task_progress_report(
self, task_context: TaskContext, task_uuid: TaskUUID, task_state: TaskState
self, task_filter: TaskFilter, task_uuid: TaskUUID, task_state: TaskState
) -> ProgressReport:
if task_state in (TaskState.STARTED, TaskState.RETRY, TaskState.ABORTED):
task_id = build_task_id(task_context, task_uuid)
task_id = build_task_id(task_filter, task_uuid)
progress = await self._task_info_store.get_task_progress(task_id)
if progress is not None:
return progress
Expand All @@ -131,33 +132,37 @@ def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
return TaskState(self._celery_app.AsyncResult(task_id).state)

async def get_task_status(
self, task_context: TaskContext, task_uuid: TaskUUID
self, task_filter: TaskFilter, task_uuid: TaskUUID
) -> TaskStatus:
with log_context(
_logger,
logging.DEBUG,
msg=f"Getting task status: {task_context=} {task_uuid=}",
msg=f"Getting task status: {task_filter=} {task_uuid=}",
):
task_id = build_task_id(task_context, task_uuid)
task_id = build_task_id(task_filter, task_uuid)
task_state = await self._get_task_celery_state(task_id)
return TaskStatus(
task_uuid=task_uuid,
task_state=task_state,
progress_report=await self._get_task_progress_report(
task_context, task_uuid, task_state
task_filter, task_uuid, task_state
),
)

async def list_tasks(self, task_context: TaskContext) -> list[Task]:
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
with log_context(
_logger,
logging.DEBUG,
msg=f"Listing tasks: {task_context=}",
msg=f"Listing tasks: {task_filter=}",
):
return await self._task_info_store.list_tasks(task_context)
return await self._task_info_store.list_tasks(task_filter)

async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
await self._task_info_store.set_task_progress(
task_id=task_id,
report=report,
)


if TYPE_CHECKING:
_: type[TaskManager] = CeleryTaskManager
11 changes: 6 additions & 5 deletions packages/celery-library/src/celery_library/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@

from celery import Celery # type: ignore[import-untyped]
from servicelib.celery.app_server import BaseAppServer
from servicelib.celery.models import TaskContext, TaskID, TaskUUID
from servicelib.celery.models import TaskFilter, TaskID, TaskUUID

_APP_SERVER_KEY = "app_server"

_TASK_ID_KEY_DELIMITATOR: Final[str] = ":"


def build_task_id_prefix(task_context: TaskContext) -> str:
def build_task_id_prefix(task_filter: TaskFilter) -> str:
filter_dict = task_filter.model_dump()
return _TASK_ID_KEY_DELIMITATOR.join(
[f"{task_context[key]}" for key in sorted(task_context)]
[f"{filter_dict[key]}" for key in sorted(filter_dict)]
)


def build_task_id(task_context: TaskContext, task_uuid: TaskUUID) -> TaskID:
def build_task_id(task_filter: TaskFilter, task_uuid: TaskUUID) -> TaskID:
return _TASK_ID_KEY_DELIMITATOR.join(
[build_task_id_prefix(task_context), f"{task_uuid}"]
[build_task_id_prefix(task_filter), f"{task_uuid}"]
)


Expand Down
Loading
Loading