Skip to content

Commit 68dce90

Browse files
add Celery plugin
1 parent 4eea09e commit 68dce90

File tree

5 files changed

+77
-0
lines changed

5 files changed

+77
-0
lines changed

services/web/server/src/simcore_service_webserver/application.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .api_keys.plugin import setup_api_keys
2222
from .application_settings import get_application_settings, setup_settings
2323
from .catalog.plugin import setup_catalog
24+
from .celery.plugin import setup_celery
2425
from .collaboration.bootstrap import (
2526
setup_realtime_collaboration,
2627
)
@@ -188,6 +189,9 @@ def create_application() -> web.Application:
188189
setup_exporter(app)
189190
setup_realtime_collaboration(app)
190191

192+
# Celery
193+
setup_celery(app)
194+
191195
# NOTE: *last* events
192196
app.on_startup.append(_create_welcome_banner(WELCOME_MSG))
193197
app.on_shutdown.append(_create_finished_banner())
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ._task_manager import get_task_manager
2+
3+
__all__: tuple[str, ...] = ("get_task_manager",)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import logging
2+
from typing import Final
3+
4+
from aiohttp import web
5+
from celery_library.backends.redis import RedisTaskInfoStore
6+
from celery_library.common import create_app
7+
from celery_library.task_manager import CeleryTaskManager
8+
from celery_library.types import register_celery_types
9+
from servicelib.celery.task_manager import TaskManager
10+
from servicelib.logging_utils import log_context
11+
from settings_library.celery import CelerySettings
12+
13+
from ..redis import get_redis_celery_tasks_client_sdk
14+
from .settings import get_plugin_settings
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
_APP_CELERY_TASK_MANAGER_KEY: Final = web.AppKey(
19+
CeleryTaskManager.__name__, CeleryTaskManager
20+
)
21+
22+
23+
async def setup_task_manager(app: web.Application):
24+
with log_context(_logger, logging.INFO, "Setting up Celery task manager"):
25+
celery_settings: CelerySettings = get_plugin_settings(app)
26+
27+
redis_client_sdk = get_redis_celery_tasks_client_sdk(app)
28+
celery_app = create_app(celery_settings)
29+
30+
app[_APP_CELERY_TASK_MANAGER_KEY] = CeleryTaskManager(
31+
celery_app,
32+
celery_settings,
33+
RedisTaskInfoStore(redis_client_sdk),
34+
)
35+
register_celery_types()
36+
37+
yield
38+
39+
40+
def get_task_manager(app: web.Application) -> TaskManager:
41+
return app[_APP_CELERY_TASK_MANAGER_KEY]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import logging
2+
3+
from aiohttp import web
4+
5+
from ..application_setup import ModuleCategory, app_setup_func
6+
from ._task_manager import setup_task_manager
7+
8+
_logger = logging.getLogger(__name__)
9+
10+
11+
@app_setup_func(
12+
__name__,
13+
ModuleCategory.ADDON,
14+
settings_name="WEBSERVER_CELERY",
15+
logger=_logger,
16+
)
17+
def setup_celery(app: web.Application):
18+
app.cleanup_ctx.append(setup_task_manager)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from aiohttp import web
2+
from settings_library.celery import CelerySettings
3+
4+
from ..application_keys import APP_SETTINGS_APPKEY
5+
6+
7+
def get_plugin_settings(app: web.Application) -> CelerySettings:
8+
settings = app[APP_SETTINGS_APPKEY].WEBSERVER_CELERY
9+
assert settings, "plugin.setup_celery not called?" # nosec
10+
assert isinstance(settings, CelerySettings) # nosec
11+
return settings

0 commit comments

Comments
 (0)