From bcb0a1da1dd0bd9cb2631b4d0307c6bad2cb0ee6 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 14 Mar 2025 14:41:39 +0100 Subject: [PATCH 1/4] fix task name --- .../storage/src/simcore_service_storage/api/rpc/_data_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py index 424fbc2f0d0d..aab9d7339f62 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py @@ -52,7 +52,7 @@ async def start_data_export( ) from err task_uuid = await get_celery_client(app).send_task( - "export_data_with_error", + "export_data", task_context=job_id_data.model_dump(), files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature ) From 933e2462469a861d1dfe234ba48cccfe2975a8eb Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 14 Mar 2025 14:42:51 +0100 Subject: [PATCH 2/4] remove registered from inspect --- .../modules/celery/client.py | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 300aadb66b2c..3640d667c6e5 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -3,8 +3,10 @@ from typing import Any, Final from uuid import uuid4 -from celery import Celery # type: ignore[import-untyped] -from celery.contrib.abortable import AbortableAsyncResult # type: ignore[import-untyped] +from celery import Celery # type: ignore[import-untyped] +from celery.contrib.abortable import ( + AbortableAsyncResult, # type: ignore[import-untyped] +) from common_library.async_tools import make_async from models_library.progress_bar import ProgressReport from pydantic import ValidationError @@ -16,7 +18,6 @@ _CELERY_INSPECT_TASK_STATUSES: Final[tuple[str, ...]] = ( "active", - "registered", "scheduled", "revoked", ) @@ -47,7 +48,9 @@ def _build_task_id_prefix(task_context: TaskContext) -> str: def _build_task_id(task_context: TaskContext, task_uuid: TaskUUID) -> TaskID: - return _CELERY_TASK_ID_KEY_SEPARATOR.join([_build_task_id_prefix(task_context), f"{task_uuid}"]) + return _CELERY_TASK_ID_KEY_SEPARATOR.join( + [_build_task_id_prefix(task_context), f"{task_uuid}"] + ) class CeleryTaskQueueClient: @@ -114,30 +117,35 @@ def get_task_status( ) def _get_completed_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]: - search_key = ( - _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context) - ) + search_key = _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context) redis = self._celery_app.backend.client if hasattr(redis, "keys") and (keys := redis.keys(search_key + "*")): return { - TaskUUID(f"{key.decode(_CELERY_TASK_ID_KEY_ENCODING).removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)}") + TaskUUID( + f"{key.decode(_CELERY_TASK_ID_KEY_ENCODING).removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)}" + ) for key in keys } return set() @make_async() def get_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]: - all_task_ids = self._get_completed_task_uuids(task_context) + task_uuids = self._get_completed_task_uuids(task_context) - search_key = ( - _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context) - ) + task_id_prefix = _build_task_id_prefix(task_context) + inspect = self._celery_app.control.inspect() for task_inspect_status in _CELERY_INSPECT_TASK_STATUSES: - if task_ids := getattr( - self._celery_app.control.inspect(), task_inspect_status - )(): - for values in task_ids.values(): - for value in values: - all_task_ids.add(TaskUUID(value.removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR))) - - return all_task_ids + tasks = getattr(inspect, task_inspect_status)() or {} + + task_uuids.update( + TaskUUID( + task_info["id"].removeprefix( + task_id_prefix + _CELERY_TASK_ID_KEY_SEPARATOR + ) + ) + for tasks_per_worker in tasks.values() + for task_info in tasks_per_worker + if "id" in task_info + ) + + return task_uuids From 8548c229d371bcfc7c5eaa3d9e854a6291255906 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 14 Mar 2025 14:43:11 +0100 Subject: [PATCH 3/4] add configurations --- .../modules/celery/_common.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/_common.py b/services/storage/src/simcore_service_storage/modules/celery/_common.py index ae5979d4f1fb..52bb638772af 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/_common.py +++ b/services/storage/src/simcore_service_storage/modules/celery/_common.py @@ -1,12 +1,12 @@ -from collections.abc import Callable -from functools import wraps import logging import traceback +from collections.abc import Callable +from functools import wraps from typing import Any -from celery import Celery, Task # type: ignore[import-untyped] -from celery.exceptions import Ignore # type: ignore[import-untyped] +from celery import Celery, Task # type: ignore[import-untyped] from celery.contrib.abortable import AbortableTask # type: ignore[import-untyped] +from celery.exceptions import Ignore # type: ignore[import-untyped] from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase @@ -27,7 +27,10 @@ def create_app(celery_settings: CelerySettings) -> Celery: app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES app.conf.result_extended = True # original args are included in the results app.conf.result_serializer = "json" + app.conf.task_send_sent_event = True app.conf.task_track_started = True + app.conf.worker_send_task_events = True # enable tasks monitoring + return app @@ -39,7 +42,7 @@ def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any: except Exception as exc: exc_type = type(exc).__name__ exc_message = f"{exc}" - exc_traceback = traceback.format_exc().split('\n') + exc_traceback = traceback.format_exc().split("\n") _logger.exception( "Task %s failed with exception: %s", @@ -53,9 +56,10 @@ def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any: exc_type=exc_type, exc_msg=exc_message, ).model_dump(mode="json"), - traceback=exc_traceback + traceback=exc_traceback, ) - raise Ignore from exc # ignore doing state updates + raise Ignore from exc # ignore doing state updates + return wrapper From 2d98c9cb1c188014b784f7cbf5229339e43ede2f Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 14 Mar 2025 22:39:17 +0100 Subject: [PATCH 4/4] add test --- .../tests/unit/modules/celery/conftest.py | 11 ++++++- .../tests/unit/modules/celery/test_celery.py | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/services/storage/tests/unit/modules/celery/conftest.py b/services/storage/tests/unit/modules/celery/conftest.py index 3cd06195b286..8bbb621ef0bd 100644 --- a/services/storage/tests/unit/modules/celery/conftest.py +++ b/services/storage/tests/unit/modules/celery/conftest.py @@ -43,6 +43,9 @@ def celery_conf() -> dict[str, Any]: "result_expires": timedelta(days=7), "result_extended": True, "pool": "threads", + "worker_send_task_events": True, + "task_track_started": True, + "task_send_sent_event": True, } @@ -77,7 +80,13 @@ def celery_worker_controller( register_celery_tasks(celery_app) - with start_worker(celery_app, loglevel="info", perform_ping_check=False) as worker: + with start_worker( + celery_app, + pool="threads", + loglevel="info", + perform_ping_check=False, + worker_kwargs={"hostname": "celery@worker1"}, + ) as worker: worker_init.send(sender=worker) yield worker diff --git a/services/storage/tests/unit/modules/celery/test_celery.py b/services/storage/tests/unit/modules/celery/test_celery.py index 99c3cc34263a..097e5b269ab9 100644 --- a/services/storage/tests/unit/modules/celery/test_celery.py +++ b/services/storage/tests/unit/modules/celery/test_celery.py @@ -4,17 +4,21 @@ from collections.abc import Callable from random import randint -from pydantic import TypeAdapter, ValidationError import pytest from celery import Celery, Task from celery.contrib.abortable import AbortableTask from common_library.errors_classes import OsparcErrorMixin from models_library.progress_bar import ProgressReport +from pydantic import TypeAdapter, ValidationError from servicelib.logging_utils import log_context from simcore_service_storage.modules.celery import get_event_loop from simcore_service_storage.modules.celery._common import define_task from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient -from simcore_service_storage.modules.celery.models import TaskContext, TaskError, TaskState +from simcore_service_storage.modules.celery.models import ( + TaskContext, + TaskError, + TaskState, +) from simcore_service_storage.modules.celery.utils import ( get_celery_worker, get_fastapi_app, @@ -54,7 +58,7 @@ def sync_archive(task: Task, files: list[str]) -> str: class MyError(OsparcErrorMixin, Exception): - msg_template = "Something strange happened: {msg}" + msg_template = "Something strange happened: {msg}" def failure_task(task: Task): @@ -163,3 +167,25 @@ async def test_aborting_task_results_with_aborted_state( assert ( await celery_client.get_task_status(task_context, task_uuid) ).task_state == TaskState.ABORTED + + +@pytest.mark.usefixtures("celery_worker") +async def test_listing_task_uuids_contains_submitted_task( + celery_client: CeleryTaskQueueClient, +): + task_context = TaskContext(user_id=42) + + task_uuid = await celery_client.send_task( + "dreamer_task", + task_context=task_context, + ) + + for attempt in Retrying( + retry=retry_if_exception_type(AssertionError), + wait=wait_fixed(1), + stop=stop_after_delay(10), + ): + with attempt: + assert task_uuid in await celery_client.get_task_uuids(task_context) + + assert task_uuid in await celery_client.get_task_uuids(task_context)