Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
9bc9c2e
add Celery to requirements
giancarloromeo Sep 30, 2025
1771429
add Celery settings
giancarloromeo Sep 30, 2025
4eea09e
add Celery to Redis client
giancarloromeo Sep 30, 2025
68dce90
add Celery plugin
giancarloromeo Sep 30, 2025
10a74ae
add event-stream support
giancarloromeo Sep 30, 2025
27909c3
fix long-running-tasks api
giancarloromeo Sep 30, 2025
56a8623
fix REST interface in tasks plugin
giancarloromeo Sep 30, 2025
482dc74
fix tasks endpoints
giancarloromeo Sep 30, 2025
c2b6ecb
fix exception
giancarloromeo Sep 30, 2025
c74830b
fix typecheck
giancarloromeo Sep 30, 2025
495a403
fix path op names
giancarloromeo Sep 30, 2025
abd4bd4
fix path op names
giancarloromeo Sep 30, 2025
3a7560b
fix openapi-spec
giancarloromeo Sep 30, 2025
d9617e7
fix async job path op
giancarloromeo Sep 30, 2025
09ed1b2
fix test
giancarloromeo Sep 30, 2025
5536db0
fix test
giancarloromeo Sep 30, 2025
758594c
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Sep 30, 2025
f1b8bc1
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Sep 30, 2025
4471da5
rename
giancarloromeo Sep 30, 2025
8ae38a0
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Sep 30, 2025
9b5c02b
fix property name
giancarloromeo Sep 30, 2025
7ef024a
fix mock
giancarloromeo Sep 30, 2025
2b2f64b
disable Celery in wb_auth tests
giancarloromeo Sep 30, 2025
19b1ed8
add async jobs stream
giancarloromeo Sep 30, 2025
71b3a21
change task key prefix
giancarloromeo Sep 30, 2025
80dba0a
fix mock
giancarloromeo Sep 30, 2025
d7bea56
disable Celery
giancarloromeo Sep 30, 2025
f81d9ca
move event_generator logic down to service
giancarloromeo Oct 1, 2025
5e46968
move sse to models
giancarloromeo Oct 1, 2025
5b9642f
remove unused
giancarloromeo Oct 1, 2025
821d1f0
add test
giancarloromeo Oct 1, 2025
f6676de
fix name
giancarloromeo Oct 1, 2025
6ac1607
move tasks tests
giancarloromeo Oct 1, 2025
73cec60
add tests
giancarloromeo Oct 1, 2025
1057281
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 1, 2025
2db3c93
revert
giancarloromeo Oct 1, 2025
ada49e3
fix fixture
giancarloromeo Oct 1, 2025
b4a5c95
typecheck
giancarloromeo Oct 1, 2025
f3bb46c
add cleanup
giancarloromeo Oct 1, 2025
3776869
add exception handling
giancarloromeo Oct 1, 2025
48c107b
reraise
giancarloromeo Oct 1, 2025
aba2a54
relative import
giancarloromeo Oct 2, 2025
97fa75b
add tests
giancarloromeo Oct 2, 2025
f1e2ce3
remove streaming
giancarloromeo Oct 2, 2025
a0a2319
fix test
giancarloromeo Oct 2, 2025
51fd3c2
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 2, 2025
7df82ae
fix mock
giancarloromeo Oct 2, 2025
adf70d7
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Oct 2, 2025
e62a523
typecheck
giancarloromeo Oct 2, 2025
ad5c6c1
revert
giancarloromeo Oct 2, 2025
259f5de
frozen
giancarloromeo Oct 2, 2025
59868c2
rename
giancarloromeo Oct 2, 2025
2a43ab2
fix
giancarloromeo Oct 3, 2025
c9f3e5e
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Oct 6, 2025
a2b27cc
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 7, 2025
38a83fb
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 7, 2025
010470f
use mock handler
giancarloromeo Oct 8, 2025
f41ce7e
fix
giancarloromeo Oct 8, 2025
eabc46e
refactor
giancarloromeo Oct 8, 2025
45b07b7
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 8, 2025
87607f2
fix mock
giancarloromeo Oct 8, 2025
5eb114b
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,24 @@
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
from servicelib.long_running_tasks.models import TaskGet, TaskStatus
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
from simcore_service_webserver.tasks._controller._rest_exceptions import (
_TO_HTTP_ERROR_MAP,
)

router = APIRouter(
prefix=f"/{API_VTAG}",
tags=[
"long-running-tasks",
],
responses={
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
},
)

