Skip to content

Commit 866ca9b

Browse files
add celery task queue class
1 parent ce01b63 commit 866ca9b

File tree

2 files changed

+35
-19
lines changed

2 files changed

+35
-19
lines changed

services/storage/src/simcore_service_storage/modules/celery/celery.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,52 @@
44

55
from celery import Celery
66
from celery.apps.worker import Worker
7+
from celery.result import AsyncResult
78
from fastapi import FastAPI
89
from settings_library.redis import RedisDatabase
9-
from simcore_service_storage.modules.celery.tasks import setup_celery_tasks
10+
from simcore_service_storage.modules.celery.tasks import archive
1011

11-
from ...core.settings import get_application_settings
12+
from ...core.settings import ApplicationSettings, get_application_settings
1213

1314
_log = logging.getLogger(__name__)
1415

1516

16-
def setup_celery(app: FastAPI) -> None:
17-
async def on_startup() -> None:
18-
settings = get_application_settings(app)
19-
assert settings.STORAGE_REDIS
20-
21-
redis_dsn = settings.STORAGE_REDIS.build_redis_dsn(
17+
class CeleryTaskQueue:
18+
def __init__(self, app_settings: ApplicationSettings):
19+
assert app_settings.STORAGE_REDIS
20+
redis_dsn = app_settings.STORAGE_REDIS.build_redis_dsn(
2221
RedisDatabase.CELERY_TASKS,
2322
)
2423

25-
app.state.celery_app = Celery(
24+
self._celery_app = Celery(
2625
broker=redis_dsn,
2726
backend=redis_dsn,
2827
)
2928

30-
setup_celery_tasks(app.state.celery_app)
29+
@property
30+
def celery_app(self):
31+
return self._celery_app
32+
33+
def create_task(self, task):
34+
self._celery_app.task()(task)
35+
36+
def send_task(self, name: str, **kwargs) -> AsyncResult:
37+
return self._celery_app.send_task(name, **kwargs)
38+
39+
def cancel_task(self, task_id: str):
40+
self._celery_app.control.revoke(task_id)
41+
42+
43+
# TODO: use new FastAPI lifespan
44+
def setup_celery(app: FastAPI) -> None:
45+
async def on_startup() -> None:
46+
settings = get_application_settings(app)
47+
assert settings.STORAGE_REDIS
48+
49+
task_queue = CeleryTaskQueue(settings)
50+
task_queue.create_task(archive)
51+
52+
app.state.task_queue = task_queue
3153

3254
# FIXME: Experiment: to start worker in a separate process
3355
def worker_process():
@@ -44,5 +66,5 @@ async def on_shutdown() -> None:
4466
app.add_event_handler("shutdown", on_shutdown)
4567

4668

47-
def get_celery_app(app: FastAPI) -> Celery:
48-
return cast(Celery, app.state.celery_app)
69+
def get_celery_task_queue(app: FastAPI) -> CeleryTaskQueue:
70+
return cast(CeleryTaskQueue, app.state.task_queue)
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
import logging
22

3-
from celery import Celery
4-
53
_log = logging.getLogger(__name__)
64

75

86
def archive(files: list[str]) -> None:
9-
_log.info(f"Archiving {files=}")
10-
11-
12-
def setup_celery_tasks(celery_app: Celery) -> None:
13-
celery_app.task()(archive)
7+
_log.info("Archiving: %s", ", ".join(files))

0 commit comments

Comments
 (0)