Skip to content

Commit 2ebb69b

Browse files
committed
tests are passing
1 parent 4dcc4e5 commit 2ebb69b

File tree

7 files changed

+50
-22
lines changed

7 files changed

+50
-22
lines changed

services/dask-sidecar/docker/boot.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ else
117117
fi
118118

119119
# GPUs
120-
num_gpus=$(python -c "from simcore_service_dask_sidecar.utils import num_available_gpus; print(num_available_gpus());")
120+
num_gpus=$(python -c "from simcore_service_dask_sidecar.utils.gpus import num_available_gpus; print(num_available_gpus());")
121121

122122
# RAM (is computed similarly as the default dask-sidecar computation)
123123
_value=$(python -c "import psutil; print(int(psutil.virtual_memory().total * $num_cpus/$(nproc)))")
@@ -128,7 +128,7 @@ else
128128

129129
# add the GPUs if there are any
130130
if [ "$num_gpus" -gt 0 ]; then
131-
total_vram=$(python -c "from simcore_service_dask_sidecar.utils import video_memory; print(video_memory());")
131+
total_vram=$(python -c "from simcore_service_dask_sidecar.utils.gpus import video_memory; print(video_memory());")
132132
resources="$resources,GPU=$num_gpus,VRAM=$total_vram"
133133
fi
134134

services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_plugin.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
import asyncio
12
import logging
3+
from asyncio import AbstractEventLoop
24
from collections.abc import Awaitable
35

46
import distributed
5-
from servicelib.logging_utils import log_context
7+
from servicelib.logging_utils import log_catch, log_context
68
from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive
79
from settings_library.rabbit import RabbitSettings
810

@@ -15,6 +17,7 @@ class RabbitMQPlugin(distributed.WorkerPlugin):
1517
"""Dask Worker Plugin for RabbitMQ integration"""
1618

1719
name = "rabbitmq_plugin"
20+
_loop: AbstractEventLoop | None = None
1821
_client: RabbitMQClient | None = None
1922
_settings: RabbitSettings | None = None
2023

@@ -36,6 +39,7 @@ async def _() -> None:
3639
logging.INFO,
3740
f"RabbitMQ client initialization for worker {worker.address}",
3841
):
42+
self._loop = asyncio.get_event_loop()
3943
await wait_till_rabbitmq_responsive(self._settings.dsn)
4044
self._client = RabbitMQClient(
4145
client_name="dask-sidecar", settings=self._settings
@@ -53,7 +57,15 @@ async def _() -> None:
5357
f"RabbitMQ client teardown for worker {worker.address}",
5458
):
5559
if self._client:
56-
await self._client.close()
60+
current_loop = asyncio.get_event_loop()
61+
if self._loop != current_loop:
62+
_logger.warning(
63+
"RabbitMQ client is de-activated (loop mismatch)"
64+
)
65+
assert self._loop # nosec
66+
with log_catch(_logger, reraise=False):
67+
await asyncio.wait_for(self._client.close(), timeout=5.0)
68+
5769
self._client = None
5870

5971
return _()

services/dask-sidecar/tests/unit/test_cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
from simcore_service_dask_sidecar.settings import ApplicationSettings
1414
from typer.testing import CliRunner
1515

16+
pytest_simcore_core_services_selection = [
17+
"rabbit",
18+
]
19+
1620

1721
def test_cli_help_and_version(cli_runner: CliRunner):
1822
# invitations-maker --help

services/dask-sidecar/tests/unit/test_dask_utils.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66

77
import asyncio
88
import concurrent.futures
9-
import logging
109
import time
1110
from collections.abc import AsyncIterator, Callable, Coroutine
1211
from typing import Any
1312

