Skip to content

Commit fa25ca2

Browse files
setup task worker
1 parent 9db3d5b commit fa25ca2

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

services/storage/src/simcore_service_storage/modules/celery/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]: ...
5858
async def remove(self, task_id: TaskID) -> None: ...
5959

6060
async def set_metadata(
61-
self, task_id: TaskID, task_data: TaskMetadata, expiry: timedelta
61+
self, task_id: TaskID, task_metadata: TaskMetadata, expiry: timedelta
6262
) -> None: ...
6363

6464
async def set_progress(self, task_id: TaskID, report: ProgressReport) -> None: ...

services/storage/src/simcore_service_storage/modules/celery/signals.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
from celery import Celery # type: ignore[import-untyped]
99
from fastapi import FastAPI
1010
from servicelib.logging_utils import log_context
11+
from servicelib.redis._client import RedisClientSDK
12+
from settings_library.redis import RedisDatabase
13+
from simcore_service_storage._meta import APP_NAME
1114

1215
from ...core.application import create_app
1316
from ...core.settings import ApplicationSettings
14-
from ...modules.celery import set_event_loop
17+
from ...modules.celery import RedisTaskInfoStore, set_event_loop
1518
from ...modules.celery.utils import (
1619
get_fastapi_app,
1720
set_celery_worker,
@@ -28,14 +31,30 @@
2831
def on_worker_init(sender, **_kwargs) -> None:
2932
startup_complete_event = threading.Event()
3033

31-
def _init_fastapi(startup_complete_event: threading.Event) -> None:
34+
def _init(startup_complete_event: threading.Event) -> None:
3235
loop = asyncio.new_event_loop()
3336
asyncio.set_event_loop(loop)
3437
shutdown_event = asyncio.Event()
3538

36-
fastapi_app = create_app(ApplicationSettings.create_from_envs())
39+
app_settings = ApplicationSettings.create_from_envs()
40+
fastapi_app = create_app(app_settings)
3741

38-
async def lifespan(
42+
assert app_settings.STORAGE_CELERY
43+
celery_settings = app_settings.STORAGE_CELERY
44+
45+
async def setup_task_worker():
46+
redis_client_sdk = RedisClientSDK(
47+
celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
48+
RedisDatabase.CELERY_TASKS
49+
),
50+
client_name=f"{APP_NAME}.celery_tasks",
51+
)
52+
53+
set_celery_worker(
54+
sender.app, CeleryTaskWorker(RedisTaskInfoStore(redis_client_sdk))
55+
)
56+
57+
async def fastapi_lifespan(
3958
startup_complete_event: threading.Event, shutdown_event: asyncio.Event
4059
) -> None:
4160
async with LifespanManager(
@@ -54,12 +73,14 @@ async def lifespan(
5473
set_event_loop(fastapi_app, loop)
5574

5675
set_fastapi_app(sender.app, fastapi_app)
57-
set_celery_worker(sender.app, CeleryTaskWorker(sender.app))
58-
loop.run_until_complete(lifespan(startup_complete_event, shutdown_event))
76+
loop.run_until_complete(setup_task_worker())
77+
loop.run_until_complete(
78+
fastapi_lifespan(startup_complete_event, shutdown_event)
79+
)
5980

6081
thread = threading.Thread(
6182
group=None,
62-
target=_init_fastapi,
83+
target=_init,
6384
name="fastapi_app",
6485
args=(startup_complete_event,),
6586
daemon=True,

0 commit comments

Comments
 (0)