Skip to content

Commit b99405f

Browse files
committed
publish_event is async
1 parent ffc10ca commit b99405f

File tree

2 files changed

+45
-33
lines changed

2 files changed

+45
-33
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/utils/dask.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Final
77

88
import distributed
9+
from common_library.async_tools import maybe_await
910
from dask_task_models_library.container_tasks.errors import TaskCancelledError
1011
from dask_task_models_library.container_tasks.events import (
1112
BaseTaskEvent,
@@ -169,7 +170,11 @@ async def periodicaly_check_if_aborted(task_name: str) -> None:
169170
await periodically_checking_task
170171

171172

172-
def publish_event(dask_pub: distributed.Pub, event: BaseTaskEvent) -> None:
173+
async def publish_event(dask_client: distributed.Client, event: BaseTaskEvent) -> None:
173174
"""never reraises, only CancellationError"""
174175
with log_catch(_logger, reraise=False):
175-
dask_pub.put(event.model_dump_json())
176+
await maybe_await(
177+
dask_client.log_event(
178+
TaskProgressEvent.topic_name(), event.model_dump_json()
179+
)
180+
)

services/dask-sidecar/tests/unit/test_utils_dask.py

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
monitor_task_abortion,
2525
publish_event,
2626
)
27+
from tenacity import Retrying
2728
from tenacity.asyncio import AsyncRetrying
2829
from tenacity.retry import retry_if_exception_type
2930
from tenacity.stop import stop_after_delay
@@ -47,54 +48,60 @@ def test_publish_event(
4748
task_owner=task_owner,
4849
)
4950

50-
def handler_some_topic_event(event: tuple) -> None:
51+
def handler(event: tuple) -> None:
5152
print("received event", event)
5253
assert isinstance(event, tuple)
5354
received_task_log_event = TaskProgressEvent.model_validate_json(event[1])
5455
assert received_task_log_event == event_to_publish
5556

56-
dask_client.subscribe_topic("some_topic", handler_some_topic_event)
57-
time.sleep(1)
58-
dask_client.log_event("some_topic", event_to_publish.model_dump_json())
59-
time.sleep(1)
60-
# publish_event(dask_pub=dask_pub, event=event_to_publish)
61-
62-
# NOTE: this tests runs a sync dask client,
63-
# and the CI seems to have sometimes difficulties having this run in a reasonable time
64-
# hence the long time out
65-
events = dask_client.get_events("some_topic")
66-
assert events is not None
57+
dask_client.subscribe_topic(TaskProgressEvent.topic_name(), handler)
58+
59+
publish_event(dask_client, event=event_to_publish)
60+
for attempt in Retrying(
61+
wait=wait_fixed(0.2), stop=stop_after_delay(15), reraise=True
62+
):
63+
with attempt:
64+
events = dask_client.get_events(TaskProgressEvent.topic_name())
65+
assert events is not None
66+
6767
assert isinstance(events, tuple)
6868
assert len(events) == 1
6969
assert isinstance(events[0], tuple)
7070
received_task_log_event = TaskProgressEvent.model_validate_json(events[0][1])
7171
assert received_task_log_event == event_to_publish
7272

73-
# message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S)
74-
# assert message is not None
75-
# assert isinstance(message, str)
76-
# received_task_log_event = TaskProgressEvent.model_validate_json(message)
77-
# assert received_task_log_event == event_to_publish
78-
7973

8074
async def test_publish_event_async(
8175
async_dask_client: distributed.Client, job_id: str, task_owner: TaskOwner
8276
):
83-
dask_pub = distributed.Pub("some_topic", client=async_dask_client)
84-
dask_sub = distributed.Sub("some_topic", client=async_dask_client)
8577
event_to_publish = TaskProgressEvent(
86-
job_id=job_id, msg="the log", progress=2, task_owner=task_owner
78+
job_id=job_id,
79+
msg="the log",
80+
progress=2,
81+
task_owner=task_owner,
8782
)
88-
publish_event(dask_pub=dask_pub, event=event_to_publish)
89-
90-
# NOTE: this tests runs a sync dask client,
91-
# and the CI seems to have sometimes difficulties having this run in a reasonable time
92-
# hence the long time out
93-
message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S)
94-
assert isinstance(message, Coroutine)
95-
message = await message
96-
assert message is not None
97-
received_task_log_event = TaskProgressEvent.model_validate_json(message)
83+
84+
async def handler(event: tuple) -> None:
85+
print("received event", event)
86+
assert isinstance(event, tuple)
87+
received_task_log_event = TaskProgressEvent.model_validate_json(event[1])
88+
assert received_task_log_event == event_to_publish
89+
90+
async_dask_client.subscribe_topic(TaskProgressEvent.topic_name(), handler)
91+
92+
await publish_event(async_dask_client, event=event_to_publish)
93+
94+
async for attempt in AsyncRetrying(
95+
wait=wait_fixed(0.2), stop=stop_after_delay(15), reraise=True
96+
):
97+
with attempt:
98+
events = await async_dask_client.get_events(TaskProgressEvent.topic_name())
99+
assert events is not None
100+
101+
assert isinstance(events, tuple)
102+
assert len(events) == 1
103+
assert isinstance(events[0], tuple)
104+
received_task_log_event = TaskProgressEvent.model_validate_json(events[0][1])
98105
assert received_task_log_event == event_to_publish
99106

100107

0 commit comments

Comments
 (0)