Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions packages/celery-library/src/celery_library/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
from typing import Any

from celery import Celery # type: ignore[import-untyped]
from servicelib.redis import RedisClientSDK
from settings_library.celery import CelerySettings
from settings_library.redis import RedisDatabase

from .backends._redis import RedisTaskInfoStore
from .task_manager import CeleryTaskManager


def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
base_config = {
Expand Down Expand Up @@ -36,22 +32,3 @@ def create_app(settings: CelerySettings) -> Celery:
),
**_celery_configure(settings),
)


async def create_task_manager(
app: Celery, settings: CelerySettings
) -> CeleryTaskManager:
redis_client_sdk = RedisClientSDK(
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
RedisDatabase.CELERY_TASKS
),
client_name="celery_tasks",
)
await redis_client_sdk.setup()
# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159

return CeleryTaskManager(
app,
settings,
RedisTaskInfoStore(redis_client_sdk),
)
12 changes: 2 additions & 10 deletions packages/celery-library/src/celery_library/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
from celery.worker.worker import WorkController # type: ignore[import-untyped]
from servicelib.celery.app_server import BaseAppServer
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

from .common import create_task_manager
from .utils import get_app_server, set_app_server

_logger = logging.getLogger(__name__)


def on_worker_init(
app_server: BaseAppServer,
celery_settings: CelerySettings,
sender: WorkController,
**_kwargs,
) -> None:
Expand All @@ -26,20 +23,15 @@ def _init(startup_complete_event: threading.Event) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def _setup_task_manager():
async def _setup():
assert sender.app # nosec
assert isinstance(sender.app, Celery) # nosec

app_server.task_manager = await create_task_manager(
sender.app,
celery_settings,
)

set_app_server(sender.app, app_server)

app_server.event_loop = loop

loop.run_until_complete(_setup_task_manager())
loop.run_until_complete(_setup())
loop.run_until_complete(app_server.lifespan(startup_complete_event))

