
Commit d24571b

get_tasks_status fixed for new version of dask
1 parent 511ca1e · commit d24571b

File tree

2 files changed: +21, -27 lines

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 8 additions & 1 deletion
```diff
@@ -536,14 +536,21 @@ async def get_task_result(self, job_id: str) -> TaskOutputData:
     async def release_task_result(self, job_id: str) -> None:
         _logger.debug("releasing results for %s", f"{job_id=}")
         try:
+            # NOTE: The distributed Variable holds the future of the tasks in the dask-scheduler
+            # Alas, deleting the variable is done asynchronously and there is no way to ensure
+            # the variable was effectively deleted.
+            # This is annoying as one can re-create the variable without error.
+            var = distributed.Variable(job_id, client=self.backend.client)
+            var.delete()
             # first check if the key exists
             await dask_utils.wrap_client_async_routine(
                 self.backend.client.get_dataset(name=job_id)
             )
+
             await dask_utils.wrap_client_async_routine(
                 self.backend.client.unpublish_dataset(name=job_id)
             )
-            distributed.Variable(job_id, client=self.backend.client).delete()
+
         except KeyError:
             _logger.warning("Unknown task cannot be unpublished: %s", f"{job_id=}")
```

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 13 additions & 26 deletions
```diff
@@ -91,7 +91,7 @@ async def _assert_wait_for_task_status(
     job_id: str,
     dask_client: DaskClient,
     expected_status: DaskClientTaskState,
-    timeout: int | None = None,
+    timeout: int | None = None,  # noqa: ASYNC109
 ):
     async for attempt in AsyncRetrying(
         reraise=True,
@@ -104,24 +104,20 @@
                 f"waiting for task to be {expected_status=}, "
                 f"Attempt={attempt.retry_state.attempt_number}"
             )
-            current_task_status = (await dask_client.get_tasks_status([job_id]))[0]
-            assert isinstance(current_task_status, DaskClientTaskState)
-            print(f"{current_task_status=} vs {expected_status=}")
-            if (
-                current_task_status is DaskClientTaskState.ERRED
-                and expected_status
-                not in [
-                    DaskClientTaskState.ERRED,
-                    DaskClientTaskState.LOST,
-                ]
-            ):
+            got = (await dask_client.get_tasks_status([job_id]))[0]
+            assert isinstance(got, DaskClientTaskState)
+            print(f"{got=} vs {expected_status=}")
+            if got is DaskClientTaskState.ERRED and expected_status not in [
+                DaskClientTaskState.ERRED,
+                DaskClientTaskState.LOST,
+            ]:
                 try:
                     # we can fail fast here
                     # this will raise and we catch the Assertion to not reraise too long
                     await dask_client.get_task_result(job_id)
                 except AssertionError as exc:
                     raise RuntimeError from exc
-            assert current_task_status is expected_status
+            assert got is expected_status
 
 
 @pytest.fixture
```
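
Two things happen in these hunks: the `timeout` parameter gets a `# noqa: ASYNC109` (flake8-async flags async functions that accept a timeout argument rather than using a cancel scope), and the verbose status check is tightened around the shorter `got` name. The polling loop itself is plain tenacity; a self-contained sketch of the same pattern, where `wait_until` and `predicate` are hypothetical names, not from the test suite:

```python
# Hedged sketch of the AsyncRetrying polling pattern the helper is built on;
# wait_until/predicate are illustrative names, not from the repo.
from collections.abc import Awaitable, Callable

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_delay, wait_fixed


async def wait_until(
    predicate: Callable[[], Awaitable[bool]],
    *,
    timeout: float = 30,  # noqa: ASYNC109
) -> None:
    async for attempt in AsyncRetrying(
        reraise=True,  # surface the last AssertionError instead of a RetryError
        stop=stop_after_delay(timeout),
        wait=wait_fixed(1),
        retry=retry_if_exception_type(AssertionError),
    ):
        with attempt:
            assert await predicate(), "condition not met yet"
```

`retry=retry_if_exception_type(AssertionError)` keeps polling while the assertion fails, and `reraise=True` re-raises the final assertion error once the deadline passes.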
```diff
@@ -1024,11 +1020,6 @@ def fake_remote_fct(
     assert len(published_computation_task) == 1
 
     assert published_computation_task[0].node_id in cpu_image.fake_tasks
-    # let's get a dask future for the task here so dask will not remove the task from the scheduler at the end
-    computation_future = distributed.Future(
-        key=published_computation_task[0].job_id, client=dask_client.backend.client
-    )
-    assert computation_future
 
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
@@ -1047,15 +1038,11 @@ def fake_remote_fct(
     )
     # release the task results
     await dask_client.release_task_result(published_computation_task[0].job_id)
-    # the task is still present since we hold a future here
-    await _assert_wait_for_task_status(
-        published_computation_task[0].job_id,
-        dask_client,
-        DaskClientTaskState.ERRED if fail_remote_fct else DaskClientTaskState.SUCCESS,
-    )
 
-    # removing the future will let dask eventually delete the task from its memory, so its status becomes undefined
-    del computation_future
+    await asyncio.sleep(
+        5  # NOTE: here we wait to be sure that the dask-scheduler properly updates its state
+    )
+    # the task is gone, since the distributed Variable was removed above
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
```
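
The deleted lines relied on dask's reference counting: holding a `distributed.Future` for a key pins the task on the scheduler, and only dropping the last reference lets the scheduler forget it. Now that `release_task_result` deletes the Variable itself, the test stops pinning the task and instead sleeps so the scheduler can settle. A minimal sketch of that refcounting behaviour, assuming a local in-process cluster and a toy task:

```python
# Sketch (assumptions: local in-process cluster, toy task key) of the dask
# reference counting the deleted test code relied on.
import distributed


def main() -> None:
    client = distributed.Client(processes=False)

    # While `future` exists it pins the key "my-task" in scheduler memory.
    future = client.submit(lambda x: x + 1, 41, key="my-task")
    assert future.result() == 42

    # Dropping the last reference releases the key; the scheduler forgets
    # the task eventually, not synchronously, hence the sleep in the test.
    del future

    client.close()


if __name__ == "__main__":
    main()
```

A fixed 5-second sleep is a pragmatic compromise here: the scheduler exposes no event to await for the forget, and the final `_assert_wait_for_task_status` call then polls until the new state is observable.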
