File tree Expand file tree Collapse file tree 3 files changed +12
-15
lines changed
src/simcore_service_storage/modules/celery Expand file tree Collapse file tree 3 files changed +12
-15
lines changed Original file line number Diff line number Diff line change @@ -56,10 +56,10 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
5656 --log-level \" ${SERVER_LOG_LEVEL} \"
5757 "
5858else
59- if [ " ${STORAGE_WORKER_MODE} " = " 1 " ]; then
59+ if [ " ${STORAGE_WORKER_MODE} " = " true " ]; then
6060 exec celery \
6161 --app=simcore_service_storage.modules.celery.worker_main:app \
62- worker --pool=threads \
62+ worker \
6363 --loglevel=" ${SERVER_LOG_LEVEL} " \
6464 --hostname=" ${HOSTNAME} "
6565 else
Original file line number Diff line number Diff line change 33from celery import Celery
44from fastapi import FastAPI
55
6+ from ...core .settings import ApplicationSettings
7+ from ._common import create_app
68from .client import CeleryTaskQueueClient
79from .worker import CeleryTaskQueueWorker
810
1113_EVENT_LOOP_KEY = "loop"
1214
1315
16+ def create_celery_app_worker (settings : ApplicationSettings ) -> Celery :
17+ celery_app = create_app (settings )
18+ celery_app .conf [_WORKER_KEY ] = CeleryTaskQueueWorker (celery_app )
19+ return celery_app
20+
21+
1422def get_celery_app (fastapi : FastAPI ) -> Celery :
1523 celery = fastapi .state .celery_app
1624 assert isinstance (celery , Celery )
@@ -41,10 +49,6 @@ def get_celery_worker(celery_app: Celery) -> CeleryTaskQueueWorker:
4149 return worker
4250
4351
44- def set_celery_worker (celery_app : Celery , celery_worker : CeleryTaskQueueWorker ) -> None :
45- celery_app .conf [_WORKER_KEY ] = celery_worker
46-
47-
4852def get_event_loop (celery_app : Celery ) -> AbstractEventLoop : # nosec
4953 loop = celery_app .conf [_EVENT_LOOP_KEY ]
5054 assert isinstance (loop , AbstractEventLoop )
Original file line number Diff line number Diff line change 1212
1313from ...core .application import create_app
1414from ...core .settings import ApplicationSettings
15- from ._common import create_app as create_celery_app
16- from ._utils import set_celery_worker
17- from .worker import CeleryTaskQueueWorker
15+ from ._utils import create_celery_app_worker
1816
1917_settings = ApplicationSettings .create_from_envs ()
2018
@@ -77,9 +75,4 @@ async def shutdown():
7775 asyncio .run_coroutine_threadsafe (shutdown (), loop )
7876
7977
80- celery_app = create_celery_app (_settings )
81-
82- celery_worker = CeleryTaskQueueWorker (celery_app )
83- set_celery_worker (celery_app , CeleryTaskQueueWorker (celery_app ))
84-
85- app = celery_app
78+ app = create_celery_app_worker (_settings )
You can’t perform that action at this time.
0 commit comments