Skip to content

Commit e406ffc

Browse files
committed
refactor
1 parent 756595d commit e406ffc

File tree

1 file changed

+24
-19
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+24
-19
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 24 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -76,8 +76,11 @@
7676
_logger = logging.getLogger(__name__)
7777

7878

79-
_Previous = CompTaskAtDB
80-
_Current = CompTaskAtDB
79+
@dataclass(frozen=True, slots=True)
80+
class TaskStateTracker:
81+
previous: CompTaskAtDB
82+
current: CompTaskAtDB
83+
8184

8285
_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
8386
seconds=30
@@ -123,34 +126,36 @@ class SortedTasks:
123126

124127

125128
async def _triage_changed_tasks(
126-
changed_tasks: list[tuple[_Previous, _Current]],
129+
changed_tasks: list[TaskStateTracker],
127130
) -> SortedTasks:
128131
started_tasks = [
129-
current
130-
for previous, current in changed_tasks
131-
if current.state in RUNNING_STATES
132+
tracker.current
133+
for tracker in changed_tasks
134+
if tracker.current.state in RUNNING_STATES
132135
or (
133-
previous.state in WAITING_FOR_START_STATES
134-
and current.state in COMPLETED_STATES
136+
tracker.previous.state in WAITING_FOR_START_STATES
137+
and tracker.current.state in COMPLETED_STATES
135138
)
136139
]
137140

138141
completed_tasks = [
139-
current for _, current in changed_tasks if current.state in COMPLETED_STATES
142+
tracker.current
143+
for tracker in changed_tasks
144+
if tracker.current.state in COMPLETED_STATES
140145
]
141146

142147
waiting_for_resources_tasks = [
143-
current
144-
for previous, current in changed_tasks
145-
if current.state in WAITING_FOR_START_STATES
148+
tracker.current
149+
for tracker in changed_tasks
150+
if tracker.current.state in WAITING_FOR_START_STATES
146151
]
147152

148153
lost_tasks = [
149-
current
150-
for previous, current in changed_tasks
151-
if (current.state is RunningState.UNKNOWN)
154+
tracker.current
155+
for tracker in changed_tasks
156+
if (tracker.current.state is RunningState.UNKNOWN)
152157
and (
153-
(arrow.utcnow().datetime - previous.modified)
158+
(arrow.utcnow().datetime - tracker.previous.modified)
154159
> _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS
155160
)
156161
]
@@ -323,7 +328,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
323328
return False
324329
if task.start is None:
325330
_logger.warning(
326-
"Task %s is in state %s but has no start time. This should not happen. Skipping heartbeat as this cannot be computed. (%s, %s, %s, %s, %s)",
331+
"Task %s is in state %s but has no start time. TIP: this can happen if the task went from PENDING to UNKNOWN to SUCCESS and back to STARTED due to not responding when retrieving the results. Skipping heartbeat as this cannot be computed. (%s, %s, %s, %s, %s)",
327332
task.job_id,
328333
task.state,
329334
user_id,
@@ -375,14 +380,14 @@ async def _get_changed_tasks_from_backend(
375380
user_id: UserID,
376381
processing_tasks: list[CompTaskAtDB],
377382
comp_run: CompRunsAtDB,
378-
) -> tuple[list[tuple[_Previous, _Current]], list[CompTaskAtDB]]:
383+
) -> tuple[list[TaskStateTracker], list[CompTaskAtDB]]:
379384
tasks_backend_status = await self._get_tasks_status(
380385
user_id, processing_tasks, comp_run
381386
)
382387

383388
return (
384389
[
385-
(
390+
TaskStateTracker(
386391
task,
387392
task.model_copy(update={"state": backend_state}),
388393
)

0 commit comments

Comments
 (0)