1413
import distributed
1514
import pytest
1615
from dask_task_models_library.container_tasks.errors import TaskCancelledError
17-
from dask_task_models_library.container_tasks.events import TaskLogEvent
16+
from dask_task_models_library.container_tasks.events import TaskProgressEvent
1817
from dask_task_models_library.container_tasks.io import TaskCancelEventName
1918
from dask_task_models_library.container_tasks.protocol import TaskOwner
2019
from simcore_service_dask_sidecar.dask_utils import (
@@ -33,16 +32,20 @@
3332
DASK_TASK_STARTED_EVENT = "task_started"
3433
DASK_TESTING_TIMEOUT_S = 25
3534

35+
pytest_simcore_core_services_selection = [
36+
"rabbit",
37+
]
38+
3639

3740
def test_publish_event(
3841
dask_client: distributed.Client, job_id: str, task_owner: TaskOwner
3942
):
4043
dask_pub = distributed.Pub("some_topic", client=dask_client)
4144
dask_sub = distributed.Sub("some_topic", client=dask_client)
42-
event_to_publish = TaskLogEvent(
45+
event_to_publish = TaskProgressEvent(
4346
job_id=job_id,
44-
log="the log",
45-
log_level=logging.INFO,
47+
msg="the log",
48+
progress=1,
4649
task_owner=task_owner,
4750
)
4851
publish_event(dask_pub=dask_pub, event=event_to_publish)
@@ -53,7 +56,7 @@ def test_publish_event(
5356
message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S)
5457
assert message is not None
5558
assert isinstance(message, str)
56-
received_task_log_event = TaskLogEvent.model_validate_json(message)
59+
received_task_log_event = TaskProgressEvent.model_validate_json(message)
5760
assert received_task_log_event == event_to_publish
5861

5962

@@ -62,8 +65,8 @@ async def test_publish_event_async(
6265
):
6366
dask_pub = distributed.Pub("some_topic", client=async_dask_client)
6467
dask_sub = distributed.Sub("some_topic", client=async_dask_client)
65-
event_to_publish = TaskLogEvent(
66-
job_id=job_id, log="the log", log_level=logging.INFO, task_owner=task_owner
68+
event_to_publish = TaskProgressEvent(
69+
job_id=job_id, msg="the log", progress=2, task_owner=task_owner
6770
)
6871
publish_event(dask_pub=dask_pub, event=event_to_publish)
6972

@@ -74,7 +77,7 @@ async def test_publish_event_async(
7477
assert isinstance(message, Coroutine)
7578
message = await message
7679
assert message is not None
77-
received_task_log_event = TaskLogEvent.model_validate_json(message)
80+
received_task_log_event = TaskProgressEvent.model_validate_json(message)
7881
assert received_task_log_event == event_to_publish
7982

8083

@@ -117,11 +120,10 @@ async def _dask_sub_consumer_task(sub: distributed.Sub) -> None:
117120

118121
async def _dask_publisher_task(pub: distributed.Pub) -> None:
119122
print("--> starting publisher task")
120-
for n in range(NUMBER_OF_MESSAGES):
121-
event_to_publish = TaskLogEvent(
123+
for _ in range(NUMBER_OF_MESSAGES):
124+
event_to_publish = TaskProgressEvent(
122125
job_id=job_id,
123-
log=f"the log {n}",
124-
log_level=logging.INFO,
126+
progress=0.5,
125127
task_owner=task_owner,
126128
)
127129
publish_event(dask_pub=pub, event=event_to_publish)

services/dask-sidecar/tests/unit/test_tasks.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import logging
1010
import re
1111
import threading
12-
from collections.abc import AsyncIterator, Callable, Coroutine, Iterable
12+
from collections.abc import AsyncIterator, Callable, Iterable
1313

1414
# copied out from dask
1515
from dataclasses import dataclass
@@ -613,7 +613,7 @@ def test_run_multiple_computational_sidecar_dask(
613613

614614
results = dask_client.gather(futures)
615615
assert results
616-
assert not isinstance(results, Coroutine)
616+
assert isinstance(results, list)
617617
# for result in results:
618618
# check that the task produce the expected data, not less not more
619619
for output_data in results:
@@ -708,6 +708,7 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
708708
progress_sub: distributed.Sub,
709709
mocked_get_image_labels: mock.Mock,
710710
log_rabbit_client_parser: mock.AsyncMock,
711+
task_owner: TaskOwner,
711712
):
712713
mocked_get_image_labels.assert_not_called()
713714
NUMBER_OF_LOGS = 200
@@ -745,14 +746,19 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
745746

746747
await asyncio.sleep(5)
747748
assert log_rabbit_client_parser.called
749+
748750
worker_logs = [
749751
message
750752
for msg in log_rabbit_client_parser.call_args_list
751753
for message in LoggerRabbitMessage.model_validate_json(msg.args[0]).messages
752754
]
753755
# check all the awaited logs are in there
754756
filtered_worker_logs = filter(lambda log: "This is iteration" in log, worker_logs)
755-
assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS
757+
assert (
758+
len(list(filtered_worker_logs)) == (2 * NUMBER_OF_LOGS)
759+
if task_owner.has_parent
760+
else NUMBER_OF_LOGS
761+
)
756762
mocked_get_image_labels.assert_called()
757763

758764

services/dask-sidecar/tests/unit/test_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@
1010
import pytest
1111
from pytest_mock.plugin import MockerFixture
1212
from pytest_simcore.helpers.typing_env import EnvVarsDict
13-
from simcore_service_dask_sidecar.utils import num_available_gpus
13+
from simcore_service_dask_sidecar.utils.gpus import num_available_gpus
14+
15+
pytest_simcore_core_services_selection = [
16+
"rabbit",
17+
]
1418

1519

1620
@pytest.fixture
1721
def mock_aiodocker(mocker: MockerFixture) -> mock.MagicMock:
1822
return mocker.patch(
19-
"simcore_service_dask_sidecar.utils.aiodocker.Docker", autospec=True
23+
"simcore_service_dask_sidecar.utils.gpus.aiodocker.Docker", autospec=True
2024
)
2125

2226

0 commit comments

Comments
 (0)