Skip to content

Commit 0e890d7

Browse files
fix: add Celery worker setup
1 parent f196f2e commit 0e890d7

File tree

4 files changed

+62
-0
lines changed

4 files changed

+62
-0
lines changed

services/notifications/src/simcore_service_notifications/modules/__init__.py

Whitespace-only changes.

services/notifications/src/simcore_service_notifications/modules/celery/__init__.py

Whitespace-only changes.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import logging
2+
3+
from celery import Celery # type: ignore[import-untyped]
4+
from celery_library.types import register_celery_types
5+
from servicelib.logging_utils import log_context
6+
7+
_logger = logging.getLogger(__name__)
8+
9+
10+
def setup_worker_tasks(app: Celery) -> None:
11+
register_celery_types()
12+
# TODO: add more types as needed
13+
# register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)
14+
15+
with log_context(_logger, logging.INFO, msg="worker tasks registration"):
16+
...
17+
# TODO: register tasks here
18+
# register_task(app, send_email_notification)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import logging
2+
from functools import partial
3+
4+
from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped]
5+
from celery_library.common import create_app as create_celery_app
6+
from celery_library.signals import (
7+
on_worker_init,
8+
on_worker_shutdown,
9+
)
10+
from servicelib.fastapi.celery.app_server import FastAPIAppServer
11+
from servicelib.logging_utils import config_all_loggers
12+
13+
from ...core.application import create_app
14+
from ...core.settings import ApplicationSettings
15+
from .tasks import setup_worker_tasks
16+
17+
_settings = ApplicationSettings.create_from_envs()
18+
19+
logging.basicConfig(level=_settings.log_level) # NOSONAR
20+
logging.root.setLevel(_settings.log_level)
21+
config_all_loggers(
22+
log_format_local_dev_enabled=_settings.NOTIFICATIONS_LOG_FORMAT_LOCAL_DEV_ENABLED,
23+
logger_filter_mapping=_settings.NOTIFICATIONS_LOG_FILTER_MAPPING,
24+
tracing_settings=_settings.NOTIFICATIONS_TRACING,
25+
)
26+
27+
28+
assert _settings.NOTIFICATIONS_CELERY # nosec
29+
app = create_celery_app(_settings.NOTIFICATIONS_CELERY)
30+
31+
app_server = FastAPIAppServer(app=create_app())
32+
33+
34+
def worker_init_wrapper(sender, **_kwargs):
35+
assert _settings.NOTIFICATIONS_CELERY # nosec
36+
return partial(on_worker_init, app_server, _settings.NOTIFICATIONS_CELERY)(
37+
sender, **_kwargs
38+
)
39+
40+
41+
worker_init.connect(worker_init_wrapper)
42+
worker_shutdown.connect(on_worker_shutdown)
43+
44+
setup_worker_tasks(app)

0 commit comments

Comments
 (0)