Skip to content

Commit 42e3518

Browse files
committed
use a Variable to keep the Future
1 parent 35f091e commit 42e3518

File tree

1 file changed

+21
-22
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules

1 file changed

+21
-22
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ def _comp_sidecar_fct(
263263
)
264264
# NOTE: the callback is running in a secondary thread, and takes a future as arg
265265
task_future.add_done_callback(lambda _: callback())
266+
await distributed.Variable(job_id, client=self.backend.client).set(
267+
task_future
268+
)
266269

267270
await dask_utils.wrap_client_async_routine(
268271
self.backend.client.publish_dataset(task_future, name=job_id)
@@ -450,30 +453,26 @@ def _get_pipeline_statuses(
450453
DaskSchedulerTaskState | None, task_statuses.get(job_id, "lost")
451454
)
452455
if dask_status == "erred":
453-
try:
454-
# find out if this was a cancellation
455-
exception = await distributed.Future(
456-
job_id, client=self.backend.client
457-
).exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
458-
assert isinstance(exception, Exception) # nosec
459-
460-
if isinstance(exception, TaskCancelledError):
461-
running_states.append(DaskClientTaskState.ABORTED)
462-
else:
463-
assert exception # nosec
464-
_logger.warning(
465-
"Task %s completed in error:\n%s\nTrace:\n%s",
466-
job_id,
467-
exception,
468-
"".join(traceback.format_exception(exception)),
469-
)
470-
running_states.append(DaskClientTaskState.ERRED)
471-
except TimeoutError:
456+
# find out if this was a cancellation
457+
var = distributed.Variable(job_id, client=self.backend.client)
458+
future: distributed.Future = await var.get(
459+
timeout=_DASK_DEFAULT_TIMEOUT_S
460+
)
461+
exception = await future.exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
462+
assert isinstance(exception, Exception) # nosec
463+
464+
if isinstance(exception, TaskCancelledError):
465+
running_states.append(DaskClientTaskState.ABORTED)
466+
else:
467+
assert exception # nosec
472468
_logger.warning(
473-
"Task %s completed in error but was lost from dask-scheduler since then."
474-
"TIP: This can happen when the future just disappeared from the dask-scheduler when this call was done."
469+
"Task %s completed in error:\n%s\nTrace:\n%s",
470+
job_id,
471+
exception,
472+
"".join(traceback.format_exception(exception)),
475473
)
476-
running_states.append(DaskClientTaskState.LOST)
474+
running_states.append(DaskClientTaskState.ERRED)
475+
477476
elif dask_status is None:
478477
running_states.append(DaskClientTaskState.LOST)
479478
else:

0 commit comments

Comments
 (0)