@@ -450,23 +450,30 @@ def _get_pipeline_statuses(
450450 DaskSchedulerTaskState | None , task_statuses .get (job_id , "lost" )
451451 )
452452 if dask_status == "erred" :
453- # find out if this was a cancellation
454- exception = await distributed .Future (
455- job_id , client = self .backend .client
456- ).exception (timeout = _DASK_DEFAULT_TIMEOUT_S )
457- assert isinstance (exception , Exception ) # nosec
458-
459- if isinstance (exception , TaskCancelledError ):
460- running_states .append (DaskClientTaskState .ABORTED )
461- else :
462- assert exception # nosec
453+ try :
454+ # find out if this was a cancellation
455+ exception = await distributed .Future (
456+ job_id , client = self .backend .client
457+ ).exception (timeout = _DASK_DEFAULT_TIMEOUT_S )
458+ assert isinstance (exception , Exception ) # nosec
459+
460+ if isinstance (exception , TaskCancelledError ):
461+ running_states .append (DaskClientTaskState .ABORTED )
462+ else :
463+ assert exception # nosec
464+ _logger .warning (
465+ "Task %s completed in error:\n %s\n Trace:\n %s" ,
466+ job_id ,
467+ exception ,
468+ "" .join (traceback .format_exception (exception )),
469+ )
470+ running_states .append (DaskClientTaskState .ERRED )
471+ except TimeoutError :
463472 _logger .warning (
464- "Task %s completed in error:\n %s\n Trace:\n %s" ,
465- job_id ,
466- exception ,
467- "" .join (traceback .format_exception (exception )),
473+ "Task %s completed in error but was lost from dask-scheduler since then."
474+ "TIP: This can happen when the future just disappeared from the dask-scheduler when this call was done."
468475 )
469- running_states .append (DaskClientTaskState .ERRED )
476+ running_states .append (DaskClientTaskState .LOST )
470477 elif dask_status is None :
471478 running_states .append (DaskClientTaskState .LOST )
472479 else :
0 commit comments