Skip to content

Commit ce01b63

Browse files
add celery task
1 parent 1a2cc8d commit ce01b63

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ..dsm import setup_dsm
3030
from ..dsm_cleaner import setup_dsm_cleaner
3131
from ..exceptions.handlers import set_exception_handlers
32+
from ..modules.celery.celery import setup_celery
3233
from ..modules.db import setup_db
3334
from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
3435
from ..modules.redis import setup as setup_redis
@@ -81,11 +82,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
8182
setup_rest_api_routes(app, API_VTAG)
8283
set_exception_handlers(app)
8384

85+
setup_redis(app)
86+
8487
setup_dsm(app)
8588
if settings.STORAGE_CLEANER_INTERVAL_S:
86-
setup_redis(app)
8789
setup_dsm_cleaner(app)
8890

91+
setup_celery(app)
92+
8993
if settings.STORAGE_PROFILING:
9094
app.add_middleware(ProfilerMiddleware)
9195

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Whitespace-only changes.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
from multiprocessing import Process
3+
from typing import cast
4+
5+
from celery import Celery
6+
from celery.apps.worker import Worker
7+
from fastapi import FastAPI
8+
from settings_library.redis import RedisDatabase
9+
from simcore_service_storage.modules.celery.tasks import setup_celery_tasks
10+
11+
from ...core.settings import get_application_settings
12+
13+
_log = logging.getLogger(__name__)
14+
15+
16+
def setup_celery(app: FastAPI) -> None:
17+
async def on_startup() -> None:
18+
settings = get_application_settings(app)
19+
assert settings.STORAGE_REDIS
20+
21+
redis_dsn = settings.STORAGE_REDIS.build_redis_dsn(
22+
RedisDatabase.CELERY_TASKS,
23+
)
24+
25+
app.state.celery_app = Celery(
26+
broker=redis_dsn,
27+
backend=redis_dsn,
28+
)
29+
30+
setup_celery_tasks(app.state.celery_app)
31+
32+
# FIXME: Experiment: to start worker in a separate process
33+
def worker_process():
34+
worker = Worker(app=app.state.celery_app)
35+
worker.start()
36+
37+
worker_proc = Process(target=worker_process)
38+
worker_proc.start()
39+
40+
async def on_shutdown() -> None:
41+
_log.warning("Implementing shutdown of celery app")
42+
43+
app.add_event_handler("startup", on_startup)
44+
app.add_event_handler("shutdown", on_shutdown)
45+
46+
47+
def get_celery_app(app: FastAPI) -> Celery:
48+
return cast(Celery, app.state.celery_app)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import logging
2+
3+
from celery import Celery
4+
5+
_log = logging.getLogger(__name__)
6+
7+
8+
def archive(files: list[str]) -> None:
9+
_log.info(f"Archiving {files=}")
10+
11+
12+
def setup_celery_tasks(celery_app: Celery) -> None:
13+
celery_app.task()(archive)

0 commit comments

Comments
 (0)