|
76 | 76 | _logger = logging.getLogger(__name__) |
77 | 77 |
|
78 | 78 |
|
79 | | -_Previous = CompTaskAtDB |
80 | | -_Current = CompTaskAtDB |
| 79 | +@dataclass(frozen=True, slots=True) |
| 80 | +class TaskStateTracker: |
| 81 | + previous: CompTaskAtDB |
| 82 | + current: CompTaskAtDB |
| 83 | + |
81 | 84 |
|
82 | 85 | _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta( |
83 | 86 | seconds=30 |
@@ -123,34 +126,36 @@ class SortedTasks: |
123 | 126 |
|
124 | 127 |
|
125 | 128 | async def _triage_changed_tasks( |
126 | | - changed_tasks: list[tuple[_Previous, _Current]], |
| 129 | + changed_tasks: list[TaskStateTracker], |
127 | 130 | ) -> SortedTasks: |
128 | 131 | started_tasks = [ |
129 | | - current |
130 | | - for previous, current in changed_tasks |
131 | | - if current.state in RUNNING_STATES |
| 132 | + tracker.current |
| 133 | + for tracker in changed_tasks |
| 134 | + if tracker.current.state in RUNNING_STATES |
132 | 135 | or ( |
133 | | - previous.state in WAITING_FOR_START_STATES |
134 | | - and current.state in COMPLETED_STATES |
| 136 | + tracker.previous.state in WAITING_FOR_START_STATES |
| 137 | + and tracker.current.state in COMPLETED_STATES |
135 | 138 | ) |
136 | 139 | ] |
137 | 140 |
|
138 | 141 | completed_tasks = [ |
139 | | - current for _, current in changed_tasks if current.state in COMPLETED_STATES |
| 142 | + tracker.current |
| 143 | + for tracker in changed_tasks |
| 144 | + if tracker.current.state in COMPLETED_STATES |
140 | 145 | ] |
141 | 146 |
|
142 | 147 | waiting_for_resources_tasks = [ |
143 | | - current |
144 | | - for previous, current in changed_tasks |
145 | | - if current.state in WAITING_FOR_START_STATES |
| 148 | + tracker.current |
| 149 | + for tracker in changed_tasks |
| 150 | + if tracker.current.state in WAITING_FOR_START_STATES |
146 | 151 | ] |
147 | 152 |
|
148 | 153 | lost_tasks = [ |
149 | | - current |
150 | | - for previous, current in changed_tasks |
151 | | - if (current.state is RunningState.UNKNOWN) |
| 154 | + tracker.current |
| 155 | + for tracker in changed_tasks |
| 156 | + if (tracker.current.state is RunningState.UNKNOWN) |
152 | 157 | and ( |
153 | | - (arrow.utcnow().datetime - previous.modified) |
| 158 | + (arrow.utcnow().datetime - tracker.previous.modified) |
154 | 159 | > _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS |
155 | 160 | ) |
156 | 161 | ] |
@@ -323,7 +328,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool: |
323 | 328 | return False |
324 | 329 | if task.start is None: |
325 | 330 | _logger.warning( |
326 | | - "Task %s is in state %s but has no start time. This should not happen. Skipping heartbeat as this cannot be computed. (%s, %s, %s, %s, %s)", |
| 331 | + "Task %s is in state %s but has no start time. TIP: this can happen if the task went from PENDING to UNKNOWN to SUCCESS and back to STARTED due to not responding when retrieving the results. Skipping heartbeat as this cannot be computed. (%s, %s, %s, %s, %s)", |
327 | 332 | task.job_id, |
328 | 333 | task.state, |
329 | 334 | user_id, |
@@ -375,14 +380,14 @@ async def _get_changed_tasks_from_backend( |
375 | 380 | user_id: UserID, |
376 | 381 | processing_tasks: list[CompTaskAtDB], |
377 | 382 | comp_run: CompRunsAtDB, |
378 | | - ) -> tuple[list[tuple[_Previous, _Current]], list[CompTaskAtDB]]: |
| 383 | + ) -> tuple[list[TaskStateTracker], list[CompTaskAtDB]]: |
379 | 384 | tasks_backend_status = await self._get_tasks_status( |
380 | 385 | user_id, processing_tasks, comp_run |
381 | 386 | ) |
382 | 387 |
|
383 | 388 | return ( |
384 | 389 | [ |
385 | | - ( |
| 390 | + TaskStateTracker( |
386 | 391 | task, |
387 | 392 | task.model_copy(update={"state": backend_state}), |
388 | 393 | ) |
|
0 commit comments