thread = threading.Thread(
Expand Down
57 changes: 46 additions & 11 deletions packages/celery-library/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import datetime
import threading
from collections.abc import AsyncIterator, Callable
from collections.abc import AsyncIterable, AsyncIterator, Callable
from functools import partial
from typing import Any

Expand All @@ -12,15 +12,17 @@
from celery.contrib.testing.worker import TestWorkController, start_worker
from celery.signals import worker_init, worker_shutdown
from celery.worker.worker import WorkController
from celery_library.common import create_task_manager
from celery_library.backends._redis import RedisTaskInfoStore
from celery_library.signals import on_worker_init, on_worker_shutdown
from celery_library.task_manager import CeleryTaskManager
from celery_library.types import register_celery_types
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.celery.app_server import BaseAppServer
from settings_library.celery import CelerySettings
from settings_library.redis import RedisSettings
from settings_library.redis import RedisDatabase, RedisSettings

from servicelib.redis import RedisClientSDK

pytest_plugins = [
"pytest_simcore.docker_compose",
Expand All @@ -34,9 +36,26 @@


class FakeAppServer(BaseAppServer):
def __init__(self, app, settings: CelerySettings) -> None:
super().__init__(app)
self._task_manager = None
self._settings = settings

async def lifespan(self, startup_completed_event: threading.Event) -> None:
sdk = RedisClientSDK(
self._settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(RedisDatabase.CELERY_TASKS),
client_name="pytest-celery-tasks-worker",
)
await sdk.setup()
self._task_manager = CeleryTaskManager(
self._app,
self._settings,
RedisTaskInfoStore(sdk)
)

startup_completed_event.set()
await self.shutdown_event.wait() # wait for shutdown
await sdk.shutdown()


@pytest.fixture
Expand All @@ -61,23 +80,32 @@ def app_environment(
"REDIS_SECURE": redis_service.REDIS_SECURE,
"REDIS_HOST": redis_service.REDIS_HOST,
"REDIS_PORT": f"{redis_service.REDIS_PORT}",
"REDIS_PASSWORD": redis_service.REDIS_PASSWORD.get_secret_value(),
"REDIS_PASSWORD": redis_service.REDIS_PASSWORD.get_secret_value() if redis_service.REDIS_PASSWORD else "",
},
)


@pytest.fixture
async def redis_client_sdk(
redis_service: RedisSettings,
) -> AsyncIterable[RedisClientSDK]:
sdk = RedisClientSDK(
redis_service.build_redis_dsn(RedisDatabase.CELERY_TASKS),
decode_responses=False,
client_name="pytest-celery-tasks",
)
await sdk.setup()
yield sdk
await sdk.shutdown()


@pytest.fixture
def celery_settings(
app_environment: EnvVarsDict,
) -> CelerySettings:
return CelerySettings.create_from_envs()


@pytest.fixture
def app_server() -> BaseAppServer:
return FakeAppServer(app=None)


@pytest.fixture(scope="session")
def celery_config() -> dict[str, Any]:
return {
Expand All @@ -94,6 +122,11 @@ def celery_config() -> dict[str, Any]:
}


@pytest.fixture
def app_server(celery_settings) -> BaseAppServer:
return FakeAppServer(app=None, settings=celery_settings)


@pytest.fixture
async def with_celery_worker(
celery_app: Celery,
Expand All @@ -102,7 +135,7 @@ async def with_celery_worker(
register_celery_tasks: Callable[[Celery], None],
) -> AsyncIterator[TestWorkController]:
def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
return partial(on_worker_init, app_server, celery_settings)(sender, **_kwargs)
return partial(on_worker_init, app_server)(sender, **_kwargs)

worker_init.connect(_on_worker_init_wrapper)
worker_shutdown.connect(on_worker_shutdown)
Expand All @@ -124,11 +157,13 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
async def celery_task_manager(
celery_app: Celery,
celery_settings: CelerySettings,
redis_client_sdk: RedisClientSDK,
with_celery_worker: TestWorkController,
) -> CeleryTaskManager:
register_celery_types()

return await create_task_manager(
return CeleryTaskManager(
celery_app,
celery_settings,
RedisTaskInfoStore(redis_client_sdk)
)
2 changes: 1 addition & 1 deletion packages/service-library/src/servicelib/celery/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None: ...

async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None: ...

async def list_tasks(self, task_context: TaskFilter) -> list[Task]: ...
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]: ...

async def remove_task(self, task_id: TaskID) -> None: ...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
setup_s3(app)
setup_client_session(app)

if settings.STORAGE_CELERY and not settings.STORAGE_WORKER_MODE:
setup_rabbitmq(app)

if settings.STORAGE_CELERY:
setup_task_manager(app, celery_settings=settings.STORAGE_CELERY)

setup_rpc_routes(app)
if not settings.STORAGE_WORKER_MODE:
setup_rabbitmq(app)
setup_rpc_routes(app)

setup_rest_api_long_running_tasks_for_uploads(app)
setup_rest_api_routes(app, API_VTAG)
set_exception_handlers(app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
from celery_library.common import create_app, create_task_manager
from celery_library.backends._redis import RedisTaskInfoStore
from celery_library.common import create_app
from celery_library.task_manager import CeleryTaskManager
from celery_library.types import register_celery_types, register_pydantic_types
from fastapi import FastAPI
from models_library.api_schemas_storage.storage_schemas import (
FileUploadCompletionBody,
FoldersBody,
)
from servicelib.redis import RedisClientSDK
from settings_library.celery import CelerySettings
from settings_library.redis import RedisDatabase

from ...models import FileMetaData


def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to migrate storage to new FastAPI's lifespans...

async def on_startup() -> None:
app.state.task_manager = await create_task_manager(
create_app(celery_settings), celery_settings
redis_client_sdk = RedisClientSDK(
celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
RedisDatabase.CELERY_TASKS
),
client_name="celery_tasks",
)
app.state.celery_tasks_redis_client_sdk = redis_client_sdk
await redis_client_sdk.setup()

app.state.task_manager = CeleryTaskManager(
create_app(celery_settings),
celery_settings,
RedisTaskInfoStore(redis_client_sdk),
)

register_celery_types()
register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)

async def on_shutdown() -> None:
redis_client_sdk: RedisClientSDK = app.state.celery_tasks_redis_client_sdk
await redis_client_sdk.shutdown()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

def worker_init_wrapper(sender, **_kwargs):
assert _settings.STORAGE_CELERY # nosec
return partial(on_worker_init, app_server, _settings.STORAGE_CELERY)(
return partial(on_worker_init, app_server)(
sender, **_kwargs
)

Expand Down
2 changes: 1 addition & 1 deletion services/storage/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ async def with_storage_celery_worker(

def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
assert app_settings.STORAGE_CELERY # nosec
return partial(on_worker_init, app_server, app_settings.STORAGE_CELERY)(
return partial(on_worker_init, app_server)(
sender, **_kwargs
)

Expand Down
Loading