_export_data_responses: dict[int | str, dict[str, Any]] = {
i.status_code: {"model": EnvelopedError}
for i in export_data_http_error_map.values()
}


@router.get(
"/tasks",
response_model=Envelope[list[TaskGet]],
responses=_export_data_responses,
)
def get_async_jobs():
"""Lists all long running tasks"""
Expand All @@ -41,17 +38,24 @@ def get_async_jobs():
@router.get(
"/tasks/{task_id}",
response_model=Envelope[TaskStatus],
responses=_export_data_responses,
)
def get_async_job_status(
_path_params: Annotated[_PathParam, Depends()],
):
"""Retrieves the status of a task"""


# Spec-only path operation for GET /{API_VTAG}/tasks/{task_id}/stream.
# It declares no response_model, and its body is just the docstring below.
@router.get(
"/tasks/{task_id}/stream",
)
def get_async_job_stream(
_path_params: Annotated[_PathParam, Depends()],
):
"""Retrieves the stream of a task"""


@router.delete(
"/tasks/{task_id}",
responses=_export_data_responses,
status_code=status.HTTP_204_NO_CONTENT,
)
def cancel_async_job(
Expand All @@ -63,7 +67,6 @@ def cancel_async_job(
@router.get(
"/tasks/{task_id}/result",
response_model=Any,
responses=_export_data_responses,
)
def get_async_job_result(
_path_params: Annotated[_PathParam, Depends()],
Expand Down
17 changes: 6 additions & 11 deletions api/specs/web-server/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# pylint: disable=too-many-arguments


from typing import Annotated, Any, TypeAlias
from typing import Annotated, TypeAlias

from fastapi import APIRouter, Depends, Query, status
from models_library.api_schemas_long_running_tasks.tasks import (
Expand Down Expand Up @@ -35,8 +35,8 @@
from servicelib.fastapi.rest_pagination import CustomizedPathsCursorPage
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.storage.schemas import DatasetMetaData, FileMetaData
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
from simcore_service_webserver.tasks._controller._rest_exceptions import (
_TO_HTTP_ERROR_MAP,
)

router = APIRouter(
Expand Down Expand Up @@ -220,19 +220,14 @@ async def is_completed_upload_file(
"""Returns state of upload completion"""


# data export
_export_data_responses: dict[int | str, dict[str, Any]] = {
i.status_code: {"model": EnvelopedError}
for i in export_data_http_error_map.values()
}


@router.post(
"/storage/locations/{location_id}/export-data",
response_model=Envelope[TaskGet],
name="export_data",
description="Export data",
responses=_export_data_responses,
responses={
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
},
)
async def export_data(export_data: DataExportPost, location_id: LocationID):
"""Trigger data export. Returns async job id for getting status and results"""
84 changes: 76 additions & 8 deletions packages/celery-library/src/celery_library/backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,54 @@
import contextlib
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Final

from models_library.progress_bar import ProgressReport
from pydantic import ValidationError
from pydantic import TypeAdapter, ValidationError
from servicelib.celery.models import (
WILDCARD,
ExecutionMetadata,
OwnerMetadata,
Task,
TaskEvent,
TaskEventID,
TaskID,
TaskInfoStore,
TaskStatusEvent,
TaskStatusValue,
)
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types

_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
_CELERY_TASK_PREFIX: Final[str] = "celery-task-"
_CELERY_TASK_STREAM_PREFIX: Final[str] = "celery-task-stream-"
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"

_CELERY_TASK_STREAM_DEFAULT_ID: Final[str] = "0-0"
_CELERY_TASK_STREAM_BLOCK_TIMEOUT: Final[int] = 3 * 1000 # milliseconds
_CELERY_TASK_STREAM_COUNT: Final[int] = 10
_CELERY_TASK_STREAM_EXPIRE_DEFAULT: Final[timedelta] = timedelta(minutes=5)
_CELERY_TASK_STREAM_MAXLEN: Final[int] = 100_000


_logger = logging.getLogger(__name__)


def _build_key(task_id: TaskID) -> str:
return _CELERY_TASK_INFO_PREFIX + task_id
return _CELERY_TASK_PREFIX + task_id


def _build_stream_key(task_id: TaskID) -> str:
    """Return the Redis key of the event stream associated with ``task_id``."""
    return f"{_CELERY_TASK_STREAM_PREFIX}{task_id}"


class RedisTaskInfoStore:
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
self._redis_client_sdk = redis_client_sdk
@dataclass
class RedisTaskStore:
_redis_client_sdk: RedisClientSDK

async def create_task(
self,
Expand All @@ -51,6 +69,22 @@ async def create_task(
expiry,
)

if execution_metadata.streamed_result:
stream_key = _build_stream_key(task_id)
await self._redis_client_sdk.redis.xadd(
stream_key,
{
"event": TaskStatusEvent(
data=TaskStatusValue.CREATED
).model_dump_json()
},
maxlen=_CELERY_TASK_STREAM_MAXLEN,
approximate=True,
)
await self._redis_client_sdk.redis.expire(
stream_key, _CELERY_TASK_STREAM_EXPIRE_DEFAULT
)

async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
raw_result = await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hget(
Expand Down Expand Up @@ -86,7 +120,7 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
return None

async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
search_key = _CELERY_TASK_INFO_PREFIX + owner_metadata.model_dump_task_id(
search_key = _CELERY_TASK_PREFIX + owner_metadata.model_dump_task_id(
task_uuid=WILDCARD
)

Expand Down Expand Up @@ -124,6 +158,7 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:

async def remove_task(self, task_id: TaskID) -> None:
    """Delete both Redis keys owned by ``task_id``: the task key and its stream key."""
    redis = self._redis_client_sdk.redis
    # Two separate DEL calls, task key first and stream key second.
    for key in (_build_key(task_id), _build_stream_key(task_id)):
        await redis.delete(key)

async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
await handle_redis_returns_union_types(
Expand All @@ -139,6 +174,39 @@ async def task_exists(self, task_id: TaskID) -> bool:
assert isinstance(n, int) # nosec
return n > 0

async def publish_task_event(self, task_id: TaskID, event: TaskEvent) -> None:
    """Append ``event`` to the Redis stream of ``task_id``.

    The event is stored JSON-serialized under the ``"event"`` field of a new
    stream entry. The stream is (approximately) trimmed to
    ``_CELERY_TASK_STREAM_MAXLEN`` entries on every append, the same cap that
    ``create_task`` applies to the initial entry, so a task that publishes many
    events cannot grow its stream without bound.
    """
    await self._redis_client_sdk.redis.xadd(
        _build_stream_key(task_id),
        {"event": event.model_dump_json()},
        maxlen=_CELERY_TASK_STREAM_MAXLEN,
        approximate=True,
    )

async def consume_task_events(
    self, task_id: TaskID, last_id: str | None = None
) -> AsyncIterator[tuple[TaskEventID, TaskEvent]]:
    """Yield ``(entry id, event)`` pairs read from the Redis stream of ``task_id``.

    Reading starts after ``last_id``, or from ``_CELERY_TASK_STREAM_DEFAULT_ID``
    when ``last_id`` is ``None``. Entries that have no ``"event"`` field, or
    whose payload does not validate as ``TaskEvent``, are skipped.

    The generator never finishes on its own: it keeps polling the stream, so
    the caller decides when to stop iterating.
    """
    stream_key = _build_stream_key(task_id)
    # Built once per call instead of once per message.
    event_adapter = TypeAdapter(TaskEvent)
    cursor = last_id or _CELERY_TASK_STREAM_DEFAULT_ID
    while True:
        messages = await self._redis_client_sdk.redis.xread(
            {stream_key: cursor},
            block=_CELERY_TASK_STREAM_BLOCK_TIMEOUT,
            count=_CELERY_TASK_STREAM_COUNT,
        )
        if not messages:
            # Nothing new was returned for this poll; ask again.
            continue
        for _, events in messages:
            for msg_id, data in events:
                # Advance past every entry, valid or not, so the next XREAD
                # only returns newer ones. Without this the same batch would
                # be re-read and re-yielded forever.
                cursor = msg_id

                raw_event = data.get("event")
                if raw_event is None:
                    continue

                try:
                    event: TaskEvent = event_adapter.validate_json(raw_event)
                except ValidationError:
                    continue
                yield msg_id, event


if TYPE_CHECKING:
_: type[TaskInfoStore] = RedisTaskInfoStore
_: type[TaskInfoStore] = RedisTaskStore
30 changes: 30 additions & 0 deletions packages/celery-library/src/celery_library/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import base64
import pickle
from functools import wraps
from inspect import isasyncgenfunction

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from common_library.errors_classes import OsparcErrorMixin


Expand Down Expand Up @@ -34,3 +37,30 @@ class TaskSubmissionError(OsparcErrorMixin, Exception):

class TaskNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Task with id '{task_id}' was not found"


# Generic task-manager failure. `handle_celery_errors` raises it (chained to the
# original exception) whenever a wrapped call lets a `CeleryError` escape.
class TaskManagerError(OsparcErrorMixin, Exception):
msg_template = "An internal error occurred"


def handle_celery_errors(func):
    """Decorate an async callable so ``CeleryError`` surfaces as ``TaskManagerError``.

    Async generator functions are wrapped by an async generator and anything
    else by a coroutine function that awaits ``func``. In both cases a
    ``CeleryError`` raised by ``func`` is re-raised as ``TaskManagerError``
    with the original exception attached as its cause. Other exceptions pass
    through untouched.
    """
    if not isasyncgenfunction(func):

        @wraps(func)
        async def _coroutine_proxy(*args, **kwargs):
            try:
                result = await func(*args, **kwargs)
            except CeleryError as err:
                raise TaskManagerError from err
            return result

        return _coroutine_proxy

    @wraps(func)
    async def _async_generator_proxy(*args, **kwargs):
        try:
            async for element in func(*args, **kwargs):
                yield element
        except CeleryError as err:
            raise TaskManagerError from err

    return _async_generator_proxy
28 changes: 27 additions & 1 deletion packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from uuid import uuid4
Expand All @@ -12,6 +13,8 @@
ExecutionMetadata,
OwnerMetadata,
Task,
TaskEvent,
TaskEventID,
TaskID,
TaskInfoStore,
TaskState,
Expand All @@ -22,7 +25,7 @@
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

from .errors import TaskNotFoundError, TaskSubmissionError
from .errors import TaskNotFoundError, TaskSubmissionError, handle_celery_errors

_logger = logging.getLogger(__name__)

Expand All @@ -37,6 +40,7 @@ class CeleryTaskManager:
_celery_settings: CelerySettings
_task_info_store: TaskInfoStore

@handle_celery_errors
async def submit_task(
self,
execution_metadata: ExecutionMetadata,
Expand Down Expand Up @@ -85,6 +89,7 @@ async def submit_task(

return task_uuid

@handle_celery_errors
async def cancel_task(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> None:
Expand All @@ -107,6 +112,7 @@ async def task_exists(self, task_id: TaskID) -> bool:
def _forget_task(self, task_id: TaskID) -> None:
self._celery_app.AsyncResult(task_id).forget()

@handle_celery_errors
async def get_task_result(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> Any:
Expand Down Expand Up @@ -150,6 +156,7 @@ async def _get_task_progress_report(
def _get_task_celery_state(self, task_id: TaskID) -> TaskState:
return TaskState(self._celery_app.AsyncResult(task_id).state)

@handle_celery_errors
async def get_task_status(
self, owner_metadata: OwnerMetadata, task_uuid: TaskUUID
) -> TaskStatus:
Expand All @@ -171,6 +178,7 @@ async def get_task_status(
),
)

@handle_celery_errors
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
with log_context(
_logger,
Expand All @@ -179,12 +187,30 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
):
return await self._task_info_store.list_tasks(owner_metadata)

@handle_celery_errors
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
    """Hand ``report`` to the task info store as the progress of ``task_id``."""
    store = self._task_info_store
    await store.set_task_progress(task_id=task_id, report=report)

@handle_celery_errors
async def publish_task_event(self, task_id: TaskID, event: TaskEvent) -> None:
    """Hand ``event`` to the task info store for publication under ``task_id``."""
    store = self._task_info_store
    await store.publish_task_event(task_id, event)

@handle_celery_errors
async def consume_task_events(
    self,
    owner_metadata: OwnerMetadata,
    task_uuid: TaskUUID,
    last_id: str | None = None,
) -> AsyncIterator[tuple[TaskEventID, TaskEvent]]:
    """Relay the events the task info store yields for one task.

    The store-level task id is derived from ``owner_metadata`` and
    ``task_uuid``. ``last_id`` is forwarded unchanged, and every item produced
    by the store is re-yielded as is.
    """
    event_stream = self._task_info_store.consume_task_events(
        task_id=owner_metadata.model_dump_task_id(task_uuid=task_uuid),
        last_id=last_id,
    )
    async for item in event_stream:
        yield item


# Evaluated by type checkers only: the annotated assignment makes them verify
# that CeleryTaskManager is acceptable wherever a TaskManager is expected.
if TYPE_CHECKING:
_: type[TaskManager] = CeleryTaskManager
Loading
Loading