Commit f225219: "works"
1 parent 00aa81d

3 files changed: +44 additions, -17 deletions


services/dask-sidecar/src/simcore_service_dask_sidecar/utils/dask.py

Lines changed: 11 additions & 6 deletions
@@ -18,7 +18,7 @@
 from distributed.worker_state_machine import TaskState
 from models_library.progress_bar import ProgressReport
 from models_library.rabbitmq_messages import LoggerRabbitMessage
-from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch
+from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context
 
 from ..rabbitmq_plugin import get_rabbitmq_client
 
@@ -170,11 +170,16 @@ async def periodicaly_check_if_aborted(task_name: str) -> None:
     await periodically_checking_task
 
 
-async def publish_event(dask_client: distributed.Client, event: BaseTaskEvent) -> None:
+async def publish_event(
+    event: BaseTaskEvent,
+) -> None:
     """never reraises, only CancellationError"""
-    with log_catch(_logger, reraise=False):
+    worker = get_worker()
+    _logger.debug("current worker %s", f"{worker=}")
+    with (
+        log_catch(_logger, reraise=False),
+        log_context(_logger, logging.DEBUG, msg=f"publishing {event=}"),
+    ):
         await maybe_await(
-            dask_client.log_event(
-                TaskProgressEvent.topic_name(), event.model_dump_json()
-            )
+            worker.log_event(TaskProgressEvent.topic_name(), event.model_dump_json())
         )
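
The functional change: publish_event no longer takes a distributed.Client. Since it always runs inside a dask task, it can fetch its own worker via distributed.get_worker() and emit the event through Worker.log_event. A minimal standalone sketch of that mechanism, independent of the commit (task name and topic are illustrative):

# Sketch only: publish an event from inside a running task via the worker,
# then read it back from the client. Not code from this commit.
import distributed


def my_task() -> int:
    # get_worker() is only valid while executing inside a task on a worker
    worker = distributed.get_worker()
    worker.log_event("demo-topic", {"msg": "hello from the worker"})
    return 42


if __name__ == "__main__":
    with distributed.Client(processes=False) as client:
        assert client.submit(my_task).result(timeout=30) == 42
        # the scheduler collects events per topic as (timestamp, payload) tuples
        print(client.get_events("demo-topic"))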

services/dask-sidecar/tests/unit/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ def app_environment(
             model_dump_with_secrets(rabbit_service, show_secrets=True)
         ),
         "SC_BOOT_MODE": "debug",
-        "SIDECAR_LOGLEVEL": "DEBUG",
+        "DASK_SIDECAR_LOGLEVEL": "DEBUG",
         "SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME": "simcore_computational_shared_data",
         "SIDECAR_COMP_SERVICES_SHARED_FOLDER": f"{shared_data_folder}",
     },
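
The rename from SIDECAR_LOGLEVEL to DASK_SIDECAR_LOGLEVEL matches the variable the updated test monkeypatches below. A hedged sketch, assuming the sidecar consumes it via pydantic-settings (the real settings class is not part of this diff; the class here is hypothetical):

# Hypothetical sketch of how the renamed variable could be consumed;
# the actual simcore settings class is not shown in this commit.
import logging

from pydantic_settings import BaseSettings


class _SidecarSettings(BaseSettings):
    # populated from the DASK_SIDECAR_LOGLEVEL environment variable
    DASK_SIDECAR_LOGLEVEL: str = "WARNING"


if __name__ == "__main__":
    settings = _SidecarSettings()
    logging.basicConfig(level=settings.DASK_SIDECAR_LOGLEVEL)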

services/dask-sidecar/tests/unit/test_utils_dask.py

Lines changed: 32 additions & 10 deletions
@@ -6,16 +6,19 @@
 
 import asyncio
 import concurrent.futures
+import logging
 import time
 from collections.abc import AsyncIterator, Callable, Coroutine
 from typing import Any
+from unittest import mock
 
 import distributed
 import pytest
 from dask_task_models_library.container_tasks.errors import TaskCancelledError
 from dask_task_models_library.container_tasks.events import TaskProgressEvent
 from dask_task_models_library.container_tasks.io import TaskCancelEventName
 from dask_task_models_library.container_tasks.protocol import TaskOwner
+from pytest_simcore.helpers.logging_tools import log_context
 from simcore_service_dask_sidecar.utils.dask import (
     _DEFAULT_MAX_RESOURCES,
     TaskPublisher,
@@ -38,31 +41,50 @@
 ]
 
 
+@pytest.mark.parametrize("handler", [mock.Mock(), mock.AsyncMock()])
 async def test_publish_event(
-    dask_client: distributed.Client, job_id: str, task_owner: TaskOwner
+    dask_client: distributed.Client,
+    job_id: str,
+    task_owner: TaskOwner,
+    monkeypatch: pytest.MonkeyPatch,
+    handler: mock.Mock | mock.AsyncMock,
 ):
+    monkeypatch.setenv("DASK_SIDECAR_LOGLEVEL", "DEBUG")
     event_to_publish = TaskProgressEvent(
         job_id=job_id,
         msg="the log",
         progress=1,
         task_owner=task_owner,
     )
 
-    def handler(event: tuple) -> None:
-        print("received event", event)
-        assert isinstance(event, tuple)
-        received_task_log_event = TaskProgressEvent.model_validate_json(event[1])
-        assert received_task_log_event == event_to_publish
-
+    # NOTE: only 1 handler per topic is allowed
     dask_client.subscribe_topic(TaskProgressEvent.topic_name(), handler)
 
-    await publish_event(dask_client, event=event_to_publish)
+    def _worker_task() -> int:
+        with log_context(logging.INFO, "_worker_task"):
+
+            async def _() -> int:
+                with log_context(logging.INFO, "_worker_task_async"):
+                    await publish_event(event_to_publish)
+                return 2
+
+            return asyncio.run(_())
+
+    future = dask_client.submit(_worker_task)
+    assert future.result(timeout=DASK_TESTING_TIMEOUT_S) == 2
+
     for attempt in Retrying(
-        wait=wait_fixed(0.2), stop=stop_after_delay(15), reraise=True
+        wait=wait_fixed(0.2),
+        stop=stop_after_delay(15),
+        reraise=True,
+        retry=retry_if_exception_type(AssertionError),
     ):
         with attempt:
             events = dask_client.get_events(TaskProgressEvent.topic_name())
-            assert events is not None
+            assert events is not None, "No events received"
+            assert isinstance(events, tuple)
+
+            handler.assert_called_with(events[-1])
 
     assert isinstance(events, tuple)
     assert len(events) == 1
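
The reworked test leans on two distributed features: a single subscribed handler per topic (hence the parametrized mock.Mock / mock.AsyncMock) and the scheduler's per-topic event log, which is polled with tenacity because event delivery is asynchronous. A condensed sketch of that subscribe-then-poll pattern outside pytest (topic and payload are illustrative, not from the commit):

# Condensed sketch of the pattern used by the test above; not commit code.
from unittest import mock

import distributed
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

if __name__ == "__main__":
    with distributed.Client(processes=False) as client:
        handler = mock.Mock()
        client.subscribe_topic("demo-topic", handler)
        client.log_event("demo-topic", "payload")

        # delivery is asynchronous: retry the assertions until the
        # handler has seen the latest (timestamp, payload) event tuple
        for attempt in Retrying(
            wait=wait_fixed(0.2),
            stop=stop_after_delay(15),
            reraise=True,
            retry=retry_if_exception_type(AssertionError),
        ):
            with attempt:
                events = client.get_events("demo-topic")
                assert events, "no events received yet"
                handler.assert_called_with(events[-1])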
