Skip to content

Commit ba8e89e

Browse files
committed
creating dask plugin
1 parent 49c6b80 commit ba8e89e

File tree

3 files changed

+74
-32
lines changed

3 files changed

+74
-32
lines changed
Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,88 @@
1-
import contextlib
21
import logging
3-
from typing import cast
42

53
import distributed
64
from models_library.rabbitmq_messages import RabbitMessageBase
7-
from servicelib.logging_utils import log_catch
85
from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive
96
from settings_library.rabbit import RabbitSettings
107

118
from .errors import ConfigurationError
129

13-
logger = logging.getLogger(__name__)
10+
_logger = logging.getLogger(__name__)
1411

1512

16-
async def on_startup(
17-
worker: distributed.Worker, rabbit_settings: RabbitSettings
18-
) -> None:
19-
worker.rabbitmq_client = None
20-
settings: RabbitSettings | None = rabbit_settings
21-
if not settings:
22-
logger.warning("Rabbit MQ client is de-activated in the settings")
23-
return
24-
await wait_till_rabbitmq_responsive(settings.dsn)
25-
worker.rabbitmq_client = RabbitMQClient(
26-
client_name="dask-sidecar", settings=settings
27-
)
13+
class RabbitMQPlugin(distributed.WorkerPlugin):
14+
"""Dask Worker Plugin for RabbitMQ integration"""
2815

16+
name = "rabbitmq_plugin"
17+
_client: RabbitMQClient | None = None
18+
_settings: RabbitSettings | None = None
2919

30-
async def on_shutdown(worker: distributed.Worker) -> None:
31-
if worker.rabbitmq_client:
32-
await worker.rabbitmq_client.close()
20+
def __init__(self, settings: RabbitSettings):
21+
self._settings = settings
3322

23+
async def setup(self, worker: distributed.Worker) -> None:
24+
"""Called when the plugin is attached to a worker"""
25+
_logger.info("Setting up RabbitMQ plugin")
26+
if not self._settings:
27+
_logger.warning("RabbitMQ client is de-activated (no settings provided)")
28+
return
3429

35-
def get_rabbitmq_client(worker: distributed.Worker) -> RabbitMQClient:
36-
if not worker.rabbitmq_client:
37-
raise ConfigurationError(
38-
msg="RabbitMQ client is not available. Please check the configuration."
30+
await wait_till_rabbitmq_responsive(self._settings.dsn)
31+
self._client = RabbitMQClient(
32+
client_name="dask-sidecar", settings=self._settings
3933
)
40-
return cast(RabbitMQClient, worker.rabbitmq_client)
34+
_logger.info("RabbitMQ client initialized successfully")
4135

36+
async def teardown(self, worker: distributed.Worker) -> None:
37+
"""Called when the worker shuts down or the plugin is removed"""
38+
_logger.info("Tearing down RabbitMQ plugin")
39+
if self._client:
40+
await self._client.close()
41+
self._client = None
42+
_logger.info("RabbitMQ client closed")
4243

43-
async def post_message(worker: distributed.Worker, message: RabbitMessageBase) -> None:
44-
with log_catch(logger, reraise=False), contextlib.suppress(ConfigurationError):
45-
# NOTE: if rabbitmq was not initialized the error does not need to flood the logs
46-
await get_rabbitmq_client(worker).publish(message.channel_name, message)
44+
def get_client(self) -> RabbitMQClient:
45+
"""Returns the RabbitMQ client or raises an error if not available"""
46+
if not self._client:
47+
raise ConfigurationError(
48+
msg="RabbitMQ client is not available. Please check the configuration."
49+
)
50+
return self._client
51+
52+
async def publish(self, channel_name: str, message: RabbitMessageBase) -> None:
53+
"""Publishes a message to the specified channel"""
54+
if self._client:
55+
await self._client.publish(channel_name, message)
56+
57+
58+
# async def on_startup(
59+
# worker: distributed.Worker, rabbit_settings: RabbitSettings
60+
# ) -> None:
61+
# worker.rabbitmq_client = None
62+
# settings: RabbitSettings | None = rabbit_settings
63+
# if not settings:
64+
# __logger.warning("Rabbit MQ client is de-activated in the settings")
65+
# return
66+
# await wait_till_rabbitmq_responsive(settings.dsn)
67+
# worker.rabbitmq_client = RabbitMQClient(
68+
# client_name="dask-sidecar", settings=settings
69+
# )
70+
71+
72+
# async def on_shutdown(worker: distributed.Worker) -> None:
73+
# if worker.rabbitmq_client:
74+
# await worker.rabbitmq_client.close()
75+
76+
77+
# def get_rabbitmq_client(worker: distributed.Worker) -> RabbitMQClient:
78+
# if not worker.rabbitmq_client:
79+
# raise ConfigurationError(
80+
# msg="RabbitMQ client is not available. Please check the configuration."
81+
# )
82+
# return cast(RabbitMQClient, worker.rabbitmq_client)
83+
84+
85+
# async def post_message(worker: distributed.Worker, message: RabbitMessageBase) -> None:
86+
# with log_catch(__logger, reraise=False), contextlib.suppress(ConfigurationError):
87+
# # NOTE: if rabbitmq was not initialized the error does not need to flood the logs
88+
# await get_rabbitmq_client(worker).publish(message.channel_name, message)

services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from ._meta import print_dask_sidecar_banner
1919
from .computational_sidecar.core import ComputationalSidecar
2020
from .dask_utils import TaskPublisher, get_current_task_resources, monitor_task_abortion
21+
from .rabbitmq import RabbitMQPlugin
2122
from .rabbitmq import on_shutdown as shutdown_rabbitmq
22-
from .rabbitmq import on_startup as setup_rabbitmq
2323
from .settings import ApplicationSettings
2424

2525
_logger = logging.getLogger(__name__)
@@ -83,7 +83,7 @@ async def dask_setup(worker: distributed.Worker) -> None:
8383
"We do have a running loop in the main thread: %s", f"{loop=}"
8484
)
8585
if settings.DASK_SIDECAR_RABBITMQ:
86-
await setup_rabbitmq(worker, settings.DASK_SIDECAR_RABBITMQ)
86+
await worker.plugin_add(RabbitMQPlugin(settings.DASK_SIDECAR_RABBITMQ))
8787

8888

8989
async def dask_teardown(worker: distributed.Worker) -> None:

services/dask-sidecar/tests/unit/test_deployment.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from typing import Any, Dict
1+
from typing import Any
22

33

44
def test_sidecar_service_is_deployed_in_global_mode(
5-
simcore_docker_compose: Dict[str, Any]
5+
simcore_docker_compose: dict[str, Any],
66
):
77
dask_sidecar_deploy_config = simcore_docker_compose["services"]["dask-sidecar"][
88
"deploy"

0 commit comments

Comments
 (0)