Skip to content

Commit 870d96c

Browse files
fix: celery task manager setup
1 parent 6d68a6a commit 870d96c

File tree

3 files changed

+41
-11
lines changed

3 files changed

+41
-11
lines changed
Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,48 @@
1-
from celery_library.common import create_app, create_task_manager
1+
import logging
2+
3+
from celery_library.backends._redis import RedisTaskInfoStore
4+
from celery_library.common import create_app
5+
from celery_library.task_manager import CeleryTaskManager
26
from celery_library.types import register_celery_types, register_pydantic_types
37
from fastapi import FastAPI
8+
from servicelib.logging_utils import log_context
9+
from servicelib.redis import RedisClientSDK
410
from settings_library.celery import CelerySettings
11+
from settings_library.redis import RedisDatabase
512

613
from ..celery_worker.worker_tasks.tasks import pydantic_types_to_register
714

15+
_logger = logging.getLogger(__name__)
16+
817

9-
def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None:
18+
def setup_task_manager(app: FastAPI, settings: CelerySettings) -> None:
1019
async def on_startup() -> None:
11-
app.state.task_manager = await create_task_manager(
12-
create_app(celery_settings), celery_settings
13-
)
20+
with log_context(_logger, logging.INFO, "Setting up Celery"):
21+
redis_client_sdk = RedisClientSDK(
22+
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
23+
RedisDatabase.CELERY_TASKS
24+
),
25+
client_name="api_server_celery_tasks",
26+
)
27+
app.state.celery_tasks_redis_client_sdk = redis_client_sdk
28+
await redis_client_sdk.setup()
29+
30+
app.state.task_manager = CeleryTaskManager(
31+
create_app(settings),
32+
settings,
33+
RedisTaskInfoStore(redis_client_sdk),
34+
)
35+
36+
register_celery_types()
37+
register_pydantic_types(*pydantic_types_to_register)
1438

15-
register_celery_types()
16-
register_pydantic_types(*pydantic_types_to_register)
39+
async def on_shutdown() -> None:
40+
with log_context(_logger, logging.INFO, "Shutting down Celery"):
41+
redis_client_sdk: RedisClientSDK | None = (
42+
app.state.celery_tasks_redis_client_sdk
43+
)
44+
if redis_client_sdk:
45+
await redis_client_sdk.shutdown()
1746

1847
app.add_event_handler("startup", on_startup)
48+
app.add_event_handler("shutdown", on_shutdown)

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from ..dsm import setup_dsm
3737
from ..dsm_cleaner import setup_dsm_cleaner
3838
from ..exceptions.handlers import set_exception_handlers
39-
from ..modules.celery import setup_celery
39+
from ..modules.celery import setup_task_manager
4040
from ..modules.db import setup_db
4141
from ..modules.rabbitmq import setup as setup_rabbitmq
4242
from ..modules.redis import setup as setup_redis
@@ -71,7 +71,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
7171
setup_client_session(app, tracing_settings=settings.STORAGE_TRACING)
7272

7373
if settings.STORAGE_CELERY:
74-
setup_celery(app, settings=settings.STORAGE_CELERY)
74+
setup_task_manager(app, settings=settings.STORAGE_CELERY)
7575

7676
if not settings.STORAGE_WORKER_MODE:
7777
setup_rabbitmq(app)

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
_logger = logging.getLogger(__name__)
2020

2121

22-
def setup_celery(app: FastAPI, settings: CelerySettings) -> None:
22+
def setup_task_manager(app: FastAPI, settings: CelerySettings) -> None:
2323
async def on_startup() -> None:
2424
with log_context(_logger, logging.INFO, "Setting up Celery"):
2525
redis_client_sdk = RedisClientSDK(
2626
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
2727
RedisDatabase.CELERY_TASKS
2828
),
29-
client_name="celery_tasks",
29+
client_name="storage_celery_tasks",
3030
)
3131
app.state.celery_tasks_redis_client_sdk = redis_client_sdk
3232
await redis_client_sdk.setup()

0 commit comments

Comments
 (0)