Skip to content
Merged
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
9bc9c2e
add Celery to requirements
giancarloromeo Sep 30, 2025
1771429
add Celery settings
giancarloromeo Sep 30, 2025
4eea09e
add Celery to Redis client
giancarloromeo Sep 30, 2025
68dce90
add Celery plugin
giancarloromeo Sep 30, 2025
10a74ae
add event-stream support
giancarloromeo Sep 30, 2025
27909c3
fix long-running-tasks api
giancarloromeo Sep 30, 2025
56a8623
fix REST interface in tasks plugin
giancarloromeo Sep 30, 2025
482dc74
fix tasks endpoints
giancarloromeo Sep 30, 2025
c2b6ecb
fix exception
giancarloromeo Sep 30, 2025
c74830b
fix typecheck
giancarloromeo Sep 30, 2025
495a403
fix path op names
giancarloromeo Sep 30, 2025
abd4bd4
fix path op names
giancarloromeo Sep 30, 2025
3a7560b
fix openapi-spec
giancarloromeo Sep 30, 2025
d9617e7
fix async job path op
giancarloromeo Sep 30, 2025
09ed1b2
fix test
giancarloromeo Sep 30, 2025
5536db0
fix test
giancarloromeo Sep 30, 2025
758594c
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Sep 30, 2025
f1b8bc1
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Sep 30, 2025
4471da5
rename
giancarloromeo Sep 30, 2025
8ae38a0
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Sep 30, 2025
9b5c02b
fix property name
giancarloromeo Sep 30, 2025
7ef024a
fix mock
giancarloromeo Sep 30, 2025
2b2f64b
disable Celery in wb_auth tests
giancarloromeo Sep 30, 2025
19b1ed8
add async jobs stream
giancarloromeo Sep 30, 2025
71b3a21
change task key prefix
giancarloromeo Sep 30, 2025
80dba0a
fix mock
giancarloromeo Sep 30, 2025
d7bea56
disable Celery
giancarloromeo Sep 30, 2025
f81d9ca
move event_generator logic down to service
giancarloromeo Oct 1, 2025
5e46968
move sse to models
giancarloromeo Oct 1, 2025
5b9642f
remove unused
giancarloromeo Oct 1, 2025
821d1f0
add test
giancarloromeo Oct 1, 2025
f6676de
fix name
giancarloromeo Oct 1, 2025
6ac1607
move tasks tests
giancarloromeo Oct 1, 2025
73cec60
add tests
giancarloromeo Oct 1, 2025
1057281
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 1, 2025
2db3c93
revert
giancarloromeo Oct 1, 2025
ada49e3
fix fixture
giancarloromeo Oct 1, 2025
b4a5c95
typecheck
giancarloromeo Oct 1, 2025
f3bb46c
add cleanup
giancarloromeo Oct 1, 2025
3776869
add exception handling
giancarloromeo Oct 1, 2025
48c107b
reraise
giancarloromeo Oct 1, 2025
aba2a54
relative import
giancarloromeo Oct 2, 2025
97fa75b
add tests
giancarloromeo Oct 2, 2025
f1e2ce3
remove streaming
giancarloromeo Oct 2, 2025
a0a2319
fix test
giancarloromeo Oct 2, 2025
51fd3c2
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 2, 2025
7df82ae
fix mock
giancarloromeo Oct 2, 2025
adf70d7
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Oct 2, 2025
e62a523
typecheck
giancarloromeo Oct 2, 2025
ad5c6c1
revert
giancarloromeo Oct 2, 2025
259f5de
frozen
giancarloromeo Oct 2, 2025
59868c2
rename
giancarloromeo Oct 2, 2025
2a43ab2
fix
giancarloromeo Oct 3, 2025
c9f3e5e
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Oct 6, 2025
a2b27cc
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 7, 2025
38a83fb
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 7, 2025
010470f
use mock handler
giancarloromeo Oct 8, 2025
f41ce7e
fix
giancarloromeo Oct 8, 2025
eabc46e
refactor
giancarloromeo Oct 8, 2025
45b07b7
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 8, 2025
87607f2
fix mock
giancarloromeo Oct 8, 2025
5eb114b
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,24 @@
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
from servicelib.long_running_tasks.models import TaskGet, TaskStatus
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
from simcore_service_webserver.tasks._controller._rest_exceptions import (
_TO_HTTP_ERROR_MAP,
)

