Skip to content

Commit 328c574

Browse files
committed
renaming and testing
1 parent b6c9903 commit 328c574

File tree

5 files changed

+103
-98
lines changed

5 files changed

+103
-98
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq.py

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import logging
2+
3+
import distributed
4+
from models_library.rabbitmq_messages import RabbitMessageBase
5+
from servicelib.logging_utils import log_catch, log_context
6+
from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive
7+
from settings_library.rabbit import RabbitSettings
8+
9+
from .errors import ConfigurationError
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
14+
class RabbitMQPlugin(distributed.WorkerPlugin):
15+
"""Dask Worker Plugin for RabbitMQ integration"""
16+
17+
name = "rabbitmq_plugin"
18+
_client: RabbitMQClient | None = None
19+
_settings: RabbitSettings | None = None
20+
21+
def __init__(self, settings: RabbitSettings):
22+
self._settings = settings
23+
24+
async def setup(self, worker: distributed.Worker) -> None:
25+
"""Called when the plugin is attached to a worker"""
26+
if not self._settings:
27+
_logger.warning("RabbitMQ client is de-activated (no settings provided)")
28+
return
29+
30+
with log_context(
31+
_logger,
32+
logging.INFO,
33+
f"RabbitMQ client initialization for worker {worker.address}",
34+
):
35+
await wait_till_rabbitmq_responsive(self._settings.dsn)
36+
self._client = RabbitMQClient(
37+
client_name="dask-sidecar", settings=self._settings
38+
)
39+
40+
async def teardown(self, worker: distributed.Worker) -> None:
41+
"""Called when the worker shuts down or the plugin is removed"""
42+
with log_context(
43+
_logger,
44+
logging.INFO,
45+
f"RabbitMQ client teardown for worker {worker.address}",
46+
):
47+
if self._client:
48+
await self._client.close()
49+
self._client = None
50+
51+
def get_client(self) -> RabbitMQClient:
52+
"""Returns the RabbitMQ client or raises an error if not available"""
53+
if not self._client:
54+
raise ConfigurationError(
55+
msg="RabbitMQ client is not available. Please check the configuration."
56+
)
57+
return self._client
58+
59+
async def publish(self, *, channel_name: str, message: RabbitMessageBase) -> None:
60+
"""Publishes a message to the specified channel"""
61+
with log_catch(_logger, reraise=False):
62+
if self._client:
63+
await self._client.publish(channel_name, message)

services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ._meta import print_dask_sidecar_banner
1919
from .computational_sidecar.core import ComputationalSidecar
2020
from .dask_utils import TaskPublisher, get_current_task_resources, monitor_task_abortion
21-
from .rabbitmq import RabbitMQPlugin
21+
from .rabbitmq_plugin import RabbitMQPlugin
2222
from .settings import ApplicationSettings
2323

2424
_logger = logging.getLogger(__name__)
@@ -41,7 +41,7 @@ def __init__(self, worker: distributed.Worker):
4141

4242
def exit_gracefully(self, *_args):
4343
tasks = asyncio.all_tasks()
44-
dask_worker_logger.warning(
44+
_logger.warning(
4545
"Application shutdown detected!\n %s",
4646
pformat([t.get_name() for t in tasks]),
4747
)
@@ -69,24 +69,22 @@ async def dask_setup(worker: distributed.Worker) -> None:
6969
tracing_settings=None, # no tracing for dask sidecar
7070
)
7171

72-
with log_context(dask_worker_logger, logging.INFO, "Launch dask worker"):
73-
dask_worker_logger.info("app settings: %s", settings.model_dump_json(indent=1))
72+
with log_context(_logger, logging.INFO, "Launch dask worker"):
73+
_logger.info("app settings: %s", settings.model_dump_json(indent=1))
7474

7575
print_dask_sidecar_banner()
7676

7777
if threading.current_thread() is threading.main_thread():
7878
GracefulKiller(worker)
7979

8080
loop = asyncio.get_event_loop()
81-
dask_worker_logger.info(
82-
"We do have a running loop in the main thread: %s", f"{loop=}"
83-
)
81+
_logger.info("We do have a running loop in the main thread: %s", f"{loop=}")
8482
if settings.DASK_SIDECAR_RABBITMQ:
8583
await worker.plugin_add(RabbitMQPlugin(settings.DASK_SIDECAR_RABBITMQ))
8684

8785

88-
async def dask_teardown(_worker: distributed.Worker) -> None:
89-
with log_context(dask_worker_logger, logging.INFO, "tear down dask worker"):
86+
async def dask_teardown(worker: distributed.Worker) -> None:
87+
with log_context(_logger, logging.INFO, f"tear down dask {worker.address}"):
9088
...
9189

9290

services/dask-sidecar/tests/unit/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"pytest_simcore.docker_swarm",
3838
"pytest_simcore.environment_configs",
3939
"pytest_simcore.faker_users_data",
40+
"pytest_simcore.rabbit_service",
4041
"pytest_simcore.repository_paths",
4142
]
4243

@@ -231,7 +232,7 @@ def creator() -> AnyUrl:
231232
open_file = fsspec.open(f"{new_remote_file}", mode="wt", **s3_storage_kwargs)
232233
with open_file as fp:
233234
fp.write( # type: ignore
234-
f"This is the file contents of file #'{(len(list_of_created_files)+1):03}'\n"
235+
f"This is the file contents of file #'{(len(list_of_created_files) + 1):03}'\n"
235236
)
236237
for s in faker.sentences(5):
237238
fp.write(f"{s}\n") # type: ignore
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import distributed
2+
import pytest
3+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
4+
from pytest_simcore.helpers.typing_env import EnvVarsDict
5+
from settings_library.rabbit import RabbitSettings
6+
7+
# Selection of core and tool services started in this swarm fixture (integration)
8+
pytest_simcore_core_services_selection = [
9+
"rabbit",
10+
]
11+
12+
pytest_simcore_ops_services_selection = []
13+
14+
15+
@pytest.fixture
16+
def app_environment(
17+
app_environment: EnvVarsDict,
18+
monkeypatch: pytest.MonkeyPatch,
19+
rabbit_service: RabbitSettings,
20+
) -> EnvVarsDict:
21+
# configured as worker
22+
envs = setenvs_from_dict(
23+
monkeypatch,
24+
{
25+
"DASK_WORKER_RABBITMQ": rabbit_service.model_dump_json(),
26+
},
27+
)
28+
return app_environment | envs
29+
30+
31+
def test_rabbitmq_plugin_initializes(dask_client: distributed.Client): ...

0 commit comments

Comments
 (0)