Skip to content

Commit 3efc130

Browse files
committed
change boot process for celery worker
1 parent 4dcf9bd commit 3efc130

File tree

3 files changed

+30
-25
lines changed

3 files changed

+30
-25
lines changed

services/api-server/docker/boot.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,17 @@ if [ "${API_SERVER_WORKER_MODE}" = "true" ]; then
4848
--recursive \
4949
-- \
5050
celery \
51-
--app=simcore_service_api_server.celery.worker_main:app \
51+
--app=boot_celery_worker:app \
52+
--workdir=services/api-server/docker \
5253
worker --pool=threads \
5354
--loglevel="${API_SERVER_LOGLEVEL}" \
5455
--concurrency="${CELERY_CONCURRENCY}" \
5556
--hostname="${API_SERVER_WORKER_NAME}" \
5657
--queues="${CELERY_QUEUES:-default}"
5758
else
5859
exec celery \
59-
--app=simcore_service_api_server.celery.worker_main:app \
60+
--app=boot_celery_worker:app \
61+
--workdir=services/api-server/docker \
6062
worker --pool=threads \
6163
--loglevel="${API_SERVER_LOGLEVEL}" \
6264
--concurrency="${CELERY_CONCURRENCY}" \
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped]
2+
from celery_library.signals import (
3+
on_worker_shutdown,
4+
)
5+
from simcore_service_api_server.celery.worker_main import get_app, worker_init_wrapper
6+
7+
app = get_app()
8+
9+
worker_init.connect(worker_init_wrapper)
10+
worker_shutdown.connect(on_worker_shutdown)

services/api-server/src/simcore_service_api_server/celery/worker_main.py

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
from functools import partial
44

5-
from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped]
65
from celery_library.common import create_app as create_celery_app
76
from celery_library.signals import (
87
on_worker_init,
9-
on_worker_shutdown,
108
)
119
from servicelib.fastapi.celery.app_server import FastAPIAppServer
1210
from servicelib.logging_utils import setup_loggers
@@ -16,34 +14,29 @@
1614
from .worker_tasks.tasks import setup_worker_tasks
1715

1816

19-
def _get_settings() -> ApplicationSettings:
20-
return ApplicationSettings.create_from_envs()
17+
def get_app():
18+
_settings = ApplicationSettings.create_from_envs()
2119

20+
setup_loggers(
21+
log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED,
22+
logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING,
23+
tracing_settings=_settings.API_SERVER_TRACING,
24+
log_base_level=_settings.log_level,
25+
noisy_loggers=None,
26+
)
2227

23-
_settings = _get_settings()
24-
25-
setup_loggers(
26-
log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED,
27-
logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING,
28-
tracing_settings=_settings.API_SERVER_TRACING,
29-
log_base_level=_settings.log_level,
30-
noisy_loggers=None,
31-
)
32-
33-
assert _settings.API_SERVER_CELERY # nosec
34-
app = create_celery_app(_settings.API_SERVER_CELERY)
28+
assert _settings.API_SERVER_CELERY # nosec
29+
app = create_celery_app(_settings.API_SERVER_CELERY)
30+
setup_worker_tasks(app)
3531

36-
app_server = FastAPIAppServer(app=create_app(_settings))
32+
return app
3733

3834

3935
def worker_init_wrapper(sender, **_kwargs):
36+
_settings = ApplicationSettings.create_from_envs()
4037
assert _settings.API_SERVER_CELERY # nosec
38+
app_server = FastAPIAppServer(app=create_app(_settings))
39+
4140
return partial(on_worker_init, app_server, _settings.API_SERVER_CELERY)(
4241
sender, **_kwargs
4342
)
44-
45-
46-
worker_init.connect(worker_init_wrapper)
47-
worker_shutdown.connect(on_worker_shutdown)
48-
49-
setup_worker_tasks(app)

0 commit comments

Comments
 (0)