router = APIRouter(
prefix=f"/{API_VTAG}",
tags=[
"long-running-tasks",
],
responses={
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
},
)

_export_data_responses: dict[int | str, dict[str, Any]] = {
i.status_code: {"model": EnvelopedError}
for i in export_data_http_error_map.values()
}


@router.get(
"/tasks",
response_model=Envelope[list[TaskGet]],
responses=_export_data_responses,
)
def get_async_jobs():
"""Lists all long running tasks"""
Expand All @@ -41,7 +38,6 @@ def get_async_jobs():
@router.get(
"/tasks/{task_id}",
response_model=Envelope[TaskStatus],
responses=_export_data_responses,
)
def get_async_job_status(
_path_params: Annotated[_PathParam, Depends()],
Expand All @@ -51,7 +47,6 @@ def get_async_job_status(

@router.delete(
"/tasks/{task_id}",
responses=_export_data_responses,
status_code=status.HTTP_204_NO_CONTENT,
)
def cancel_async_job(
Expand All @@ -63,7 +58,6 @@ def cancel_async_job(
@router.get(
"/tasks/{task_id}/result",
response_model=Any,
responses=_export_data_responses,
)
def get_async_job_result(
_path_params: Annotated[_PathParam, Depends()],
Expand Down
17 changes: 6 additions & 11 deletions api/specs/web-server/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# pylint: disable=too-many-arguments


from typing import Annotated, Any, TypeAlias
from typing import Annotated, TypeAlias

from fastapi import APIRouter, Depends, Query, status
from models_library.api_schemas_long_running_tasks.tasks import (
Expand Down Expand Up @@ -35,8 +35,8 @@
from servicelib.fastapi.rest_pagination import CustomizedPathsCursorPage
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.storage.schemas import DatasetMetaData, FileMetaData
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
from simcore_service_webserver.tasks._controller._rest_exceptions import (
_TO_HTTP_ERROR_MAP,
)

router = APIRouter(
Expand Down Expand Up @@ -220,19 +220,14 @@ async def is_completed_upload_file(
"""Returns state of upload completion"""


# data export
_export_data_responses: dict[int | str, dict[str, Any]] = {
i.status_code: {"model": EnvelopedError}
for i in export_data_http_error_map.values()
}


@router.post(
"/storage/locations/{location_id}/export-data",
response_model=Envelope[TaskGet],
name="export_data",
description="Export data",
responses=_export_data_responses,
responses={
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
},
)
async def export_data(export_data: DataExportPost, location_id: LocationID):
"""Trigger data export. Returns async job id for getting status and results"""
18 changes: 10 additions & 8 deletions packages/celery-library/src/celery_library/backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Final

Expand All @@ -11,26 +12,27 @@
OwnerMetadata,
Task,
TaskID,
TaskInfoStore,
TaskStore,
)
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types

_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
_CELERY_TASK_PREFIX: Final[str] = "celery-task-"
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"


_logger = logging.getLogger(__name__)


def _build_key(task_id: TaskID) -> str:
return _CELERY_TASK_INFO_PREFIX + task_id
return _CELERY_TASK_PREFIX + task_id


class RedisTaskInfoStore:
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
self._redis_client_sdk = redis_client_sdk
@dataclass(frozen=True)
class RedisTaskStore:
_redis_client_sdk: RedisClientSDK

async def create_task(
self,
Expand Down Expand Up @@ -86,7 +88,7 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
return None

async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
search_key = _CELERY_TASK_INFO_PREFIX + owner_metadata.model_dump_task_id(
search_key = _CELERY_TASK_PREFIX + owner_metadata.model_dump_task_id(
task_uuid=WILDCARD
)

Expand Down Expand Up @@ -141,4 +143,4 @@ async def task_exists(self, task_id: TaskID) -> bool:


if TYPE_CHECKING:
_: type[TaskInfoStore] = RedisTaskInfoStore
_: type[TaskStore] = RedisTaskStore
17 changes: 17 additions & 0 deletions packages/celery-library/src/celery_library/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import base64
import pickle
from functools import wraps

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from common_library.errors_classes import OsparcErrorMixin


Expand Down Expand Up @@ -34,3 +36,18 @@ class TaskSubmissionError(OsparcErrorMixin, Exception):

class TaskNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Task with id '{task_id}' was not found"


class TaskManagerError(OsparcErrorMixin, Exception):
msg_template = "An internal error occurred"


def handle_celery_errors(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except CeleryError as exc:
raise TaskManagerError from exc

return wrapper
12 changes: 9 additions & 3 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
OwnerMetadata,
Task,
TaskID,
TaskInfoStore,
TaskState,
TaskStatus,
TaskStore,
TaskUUID,
)
from servicelib.celery.task_manager import TaskManager
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

from .errors import TaskNotFoundError, TaskSubmissionError
from .errors import TaskNotFoundError, TaskSubmissionError, handle_celery_errors

_logger = logging.getLogger(__name__)

Expand All @@ -35,8 +35,9 @@
class CeleryTaskManager:
_celery_app: Celery
_celery_settings: CelerySettings
_task_info_store: TaskInfoStore
_task_info_store: TaskStore

@handle_celery_errors
async def submit_task(
self,
execution_metadata: ExecutionMetadata,
Expand Down Expand Up @@ -85,6 +86,7 @@ async def submit_task(

return task_uuid

@handle_celery_errors
async def cancel_task(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> None:
Expand All @@ -107,6 +109,7 @@ async def task_exists(self, task_id: TaskID) -> bool:
def _forget_task(self, task_id: TaskID) -> None:
self._celery_app.AsyncResult(task_id).forget()

@handle_celery_errors
async def get_task_result(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> Any:
Expand Down Expand Up @@ -150,6 +153,7 @@ async def _get_task_progress_report(
def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
return TaskState(self._celery_app.AsyncResult(task_id).state)

@handle_celery_errors
async def get_task_status(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> TaskStatus:
Expand All @@ -171,6 +175,7 @@ async def get_task_status(
),
)

@handle_celery_errors
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
with log_context(
_logger,
Expand All @@ -179,6 +184,7 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
):
return await self._task_info_store.list_tasks(owner_metadata)

@handle_celery_errors
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
await self._task_info_store.set_task_progress(
task_id=task_id,
Expand Down
10 changes: 5 additions & 5 deletions packages/celery-library/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from celery.signals import worker_init, worker_shutdown
from celery.worker.worker import WorkController
from celery_library.backends.redis import RedisTaskInfoStore
from celery_library.backends.redis import RedisTaskStore
from celery_library.signals import on_worker_init, on_worker_shutdown
from celery_library.task_manager import CeleryTaskManager
from celery_library.types import register_celery_types
Expand Down Expand Up @@ -66,7 +66,7 @@ async def run_until_shutdown(
self._task_manager = CeleryTaskManager(
self._app,
self._settings,
RedisTaskInfoStore(redis_client_sdk),
RedisTaskStore(redis_client_sdk),
)

startup_completed_event.set()
Expand Down Expand Up @@ -156,11 +156,11 @@ async def mock_celery_app(celery_config: dict[str, Any]) -> Celery:


@pytest.fixture
async def celery_task_manager(
async def task_manager(
mock_celery_app: Celery,
celery_settings: CelerySettings,
use_in_memory_redis: RedisSettings,
) -> AsyncIterator[CeleryTaskManager]:
) -> AsyncIterator[TaskManager]:
register_celery_types()

try:
Expand All @@ -173,7 +173,7 @@ async def celery_task_manager(
yield CeleryTaskManager(
mock_celery_app,
celery_settings,
RedisTaskInfoStore(redis_client_sdk),
RedisTaskStore(redis_client_sdk),
)
finally:
await redis_client_sdk.shutdown()
6 changes: 3 additions & 3 deletions packages/celery-library/tests/unit/test_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ async def async_job(task: Task, task_id: TaskID, action: Action, payload: Any) -

@pytest.fixture
async def register_rpc_routes(
async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, celery_task_manager: TaskManager
async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, task_manager: TaskManager
) -> None:
await async_jobs_rabbitmq_rpc_client.register_router(
_async_jobs.router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=celery_task_manager
_async_jobs.router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager
)
await async_jobs_rabbitmq_rpc_client.register_router(
router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=celery_task_manager
router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager
)


Expand Down
Loading
Loading