Skip to content

Commit 9786b9b

Browse files
committed
seems to work
1 parent e3b5a77 commit 9786b9b

File tree

3 files changed

+81
-49
lines changed

3 files changed

+81
-49
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ async def _() -> None:
9898
)
9999
else:
100100
_logger.warning(
101-
"RabbitMQ client plugin setup is not the main thread!"
101+
"RabbitMQ client plugin setup is not the main thread! Beware! if in pytest it's ok."
102102
)
103103

104104
# Cancel the message processor task

services/dask-sidecar/tests/unit/test_base_tasks.py

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
3+
# pylint: disable=unused-variable
4+
# pylint: disable=no-member
5+
6+
import time
7+
8+
import distributed
9+
import pytest
10+
from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState
11+
from models_library.projects_state import RunningState
12+
from tenacity import Retrying, stop_after_delay, wait_fixed
13+
14+
pytest_simcore_core_services_selection = [
15+
"rabbit",
16+
]
17+
18+
19+
def test_task_state_lifecycle(local_cluster: distributed.LocalCluster) -> None:
20+
def _some_task() -> int:
21+
time.sleep(1)
22+
return 2
23+
24+
def _some_failing_task() -> None:
25+
time.sleep(1)
26+
msg = "Some error"
27+
raise RuntimeError(msg)
28+
29+
local_cluster.scale(0)
30+
for attempt in Retrying(
31+
stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True
32+
):
33+
with attempt:
34+
assert len(local_cluster.workers) == 0
35+
with distributed.Client(local_cluster) as dask_client:
36+
# submit the task and wait until it goes into WAITING_FOR_RESOURCES
37+
future = dask_client.submit(_some_task, resources={"CPU": 1})
38+
for attempt in Retrying(
39+
stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True
40+
):
41+
with attempt:
42+
events = dask_client.get_events(
43+
TASK_LIFE_CYCLE_EVENT.format(key=future.key)
44+
)
45+
assert isinstance(events, tuple)
46+
assert len(events) >= 2
47+
parsed_events = [
48+
TaskLifeCycleState.model_validate(event[1]) for event in events
49+
]
50+
assert parsed_events[0].state is RunningState.PENDING
51+
assert parsed_events[-1].state is RunningState.WAITING_FOR_RESOURCES
52+
53+
# now add a worker and wait for it to take the task
54+
local_cluster.scale(1)
55+
56+
# we basically wait for the tasks to finish
57+
assert future.result(timeout=15) == 2
58+
59+
events = dask_client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=future.key))
60+
assert isinstance(events, tuple)
61+
parsed_events = [
62+
TaskLifeCycleState.model_validate(event[1]) for event in events
63+
]
64+
assert parsed_events[0].state is RunningState.PENDING
65+
assert RunningState.STARTED in {event.state for event in parsed_events}
66+
assert RunningState.FAILED not in {event.state for event in parsed_events}
67+
assert parsed_events[-1].state is RunningState.SUCCESS
68+
69+
future = dask_client.submit(_some_failing_task)
70+
with pytest.raises(RuntimeError):
71+
future.result(timeout=10)
72+
events = dask_client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=future.key))
73+
parsed_events = [
74+
TaskLifeCycleState.model_validate(event[1]) for event in events
75+
]
76+
assert parsed_events[0].state is RunningState.PENDING
77+
assert RunningState.STARTED in {event.state for event in parsed_events}
78+
assert RunningState.FAILED in {event.state for event in parsed_events}
79+
assert RunningState.SUCCESS not in {event.state for event in parsed_events}
80+
assert parsed_events[-1].state is RunningState.FAILED

0 commit comments

Comments
 (0)