Skip to content

Commit 3174045

Browse files
feat: move redis client lifecycle to app server's one
1 parent 127a619 commit 3174045

File tree

4 files changed

+45
-21
lines changed

4 files changed

+45
-21
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from servicelib.logging_utils import log_context
99
from settings_library.celery import CelerySettings
1010

11-
from .common import create_task_manager
1211
from .utils import get_app_server, set_app_server
1312

1413
_logger = logging.getLogger(__name__)
@@ -26,20 +25,15 @@ def _init(startup_complete_event: threading.Event) -> None:
2625
loop = asyncio.new_event_loop()
2726
asyncio.set_event_loop(loop)
2827

29-
async def _setup_task_manager():
28+
async def _setup():
3029
assert sender.app # nosec
3130
assert isinstance(sender.app, Celery) # nosec
3231

33-
app_server.task_manager = await create_task_manager(
34-
sender.app,
35-
celery_settings,
36-
)
37-
3832
set_app_server(sender.app, app_server)
3933

4034
app_server.event_loop = loop
4135

42-
loop.run_until_complete(_setup_task_manager())
36+
loop.run_until_complete(_setup())
4337
loop.run_until_complete(app_server.lifespan(startup_complete_event))
4438

4539
thread = threading.Thread(

packages/service-library/src/servicelib/celery/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None: ...
5858

5959
async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None: ...
6060

61-
async def list_tasks(self, task_context: TaskFilter) -> list[Task]: ...
61+
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]: ...
6262

6363
async def remove_task(self, task_id: TaskID) -> None: ...
6464

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from ..dsm import setup_dsm
3737
from ..dsm_cleaner import setup_dsm_cleaner
3838
from ..exceptions.handlers import set_exception_handlers
39-
from ..modules.celery import setup_task_manager
39+
from ..modules.celery import setup_celery
4040
from ..modules.db import setup_db
4141
from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
4242
from ..modules.rabbitmq import setup as setup_rabbitmq
@@ -71,12 +71,13 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
7171
setup_s3(app)
7272
setup_client_session(app, tracing_settings=settings.STORAGE_TRACING)
7373

74-
if settings.STORAGE_CELERY and not settings.STORAGE_WORKER_MODE:
75-
setup_rabbitmq(app)
74+
if settings.STORAGE_CELERY:
75+
setup_celery(app, settings=settings.STORAGE_CELERY)
7676

77-
setup_task_manager(app, celery_settings=settings.STORAGE_CELERY)
77+
if not settings.STORAGE_WORKER_MODE:
78+
setup_rabbitmq(app)
79+
setup_rpc_routes(app)
7880

79-
setup_rpc_routes(app)
8081
setup_rest_api_long_running_tasks_for_uploads(app)
8182
setup_rest_api_routes(app, API_VTAG)
8283
set_exception_handlers(app)

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,55 @@
1-
from celery_library.common import create_app, create_task_manager
1+
import logging
2+
3+
from celery_library.backends._redis import RedisTaskInfoStore
4+
from celery_library.common import create_app
25
from celery_library.task_manager import CeleryTaskManager
36
from celery_library.types import register_celery_types, register_pydantic_types
47
from fastapi import FastAPI
58
from models_library.api_schemas_storage.storage_schemas import (
69
FileUploadCompletionBody,
710
FoldersBody,
811
)
12+
from servicelib.logging_utils import log_context
13+
from servicelib.redis import RedisClientSDK
914
from settings_library.celery import CelerySettings
15+
from settings_library.redis import RedisDatabase
1016

1117
from ...models import FileMetaData
1218

19+
_logger = logging.getLogger(__name__)
20+
1321

14-
def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None:
22+
def setup_celery(app: FastAPI, settings: CelerySettings) -> None:
1523
async def on_startup() -> None:
16-
app.state.task_manager = await create_task_manager(
17-
create_app(celery_settings), celery_settings
18-
)
24+
with log_context(_logger, logging.INFO, "Setting up Celery"):
25+
redis_client_sdk = RedisClientSDK(
26+
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
27+
RedisDatabase.CELERY_TASKS
28+
),
29+
client_name="celery_tasks",
30+
)
31+
app.state.celery_tasks_redis_client_sdk = redis_client_sdk
32+
await redis_client_sdk.setup()
33+
34+
app.state.task_manager = CeleryTaskManager(
35+
create_app(settings),
36+
settings,
37+
RedisTaskInfoStore(redis_client_sdk),
38+
)
39+
40+
register_celery_types()
41+
register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)
1942

20-
register_celery_types()
21-
register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)
43+
async def on_shutdown() -> None:
44+
with log_context(_logger, logging.INFO, "Shutting down Celery"):
45+
redis_client_sdk: RedisClientSDK | None = (
46+
app.state.celery_tasks_redis_client_sdk
47+
)
48+
if redis_client_sdk:
49+
await redis_client_sdk.shutdown()
2250

2351
app.add_event_handler("startup", on_startup)
52+
app.add_event_handler("shutdown", on_shutdown)
2453

2554

2655
def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager:

0 commit comments

Comments
 (0)