Skip to content

Commit 4c6906a

Browse files
committed
added rabbitmq client
1 parent da1bc2c commit 4c6906a

File tree

4 files changed

+77
-21
lines changed

4 files changed

+77
-21
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/errors.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
from common_library.errors_classes import OsparcErrorMixin
2-
3-
4-
class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError):
5-
...
1+
from ..errors import ComputationalSidecarRuntimeError
62

73

84
class ServiceBadFormattedOutputError(ComputationalSidecarRuntimeError):
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from common_library.errors_classes import OsparcErrorMixin
2+
3+
4+
class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError): ...
5+
6+
7+
class ConfigurationError(ComputationalSidecarRuntimeError):
8+
msg_template: str = "Application misconfiguration: {msg}"
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import contextlib
2+
import logging
3+
from typing import cast
4+
5+
import distributed
6+
from models_library.rabbitmq_messages import RabbitMessageBase
7+
from servicelib.logging_utils import log_catch
8+
from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive
9+
from settings_library.rabbit import RabbitSettings
10+
11+
from .errors import ConfigurationError
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
async def on_startup(
17+
worker: distributed.Worker, rabbit_settings: RabbitSettings
18+
) -> None:
19+
worker.rabbitmq_client = None
20+
settings: RabbitSettings | None = rabbit_settings
21+
if not settings:
22+
logger.warning("Rabbit MQ client is de-activated in the settings")
23+
return
24+
await wait_till_rabbitmq_responsive(settings.dsn)
25+
worker.rabbitmq_client = RabbitMQClient(
26+
client_name="dask-sidecar", settings=settings
27+
)
28+
29+
30+
async def on_shutdown(worker: distributed.Worker) -> None:
31+
if worker.rabbitmq_client:
32+
await worker.rabbitmq_client.close()
33+
34+
35+
def get_rabbitmq_client(worker: distributed.Worker) -> RabbitMQClient:
36+
if not worker.rabbitmq_client:
37+
raise ConfigurationError(
38+
msg="RabbitMQ client is not available. Please check the configuration."
39+
)
40+
return cast(RabbitMQClient, worker.rabbitmq_client)
41+
42+
43+
async def post_message(worker: distributed.Worker, message: RabbitMessageBase) -> None:
44+
with log_catch(logger, reraise=False), contextlib.suppress(ConfigurationError):
45+
# NOTE: if rabbitmq was not initialized the error does not need to flood the logs
46+
await get_rabbitmq_client(worker).publish(message.channel_name, message)

services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
ContainerTaskParameters,
1212
LogFileUploadURL,
1313
)
14-
from distributed.worker import logger
15-
from servicelib.logging_utils import config_all_loggers
14+
from distributed.worker import logger as dask_worker_logger
15+
from servicelib.logging_utils import config_all_loggers, log_context
1616
from settings_library.s3 import S3Settings
1717

1818
from ._meta import print_dask_sidecar_banner
1919
from .computational_sidecar.core import ComputationalSidecar
2020
from .dask_utils import TaskPublisher, get_current_task_resources, monitor_task_abortion
21+
from .rabbitmq import on_shutdown as shutdown_rabbitmq
22+
from .rabbitmq import on_startup as setup_rabbitmq
2123
from .settings import ApplicationSettings
2224

2325
_logger = logging.getLogger(__name__)
@@ -40,7 +42,7 @@ def __init__(self, worker: distributed.Worker):
4042

4143
def exit_gracefully(self, *_args):
4244
tasks = asyncio.all_tasks()
43-
logger.warning(
45+
dask_worker_logger.warning(
4446
"Application shutdown detected!\n %s",
4547
pformat([t.get_name() for t in tasks]),
4648
)
@@ -55,9 +57,9 @@ async def dask_setup(worker: distributed.Worker) -> None:
5557
"""This is a special function recognized by the dask worker when starting with flag --preload"""
5658
settings = ApplicationSettings.create_from_envs()
5759
# set up logging
58-
logging.basicConfig(level=settings.LOG_LEVEL.value)
59-
logging.root.setLevel(level=settings.LOG_LEVEL.value)
60-
logger.setLevel(level=settings.LOG_LEVEL.value)
60+
logging.basicConfig(level=settings.DASK_SIDECAR_LOGLEVEL.value)
61+
logging.root.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value)
62+
dask_worker_logger.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value)
6163
# NOTE: Dask attaches a StreamHandler to the logger in distributed
6264
# removing them solves dual propagation of logs
6365
for handler in logging.getLogger("distributed").handlers:
@@ -68,21 +70,25 @@ async def dask_setup(worker: distributed.Worker) -> None:
6870
tracing_settings=None, # no tracing for dask sidecar
6971
)
7072

71-
logger.info("Setting up worker...")
72-
logger.info("Settings: %s", pformat(settings.model_dump()))
73+
with log_context(dask_worker_logger, logging.INFO, "Launch dask worker"):
74+
dask_worker_logger.info("app settings: %s", settings.model_dump_json(indent=1))
7375

74-
print_dask_sidecar_banner()
76+
print_dask_sidecar_banner()
7577

76-
if threading.current_thread() is threading.main_thread():
77-
loop = asyncio.get_event_loop()
78-
logger.info("We do have a running loop in the main thread: %s", f"{loop=}")
78+
if threading.current_thread() is threading.main_thread():
79+
GracefulKiller(worker)
7980

80-
if threading.current_thread() is threading.main_thread():
81-
GracefulKiller(worker)
81+
loop = asyncio.get_event_loop()
82+
dask_worker_logger.info(
83+
"We do have a running loop in the main thread: %s", f"{loop=}"
84+
)
85+
if settings.DASK_SIDECAR_RABBITMQ:
86+
await setup_rabbitmq(worker, settings.DASK_SIDECAR_RABBITMQ)
8287

8388

84-
async def dask_teardown(_worker: distributed.Worker) -> None:
85-
logger.warning("Tearing down worker!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
89+
async def dask_teardown(worker: distributed.Worker) -> None:
90+
with log_context(dask_worker_logger, logging.INFO, "tear down dask worker"):
91+
await shutdown_rabbitmq(worker)
8692

8793

8894
async def _run_computational_sidecar_async(

0 commit comments

Comments
 (0)