@@ -115,48 +115,54 @@ class SortedTasks:
115115 potentially_lost : list [CompTaskAtDB ]
116116
117117
# Grace period applied by _triage_changed_tasks: a task whose current state is
# UNKNOWN is only classified as lost once the `modified` timestamp of its
# previous record is older than this duration.
_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(seconds=30)
121+
122+
async def _triage_changed_tasks(
    changed_tasks: list[tuple[_Previous, _Current]],
) -> SortedTasks:
    """Sort tasks whose state changed into started/completed/waiting/lost lists.

    Arguments:
        changed_tasks: (previous, current) pairs describing each task before
            and after its state change.

    Returns:
        A SortedTasks built, in this order, from:
        - started tasks: current state in RUNNING_STATES, or tasks that went
          straight from a WAITING_FOR_START_STATES state to a COMPLETED_STATES
          state (they necessarily ran in between);
        - completed tasks: current state in COMPLETED_STATES;
        - tasks waiting for resources: current state in WAITING_FOR_START_STATES;
        - lost tasks: current state is RunningState.UNKNOWN and the previous
          record was last modified more than
          _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS ago.

    NOTE: a task can appear in both the started and the completed lists.
    NOTE: a task in UNKNOWN state that is still within the grace period is
    not placed in any list.
    """
    started_tasks = [
        current
        for previous, current in changed_tasks
        if current.state in RUNNING_STATES
        or (
            previous.state in WAITING_FOR_START_STATES
            and current.state in COMPLETED_STATES
        )
    ]

    completed_tasks = [
        current for _, current in changed_tasks if current.state in COMPLETED_STATES
    ]

    waiting_for_resources_tasks = [
        current
        for _, current in changed_tasks
        if current.state in WAITING_FOR_START_STATES
    ]

    # Take a single timestamp so every task is measured against the same
    # instant instead of calling the clock once per task.
    # NOTE(review): assumes previous.modified is timezone-aware (UTC); the
    # subtraction raises TypeError for naive datetimes - confirm.
    now = arrow.utcnow().datetime
    lost_tasks = [
        current
        for previous, current in changed_tasks
        if current.state is RunningState.UNKNOWN
        and (now - previous.modified) > _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS
    ]
    if lost_tasks:
        _logger.warning(
            "%s are currently in unknown state. TIP: If they are running in an external cluster and it is not yet ready, that might explain it. But inform @sanderegg nevertheless!",
            [t.node_id for t in lost_tasks],
        )

    return SortedTasks(
        started_tasks,
        completed_tasks,
        waiting_for_resources_tasks,
        lost_tasks,
    )
161167
162168
@@ -500,7 +506,7 @@ async def _update_states_from_comp_backend(
500506 # PENDING -> WAITING_FOR_RESOURCES (workers creation or missing) -> PENDING -> STARTED (worker started processing the task) -> SUCCESS/FAILED
501507 # or ABORTED (user cancelled) or UNKNOWN (lost task - it might be transient, be careful with this one)
502508 sorted_tasks = await _triage_changed_tasks (tasks_with_changed_states )
503-
509+ _logger . debug ( "found the following %s tasks with changed states" , sorted_tasks )
504510 # now process the tasks
505511 if sorted_tasks .started :
506512 # NOTE: the dask-scheduler cannot differentiate between tasks that are effectively computing and
0 commit comments