Skip to content

Commit 9bc685a

Browse files
committed
fix ordering of tasks
1 parent cfc7164 commit 9bc685a

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class SortedTasks:
116116

117117

118118
async def _triage_changed_tasks(
119-
changed_tasks: list[tuple[_Previous, _Current]]
119+
changed_tasks: list[tuple[_Previous, _Current]],
120120
) -> SortedTasks:
121121
started_tasks = [
122122
current
@@ -242,9 +242,13 @@ async def _set_schedule_done(
242242
async def _set_states_following_failed_to_aborted(
243243
self, project_id: ProjectID, dag: nx.DiGraph
244244
) -> dict[NodeIDStr, CompTaskAtDB]:
245-
tasks: dict[NodeIDStr, CompTaskAtDB] = await self._get_pipeline_tasks(
246-
project_id, dag
247-
)
245+
tasks = await self._get_pipeline_tasks(project_id, dag)
246+
# Perform a reverse topological sort to ensure tasks are ordered from last to first
247+
sorted_node_ids = list(reversed(list(nx.topological_sort(dag))))
248+
tasks = {
249+
node_id: tasks[node_id] for node_id in sorted_node_ids if node_id in tasks
250+
}
251+
# we need the tasks ordered from the last task to the first
248252
node_ids_to_set_as_aborted: set[NodeIDStr] = set()
249253
for task in tasks.values():
250254
if task.state == RunningState.FAILED:
@@ -651,8 +655,10 @@ async def _schedule_tasks_to_stop(
651655
) -> None:
652656
# get any running task and stop them
653657
comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
654-
await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted(
655-
project_id
658+
await (
659+
comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted(
660+
project_id
661+
)
656662
)
657663
# stop any remaining running task, these are already submitted
658664
if tasks_to_stop := [

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,7 +1451,6 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
14511451
project_uuid=running_project.project.uuid,
14521452
task_ids=[
14531453
running_project.tasks[1].node_id,
1454-
running_project.tasks[2].node_id,
14551454
running_project.tasks[3].node_id,
14561455
],
14571456
expected_state=reboot_state.expected_task_state_group1,
@@ -1460,7 +1459,7 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
14601459
await assert_comp_tasks(
14611460
sqlalchemy_async_engine,
14621461
project_uuid=running_project.project.uuid,
1463-
task_ids=[running_project.tasks[4].node_id],
1462+
task_ids=[running_project.tasks[2].node_id, running_project.tasks[4].node_id],
14641463
expected_state=reboot_state.expected_task_state_group2,
14651464
expected_progress=reboot_state.expected_task_progress_group2,
14661465
)

0 commit comments

Comments
 (0)