
Commit f4e5d21 (parent: 5246d30)

time out before processing stuff so that failure happens right away
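The diff below moves `_timeout_if_waiting_for_cluster_too_long` earlier in `BaseCompScheduler.apply()`; the helper's body is not part of this commit. As a rough illustration only, the kind of deadline check it implies looks like this (hypothetical names throughout):

    # Hypothetical sketch only; the real helper lives in the comp_scheduler
    # module and its implementation is not shown in this commit.
    import datetime


    def waited_too_long(
        waiting_since: datetime.datetime, timeout: datetime.timedelta
    ) -> bool:
        # a task stuck in WAITING_FOR_CLUSTER past the deadline should be failed
        return datetime.datetime.now(datetime.timezone.utc) - waiting_since > timeout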

3 files changed: +59 -13 lines

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py (+14 -10)
@@ -15,7 +15,6 @@
 import datetime
 import logging
 from abc import ABC, abstractmethod
-from asyncio import tasks
 from collections.abc import Callable
 from dataclasses import dataclass
 from typing import Final
@@ -272,9 +271,12 @@ async def _set_processing_done(
         )
 
     async def _set_states_following_failed_to_aborted(
-        self, project_id: ProjectID, dag: nx.DiGraph, run_id: PositiveInt
+        self,
+        project_id: ProjectID,
+        dag: nx.DiGraph,
+        tasks: dict[NodeIDStr, CompTaskAtDB],
+        run_id: PositiveInt,
     ) -> dict[NodeIDStr, CompTaskAtDB]:
-        tasks = await self._get_pipeline_tasks(project_id, dag)
         # Perform a reverse topological sort to ensure tasks are ordered from last to first
         sorted_node_ids = list(reversed(list(nx.topological_sort(dag))))
         tasks = {
@@ -634,15 +636,20 @@ async def apply(
             user_id, project_id, iteration
         )
         dag = await self._get_pipeline_dag(project_id)
+        comp_tasks = await self._get_pipeline_tasks(project_id, dag)
         # 1. Update our list of tasks with data from backend (state, results)
         await self._update_states_from_comp_backend(
             user_id, project_id, iteration, dag, comp_run
         )
-        # 2. Any task following a FAILED task shall be ABORTED
+        # 2. time out tasks that have been waiting for a cluster for more than X minutes
+        comp_tasks = await self._timeout_if_waiting_for_cluster_too_long(
+            user_id, project_id, comp_run, comp_tasks
+        )
+        # 3. Any task following a FAILED task shall be ABORTED
         comp_tasks = await self._set_states_following_failed_to_aborted(
-            project_id, dag, comp_run.run_id
+            project_id, dag, comp_tasks, comp_run.run_id
         )
-        # 3. do we want to stop the pipeline now?
+        # 4. do we want to stop the pipeline now?
         if comp_run.cancelled:
             comp_tasks = await self._schedule_tasks_to_stop(
                 user_id, project_id, comp_tasks, comp_run
@@ -664,10 +671,7 @@ async def apply(
                 iteration=iteration,
             ),
         )
-        # 4. timeout if waiting for cluster has been there for more than X minutes
-        comp_tasks = await self._timeout_if_waiting_for_cluster_too_long(
-            user_id, project_id, comp_run, comp_tasks
-        )
+
         # 5. send a heartbeat
         await self._send_running_tasks_heartbeat(
             user_id, project_id, comp_run.run_id, iteration, dag
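
The functional effect of the reordering can be seen in isolation: when the timeout check runs before `_set_states_following_failed_to_aborted`, a task that timed out waiting for a cluster is FAILED and its dependents ABORTED within the same scheduling pass instead of one pass later. A minimal, self-contained sketch (toy helpers and state strings, not the service's code; assumes only networkx, which the scheduler itself already uses):

    import networkx as nx


    def timeout_waiting_tasks(states: dict[str, str]) -> None:
        # toy stand-in for _timeout_if_waiting_for_cluster_too_long
        for node, state in states.items():
            if state == "WAITING_FOR_CLUSTER":
                states[node] = "FAILED"


    def abort_following_failed(dag: nx.DiGraph, states: dict[str, str]) -> None:
        # toy stand-in for _set_states_following_failed_to_aborted
        for node in nx.topological_sort(dag):
            if any(states[p] in {"FAILED", "ABORTED"} for p in dag.predecessors(node)):
                states[node] = "ABORTED"


    dag = nx.DiGraph([("a", "b")])  # b depends on a
    states = {"a": "WAITING_FOR_CLUSTER", "b": "PUBLISHED"}

    timeout_waiting_tasks(states)        # new step 2
    abort_following_failed(dag, states)  # new step 3 sees the failure immediately
    assert states == {"a": "FAILED", "b": "ABORTED"}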

services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py (+13 -0)
@@ -8,6 +8,7 @@
 # pylint: disable=too-many-statements
 
 
+import datetime
 from unittest import mock
 
 import pytest
@@ -69,3 +70,15 @@ def with_disabled_scheduler_publisher(mocker: MockerFixture) -> mock.Mock:
         "simcore_service_director_v2.modules.comp_scheduler._manager.request_pipeline_scheduling",
         autospec=True,
     )
+
+
+@pytest.fixture
+def with_short_max_wait_for_clusters_keeper(
+    monkeypatch: pytest.MonkeyPatch, mocker: MockerFixture
+) -> datetime.timedelta:
+    short_time = datetime.timedelta(seconds=5)
+    setenvs_from_dict(
+        monkeypatch,
+        {"COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT": f"{short_time}"},
+    )
+    return short_time
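
The fixture serializes the timedelta via f-string formatting, so the env var receives Python's `str(timedelta)` form ("0:00:05"). A minimal sketch of how such a value parses back, using a plain pydantic model as a hypothetical stand-in for the director-v2 settings class that reads this variable:

    import datetime

    from pydantic import BaseModel


    class _FakeSettings(BaseModel):  # hypothetical stand-in, not the service's class
        COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: datetime.timedelta


    short_time = datetime.timedelta(seconds=5)
    assert f"{short_time}" == "0:00:05"

    settings = _FakeSettings(
        COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT=f"{short_time}"
    )
    assert settings.COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT == short_time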

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py (+32 -3)
@@ -2048,9 +2048,10 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
     "get_or_create_exception",
     [ClustersKeeperNotAvailableError],
 )
-async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_waits(
+async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_waits_and_eventually_timesout_fails(
     with_disabled_auto_scheduling: mock.Mock,
     with_disabled_scheduler_publisher: mock.Mock,
+    with_short_max_wait_for_clusters_keeper: datetime.timedelta,
     initialized_app: FastAPI,
     scheduler_api: BaseCompScheduler,
     sqlalchemy_async_engine: AsyncEngine,
@@ -2061,8 +2062,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_waits(
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
     fake_collection_run_id: CollectionRunID,
 ):
-    # needs to change: https://github.com/ITISFoundation/osparc-simcore/issues/6817
-
     mocked_get_or_create_cluster.side_effect = get_or_create_exception
     # running the pipeline will trigger a call to the clusters-keeper
     assert published_project.project.prj_owner
@@ -2166,6 +2165,36 @@
         expected_progress=None,
         run_id=run_in_db.run_id,
     )
+    await asyncio.sleep(with_short_max_wait_for_clusters_keeper.total_seconds() + 1)
+    # applying again no longer calls the clusters-keeper; the wait has timed out,
+    # so the task is now marked as FAILED
+    await scheduler_api.apply(
+        user_id=run_in_db.user_id,
+        project_id=run_in_db.project_uuid,
+        iteration=run_in_db.iteration,
+    )
+    mocked_get_or_create_cluster.assert_not_called()
+    await assert_comp_runs(
+        sqlalchemy_async_engine,
+        expected_total=1,
+        expected_state=RunningState.FAILED,
+        where_statement=and_(
+            comp_runs.c.user_id == published_project.project.prj_owner,
+            comp_runs.c.project_uuid == f"{published_project.project.uuid}",
+        ),
+    )
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
+    await assert_comp_tasks_and_comp_run_snapshot_tasks(
+        sqlalchemy_async_engine,
+        project_uuid=published_project.project.uuid,
+        task_ids=[t.node_id for t in expected_waiting_for_cluster_tasks],
+        expected_state=RunningState.FAILED,
+        expected_progress=1.0,
+        run_id=run_in_db.run_id,
+    )
 
 
 async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
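
To exercise just this scenario locally, something like the following should work (the invocation is an assumption based on the repository's standard pytest layout; the with_dbs suite additionally needs its database fixtures available):

    pytest services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py \
      -k test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_waits_and_eventually_timesout_fails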
