Skip to content

Commit 511ca1e

Browse files
committed
in case of timeout we return LOST
1 parent c792b8d commit 511ca1e

File tree

1 file changed

+25
-17
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules

1 file changed

+25
-17
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
}
100100

101101

102-
_DASK_DEFAULT_TIMEOUT_S: Final[int] = 1
102+
_DASK_DEFAULT_TIMEOUT_S: Final[int] = 5
103103

104104

105105
_UserCallbackInSepThread = Callable[[], None]
@@ -453,25 +453,33 @@ def _get_pipeline_statuses(
453453
DaskSchedulerTaskState | None, task_statuses.get(job_id, "lost")
454454
)
455455
if dask_status == "erred":
456-
# find out if this was a cancellation
457-
var = distributed.Variable(job_id, client=self.backend.client)
458-
future: distributed.Future = await var.get(
459-
timeout=_DASK_DEFAULT_TIMEOUT_S
460-
)
461-
exception = await future.exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
462-
assert isinstance(exception, Exception) # nosec
463-
464-
if isinstance(exception, TaskCancelledError):
465-
running_states.append(DaskClientTaskState.ABORTED)
466-
else:
467-
assert exception # nosec
456+
try:
457+
# find out if this was a cancellation
458+
var = distributed.Variable(job_id, client=self.backend.client)
459+
future: distributed.Future = await var.get(
460+
timeout=_DASK_DEFAULT_TIMEOUT_S
461+
)
462+
exception = await future.exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
463+
assert isinstance(exception, Exception) # nosec
464+
465+
if isinstance(exception, TaskCancelledError):
466+
running_states.append(DaskClientTaskState.ABORTED)
467+
else:
468+
assert exception # nosec
469+
_logger.warning(
470+
"Task %s completed in error:\n%s\nTrace:\n%s",
471+
job_id,
472+
exception,
473+
"".join(traceback.format_exception(exception)),
474+
)
475+
running_states.append(DaskClientTaskState.ERRED)
476+
except TimeoutError:
468477
_logger.warning(
469-
"Task %s completed in error:\n%s\nTrace:\n%s",
478+
"Task %s could not be retrieved from dask-scheduler, it is lost\n"
479+
"TIP:If the task was unpublished this can happen, or if the dask-scheduler was restarted.",
470480
job_id,
471-
exception,
472-
"".join(traceback.format_exception(exception)),
473481
)
474-
running_states.append(DaskClientTaskState.ERRED)
482+
running_states.append(DaskClientTaskState.LOST)
475483

476484
elif dask_status is None:
477485
running_states.append(DaskClientTaskState.LOST)

0 commit comments

Comments
 (0)