diff --git a/packages/celery-library/src/celery_library/common.py b/packages/celery-library/src/celery_library/common.py index d50e75597c6..ef45ef4c8b9 100644 --- a/packages/celery-library/src/celery_library/common.py +++ b/packages/celery-library/src/celery_library/common.py @@ -2,13 +2,9 @@ from typing import Any from celery import Celery # type: ignore[import-untyped] -from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase -from .backends._redis import RedisTaskInfoStore -from .task_manager import CeleryTaskManager - def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]: base_config = { @@ -36,22 +32,3 @@ def create_app(settings: CelerySettings) -> Celery: ), **_celery_configure(settings), ) - - -async def create_task_manager( - app: Celery, settings: CelerySettings -) -> CeleryTaskManager: - redis_client_sdk = RedisClientSDK( - settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( - RedisDatabase.CELERY_TASKS - ), - client_name="celery_tasks", - ) - await redis_client_sdk.setup() - # GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159 - - return CeleryTaskManager( - app, - settings, - RedisTaskInfoStore(redis_client_sdk), - ) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index dd5bf047e65..4673355efd1 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -6,9 +6,7 @@ from celery.worker.worker import WorkController # type: ignore[import-untyped] from servicelib.celery.app_server import BaseAppServer from servicelib.logging_utils import log_context -from settings_library.celery import CelerySettings -from .common import create_task_manager from .utils import get_app_server, set_app_server _logger = logging.getLogger(__name__) @@ -16,7 +14,6 @@ def on_worker_init( app_server: BaseAppServer, - celery_settings: CelerySettings, sender: WorkController, **_kwargs, ) -> None: @@ -26,20 +23,15 @@ def _init(startup_complete_event: threading.Event) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - async def _setup_task_manager(): + async def _setup(): assert sender.app # nosec assert isinstance(sender.app, Celery) # nosec - app_server.task_manager = await create_task_manager( - sender.app, - celery_settings, - ) - set_app_server(sender.app, app_server) app_server.event_loop = loop - loop.run_until_complete(_setup_task_manager()) + loop.run_until_complete(_setup()) loop.run_until_complete(app_server.lifespan(startup_complete_event)) thread = threading.Thread( diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index e9fc599136a..5a9434ed5a0 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -3,7 +3,7 @@ import datetime import threading -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterable, AsyncIterator, Callable from functools import partial from typing import Any @@ -12,7 +12,7 @@ from celery.contrib.testing.worker import TestWorkController, start_worker from celery.signals import worker_init, worker_shutdown from celery.worker.worker import WorkController -from celery_library.common import create_task_manager +from celery_library.backends._redis import RedisTaskInfoStore from celery_library.signals import on_worker_init, on_worker_shutdown from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types @@ -20,7 +20,9 @@ from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.celery.app_server import BaseAppServer from settings_library.celery import CelerySettings -from settings_library.redis import RedisSettings +from settings_library.redis import RedisDatabase, RedisSettings + +from servicelib.redis import RedisClientSDK pytest_plugins = [ "pytest_simcore.docker_compose", @@ -34,9 +36,26 @@ class FakeAppServer(BaseAppServer): + def __init__(self, app, settings: CelerySettings) -> None: + super().__init__(app) + self._task_manager = None + self._settings = settings + async def lifespan(self, startup_completed_event: threading.Event) -> None: + sdk = RedisClientSDK( + self._settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(RedisDatabase.CELERY_TASKS), + client_name="pytest-celery-tasks-worker", + ) + await sdk.setup() + self._task_manager = CeleryTaskManager( + self._app, + self._settings, + RedisTaskInfoStore(sdk) + ) + startup_completed_event.set() await self.shutdown_event.wait() # wait for shutdown + await sdk.shutdown() @pytest.fixture @@ -61,11 +80,25 @@ def app_environment( "REDIS_SECURE": redis_service.REDIS_SECURE, "REDIS_HOST": redis_service.REDIS_HOST, "REDIS_PORT": f"{redis_service.REDIS_PORT}", - "REDIS_PASSWORD": redis_service.REDIS_PASSWORD.get_secret_value(), + "REDIS_PASSWORD": redis_service.REDIS_PASSWORD.get_secret_value() if redis_service.REDIS_PASSWORD else "", }, ) +@pytest.fixture +async def redis_client_sdk( + redis_service: RedisSettings, +) -> AsyncIterable[RedisClientSDK]: + sdk = RedisClientSDK( + redis_service.build_redis_dsn(RedisDatabase.CELERY_TASKS), + decode_responses=False, + client_name="pytest-celery-tasks", + ) + await sdk.setup() + yield sdk + await sdk.shutdown() + + @pytest.fixture def celery_settings( app_environment: EnvVarsDict, @@ -73,11 +106,6 @@ def celery_settings( return CelerySettings.create_from_envs() -@pytest.fixture -def app_server() -> BaseAppServer: - return FakeAppServer(app=None) - - @pytest.fixture(scope="session") def celery_config() -> dict[str, Any]: return { @@ -94,6 +122,11 @@ def celery_config() -> dict[str, Any]: } +@pytest.fixture +def app_server(celery_settings) -> BaseAppServer: + return FakeAppServer(app=None, settings=celery_settings) + + @pytest.fixture async def with_celery_worker( celery_app: Celery, @@ -102,7 +135,7 @@ async def with_celery_worker( register_celery_tasks: Callable[[Celery], None], ) -> AsyncIterator[TestWorkController]: def _on_worker_init_wrapper(sender: WorkController, **_kwargs): - return partial(on_worker_init, app_server, celery_settings)(sender, **_kwargs) + return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown) @@ -124,11 +157,13 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): async def celery_task_manager( celery_app: Celery, celery_settings: CelerySettings, + redis_client_sdk: RedisClientSDK, with_celery_worker: TestWorkController, ) -> CeleryTaskManager: register_celery_types() - return await create_task_manager( + return CeleryTaskManager( celery_app, celery_settings, + RedisTaskInfoStore(redis_client_sdk) ) diff --git a/packages/service-library/src/servicelib/celery/models.py b/packages/service-library/src/servicelib/celery/models.py index 40756553377..3c39de96a86 100644 --- a/packages/service-library/src/servicelib/celery/models.py +++ b/packages/service-library/src/servicelib/celery/models.py @@ -58,7 +58,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None: ... async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None: ... - async def list_tasks(self, task_context: TaskFilter) -> list[Task]: ... + async def list_tasks(self, task_filter: TaskFilter) -> list[Task]: ... async def remove_task(self, task_id: TaskID) -> None: ... diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py index 9dbbbd9c181..cd0930b1baa 100644 --- a/services/storage/src/simcore_service_storage/core/application.py +++ b/services/storage/src/simcore_service_storage/core/application.py @@ -71,12 +71,13 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901 setup_s3(app) setup_client_session(app) - if settings.STORAGE_CELERY and not settings.STORAGE_WORKER_MODE: - setup_rabbitmq(app) - + if settings.STORAGE_CELERY: setup_task_manager(app, celery_settings=settings.STORAGE_CELERY) - setup_rpc_routes(app) + if not settings.STORAGE_WORKER_MODE: + setup_rabbitmq(app) + setup_rpc_routes(app) + setup_rest_api_long_running_tasks_for_uploads(app) setup_rest_api_routes(app, API_VTAG) set_exception_handlers(app) diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py index 684262d3f9b..5f55b758560 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/__init__.py +++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py @@ -1,4 +1,5 @@ -from celery_library.common import create_app, create_task_manager +from celery_library.backends._redis import RedisTaskInfoStore +from celery_library.common import create_app from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types, register_pydantic_types from fastapi import FastAPI @@ -6,21 +7,39 @@ FileUploadCompletionBody, FoldersBody, ) +from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings +from settings_library.redis import RedisDatabase from ...models import FileMetaData def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None: async def on_startup() -> None: - app.state.task_manager = await create_task_manager( - create_app(celery_settings), celery_settings + redis_client_sdk = RedisClientSDK( + celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="celery_tasks", + ) + app.state.celery_tasks_redis_client_sdk = redis_client_sdk + await redis_client_sdk.setup() + + app.state.task_manager = CeleryTaskManager( + create_app(celery_settings), + celery_settings, + RedisTaskInfoStore(redis_client_sdk), ) register_celery_types() register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) + async def on_shutdown() -> None: + redis_client_sdk: RedisClientSDK = app.state.celery_tasks_redis_client_sdk + await redis_client_sdk.shutdown() + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager: diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py index 396ed37accf..b59317730d4 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -34,7 +34,7 @@ def worker_init_wrapper(sender, **_kwargs): assert _settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server, _settings.STORAGE_CELERY)( + return partial(on_worker_init, app_server)( sender, **_kwargs ) diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index 32813640197..5f71051d0d0 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -1019,7 +1019,7 @@ async def with_storage_celery_worker( def _on_worker_init_wrapper(sender: WorkController, **_kwargs): assert app_settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server, app_settings.STORAGE_CELERY)( + return partial(on_worker_init, app_server)( sender, **_kwargs )