
Commit eee4314

Merge branch 'master' into is7979/status-line-too-long

2 parents c5ade54 + 28845dd

3 files changed: +124 −37 lines

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 2 additions & 5 deletions

@@ -58,7 +58,6 @@
     ProjectNotFoundError,
     WalletNotEnoughCreditsError,
 )
-from ...models.comp_pipelines import CompPipelineAtDB
 from ...models.comp_runs import CompRunsAtDB, ProjectMetadataDict, RunMetadataDict
 from ...models.comp_tasks import CompTaskAtDB
 from ...modules.catalog import CatalogClient
@@ -537,10 +536,8 @@ async def stop_computation(
     # check the project exists
     await project_repo.get_project(project_id)
     # get the project pipeline
-    pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline(
-        project_id
-    )
-    pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph()
+    pipeline_at_db = await comp_pipelines_repo.get_pipeline(project_id)
+    pipeline_dag = pipeline_at_db.get_graph()
     # get the project task states
     tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id)
     # create the complete DAG graph
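
With the call collapsed to one line, the explicit annotations become redundant: pipeline_at_db and pipeline_dag get their types from the return annotations of get_pipeline and get_graph, which also lets the CompPipelineAtDB import go. A minimal sketch of why inference covers this, using hypothetical stand-ins for the repository types (not the real simcore classes):

    import networkx as nx

    class CompPipelineAtDB:  # stand-in for the real DB model
        def get_graph(self) -> nx.DiGraph:
            return nx.DiGraph()

    async def get_pipeline(project_id: str) -> CompPipelineAtDB:  # hypothetical repo signature
        return CompPipelineAtDB()

    # a type checker infers pipeline_at_db: CompPipelineAtDB and
    # pipeline_dag: nx.DiGraph from the annotations above, so the
    # call-site annotations add nothing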

services/director-v2/src/simcore_service_director_v2/utils/dags.py

Lines changed: 30 additions & 25 deletions

@@ -42,7 +42,7 @@ def create_complete_dag(workbench: NodesDict) -> nx.DiGraph:
         )
         if node.input_nodes:
             for input_node_id in node.input_nodes:
-                predecessor_node = workbench.get(NodeIDStr(input_node_id))
+                predecessor_node = workbench.get(f"{input_node_id}")
                 if predecessor_node:
                     dag_graph.add_edge(str(input_node_id), node_id)
 
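The NodeIDStr cast becomes a plain f-string because the workbench mapping is keyed by ordinary strings, so any str-convertible id works for the lookup. A tiny illustration, assuming input_node_id is a uuid.UUID (toy data, not from the diff):

    from uuid import UUID, uuid4

    workbench: dict[str, dict] = {}
    input_node_id: UUID = uuid4()

    # f"{...}" normalizes UUID (or str) ids to the str keys the dict uses
    predecessor_node = workbench.get(f"{input_node_id}")
    assert predecessor_node is None  # not present in this toy workbench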

@@ -95,19 +95,18 @@ async def get_node_io_payload_cb(node_id: NodeID) -> dict[str, Any]:
         return result
 
     computed_hash = await compute_node_hash(node_id, get_node_io_payload_cb)
-    if computed_hash != node["run_hash"]:
-        return True
-    return False
+    return bool(computed_hash != node["run_hash"])
 
 
 async def _compute_node_dependencies_state(graph_data, node_id) -> set[NodeID]:
     node = graph_data[f"{node_id}"]
     # check if the previous node is outdated or waits for dependencies... in which case this one has to wait
     non_computed_dependencies: set[NodeID] = set()
     for input_port in node.get("inputs", {}).values():
-        if isinstance(input_port, PortLink):
-            if _node_needs_computation(graph_data, input_port.node_uuid):
-                non_computed_dependencies.add(input_port.node_uuid)
+        if isinstance(input_port, PortLink) and _node_needs_computation(
+            graph_data, input_port.node_uuid
+        ):
+            non_computed_dependencies.add(input_port.node_uuid)
     # all good. ready
     return non_computed_dependencies
 
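Both edits are behavior-preserving simplifications: the hash comparison already yields a boolean, and merging the nested ifs into one condition drops an indentation level. A toy illustration of the first (hypothetical names, not the real helper):

    def needs_recompute(computed_hash: str, stored_hash: str | None) -> bool:
        # the != comparison is already a bool; bool() only makes the intent explicit
        return bool(computed_hash != stored_hash)

    assert needs_recompute("abc", "abc") is False
    assert needs_recompute("abc", None) is True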

@@ -188,14 +187,14 @@ def compute_pipeline_started_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_started_at: datetime.datetime | None = min(
-        node_id_to_comp_task[node_id].start or TOMORROW
+        node_id_to_comp_task[node_id].start or tomorrow
         for node_id in pipeline_dag.nodes
     )
-    if pipeline_started_at == TOMORROW:
+    if pipeline_started_at == tomorrow:
         pipeline_started_at = None
     return pipeline_started_at
 

@@ -206,13 +205,13 @@ def compute_pipeline_stopped_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_stopped_at: datetime.datetime | None = max(
-        node_id_to_comp_task[node_id].end or TOMORROW for node_id in pipeline_dag.nodes
+        node_id_to_comp_task[node_id].end or tomorrow for node_id in pipeline_dag.nodes
     )
-    if pipeline_stopped_at == TOMORROW:
+    if pipeline_stopped_at == tomorrow:
         pipeline_stopped_at = None
     return pipeline_stopped_at
 
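Both timestamp helpers use the same trick: a sentinel one day in the future is substituted for missing start/end values so min()/max() never see None, and a result equal to the sentinel means no task ever started or stopped. A self-contained sketch of the pattern (toy signature; the real code uses arrow and CompTaskAtDB):

    import datetime

    def earliest_start(
        starts: list[datetime.datetime | None],
    ) -> datetime.datetime | None:
        # sentinel guaranteed to be later than any real start time
        tomorrow = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(days=1)
        earliest = min(start or tomorrow for start in starts)
        # only the sentinel survived: nothing has started yet
        return None if earliest == tomorrow else earliest

    assert earliest_start([None, None]) is None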

@@ -227,15 +226,15 @@ async def compute_pipeline_details(
 
     # NOTE: the latest progress is available in comp_tasks only
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
     pipeline_progress = None
     if len(pipeline_dag.nodes) > 0:
-
         pipeline_progress = sum(
             (node_id_to_comp_task[node_id].progress or 0) / len(pipeline_dag.nodes)
             for node_id in pipeline_dag.nodes
-            if node_id_to_comp_task[node_id].progress is not None
+            if node_id in node_id_to_comp_task
+            and node_id_to_comp_task[node_id].progress is not None
         )
         pipeline_progress = max(0.0, min(pipeline_progress, 1.0))

@@ -246,10 +245,15 @@
         node_id: NodeState(
             modified=node_data.get(kNODE_MODIFIED_STATE, False),
             dependencies=node_data.get(kNODE_DEPENDENCIES_TO_COMPUTE, set()),
-            current_status=node_id_to_comp_task[node_id].state,
+            current_status=(
+                node_id_to_comp_task[node_id].state
+                if node_id in node_id_to_comp_task
+                else RunningState.UNKNOWN
+            ),
             progress=(
                 node_id_to_comp_task[node_id].progress
-                if node_id_to_comp_task[node_id].progress is not None
+                if node_id in node_id_to_comp_task
+                and node_id_to_comp_task[node_id].progress is not None
                 else None
             ),
         )
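
This is the substantive fix behind the new test below: when a node in the pipeline DAG has no matching row in comp_tasks, the old indexing raised KeyError; the membership checks now degrade to RunningState.UNKNOWN and a None progress. A minimal sketch of the guard with simplified stand-in types (the real NodeState/RunningState live in the models package):

    from enum import Enum

    class RunningState(str, Enum):
        UNKNOWN = "UNKNOWN"
        NOT_STARTED = "NOT_STARTED"

    node_id_to_comp_task = {"node_1": {"state": RunningState.NOT_STARTED}}

    def current_status(node_id: str) -> RunningState:
        # a missing task no longer raises KeyError; it reports UNKNOWN
        return (
            node_id_to_comp_task[node_id]["state"]
            if node_id in node_id_to_comp_task
            else RunningState.UNKNOWN
        )

    assert current_status("node_1") is RunningState.NOT_STARTED
    assert current_status("node_3") is RunningState.UNKNOWN
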
@@ -261,12 +265,13 @@
 
 def find_computational_node_cycles(dag: nx.DiGraph) -> list[list[str]]:
     """returns a list of nodes part of a cycle and computational, which is currently forbidden."""
-    computational_node_cycles = []
+
     list_potential_cycles = nx.algorithms.cycles.simple_cycles(dag)
-    for cycle in list_potential_cycles:
+    return [
+        deepcopy(cycle)
+        for cycle in list_potential_cycles
         if any(
             dag.nodes[node_id]["node_class"] is NodeClass.COMPUTATIONAL
             for node_id in cycle
-        ):
-            computational_node_cycles.append(deepcopy(cycle))
-    return computational_node_cycles
+        )
+    ]
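
The loop-and-append body is folded into a single list comprehension with the same computational-node filter. A runnable sketch of the idea with plain networkx and a toy node attribute in place of NodeClass:

    from copy import deepcopy

    import networkx as nx

    dag = nx.DiGraph()
    dag.add_nodes_from([("a", {"computational": True}), ("b", {"computational": False})])
    dag.add_edges_from([("a", "b"), ("b", "a")])  # one a<->b cycle

    # keep only cycles that touch at least one computational node
    cycles = [
        deepcopy(cycle)
        for cycle in nx.algorithms.cycles.simple_cycles(dag)
        if any(dag.nodes[node_id]["computational"] for node_id in cycle)
    ]
    assert len(cycles) == 1 and set(cycles[0]) == {"a", "b"}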

services/director-v2/tests/unit/test_utils_dags.py

Lines changed: 92 additions & 7 deletions

@@ -390,13 +390,6 @@ def pipeline_test_params(
     list_comp_tasks: list[CompTaskAtDB],
     expected_pipeline_details_output: PipelineDetails,
 ) -> PipelineDetailsTestParams:
-    # check the inputs make sense
-    assert len(set(dag_adjacency)) == len(node_keys) == len(list_comp_tasks)
-    assert dag_adjacency.keys() == node_keys.keys()
-    assert len(
-        {t.node_id for t in list_comp_tasks}.intersection(node_keys.keys())
-    ) == len(set(dag_adjacency))
-
     # resolve the naming
     node_name_to_uuid_map = {}
     resolved_dag_adjacency: dict[str, list[str]] = {}
@@ -596,3 +589,95 @@ async def test_compute_pipeline_details(
         received_details.model_dump()
         == pipeline_test_params.expected_pipeline_details.model_dump()
     )
+
+
+@pytest.mark.parametrize(
+    "dag_adjacency, node_keys, list_comp_tasks, expected_pipeline_details_output",
+    [
+        pytest.param(
+            {"node_1": ["node_2", "node_3"], "node_2": ["node_3"], "node_3": []},
+            {
+                "node_1": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_2": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_3": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+            },
+            [
+                # NOTE: we use construct here to be able to use non uuid names to simplify test setup
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_1",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=2,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_2",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=3,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+            ],
+            PipelineDetails.model_construct(
+                adjacency_list={
+                    "node_1": ["node_2", "node_3"],
+                    "node_2": ["node_3"],
+                    "node_3": [],
+                },
+                progress=0.0,
+                node_states={
+                    "node_1": NodeState(modified=True, progress=None),
+                    "node_2": NodeState(modified=True, progress=None),
+                    "node_3": NodeState(
+                        modified=True,
+                        progress=None,
+                        current_status=RunningState.UNKNOWN,
+                    ),
+                },
+            ),
+            id="dag with missing tasks (node 3 is missing, so it is not skipped in the pipeline details)",
+        )
+    ],
+)
+@pytest.mark.acceptance_test(
+    "For https://github.com/ITISFoundation/osparc-simcore/issues/8172"
+)
+async def test_compute_pipeline_details_with_missing_tasks(
+    pipeline_test_params: PipelineDetailsTestParams,
+):
+    received_details = await compute_pipeline_details(
+        pipeline_test_params.complete_dag,
+        pipeline_test_params.pipeline_dag,
+        pipeline_test_params.comp_tasks,
+    )
+    assert (
+        received_details.model_dump()
+        == pipeline_test_params.expected_pipeline_details.model_dump()
+    )
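
As the in-test NOTE says, model_construct lets the fixture use readable non-UUID node names by skipping pydantic validation; node_3 exists in the DAG but has no CompTaskAtDB row, which is exactly the missing-task case the fix covers. A minimal illustration of the model_construct behavior, assuming pydantic v2 (toy model, not the real CompTaskAtDB):

    from uuid import UUID

    from pydantic import BaseModel

    class Task(BaseModel):
        node_id: UUID

    # model_construct() bypasses validation, so a non-UUID id is accepted as-is
    task = Task.model_construct(node_id="node_1")
    assert task.node_id == "node_1"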
