Merged · Changes from 3 commits
@@ -58,7 +58,6 @@
     ProjectNotFoundError,
     WalletNotEnoughCreditsError,
 )
-from ...models.comp_pipelines import CompPipelineAtDB
 from ...models.comp_runs import CompRunsAtDB, ProjectMetadataDict, RunMetadataDict
 from ...models.comp_tasks import CompTaskAtDB
 from ...modules.catalog import CatalogClient
@@ -537,10 +536,8 @@ async def stop_computation(
     # check the project exists
     await project_repo.get_project(project_id)
     # get the project pipeline
-    pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline(
-        project_id
-    )
-    pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph()
+    pipeline_at_db = await comp_pipelines_repo.get_pipeline(project_id)
+    pipeline_dag = pipeline_at_db.get_graph()
     # get the project task states
     tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id)
     # create the complete DAG graph
@@ -42,7 +42,7 @@ def create_complete_dag(workbench: NodesDict) -> nx.DiGraph:
         )
         if node.input_nodes:
             for input_node_id in node.input_nodes:
-                predecessor_node = workbench.get(NodeIDStr(input_node_id))
+                predecessor_node = workbench.get(f"{input_node_id}")
                 if predecessor_node:
                     dag_graph.add_edge(str(input_node_id), node_id)

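As background for the hunk above: create_complete_dag builds a networkx DiGraph from the project workbench, adding an edge only when the referenced predecessor actually exists. A minimal, self-contained sketch of that pattern (the two-node workbench below is hypothetical, not taken from this PR):

import networkx as nx

# Hypothetical workbench: node_b consumes the output of node_a.
workbench = {
    "node_a": {"input_nodes": []},
    "node_b": {"input_nodes": ["node_a"]},
}

dag = nx.DiGraph()
for node_id, node in workbench.items():
    dag.add_node(node_id)
    for input_node_id in node["input_nodes"]:
        # only link predecessors that actually exist in the workbench
        if input_node_id in workbench:
            dag.add_edge(input_node_id, node_id)

assert list(nx.topological_sort(dag)) == ["node_a", "node_b"]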
@@ -95,19 +95,18 @@ async def get_node_io_payload_cb(node_id: NodeID) -> dict[str, Any]:
         return result
 
     computed_hash = await compute_node_hash(node_id, get_node_io_payload_cb)
-    if computed_hash != node["run_hash"]:
-        return True
-    return False
+    return computed_hash != node["run_hash"]
 
 
 async def _compute_node_dependencies_state(graph_data, node_id) -> set[NodeID]:
     node = graph_data[f"{node_id}"]
     # check if the previous node is outdated or waits for dependencies... in which case this one has to wait
     non_computed_dependencies: set[NodeID] = set()
     for input_port in node.get("inputs", {}).values():
-        if isinstance(input_port, PortLink):
-            if _node_needs_computation(graph_data, input_port.node_uuid):
-                non_computed_dependencies.add(input_port.node_uuid)
+        if isinstance(input_port, PortLink) and _node_needs_computation(
+            graph_data, input_port.node_uuid
+        ):
+            non_computed_dependencies.add(input_port.node_uuid)
     # all good. ready
     return non_computed_dependencies

@@ -188,14 +187,14 @@ def compute_pipeline_started_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_started_at: datetime.datetime | None = min(
-        node_id_to_comp_task[node_id].start or TOMORROW
+        node_id_to_comp_task[node_id].start or tomorrow
         for node_id in pipeline_dag.nodes
     )
-    if pipeline_started_at == TOMORROW:
+    if pipeline_started_at == tomorrow:
         pipeline_started_at = None
     return pipeline_started_at
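The TOMORROW → tomorrow rename above is purely stylistic (it is a local variable, not a module-level constant); the sentinel logic is untouched. A standalone sketch of that sentinel pattern, with a hypothetical list of start timestamps:

import datetime
import arrow

# Hypothetical start times: no task has started yet.
starts: list[datetime.datetime | None] = [None, None]

# Unstarted tasks contribute the far-future sentinel, so they can never
# win min(); if min() still returns the sentinel, nothing has started.
tomorrow = arrow.utcnow().shift(days=1).datetime
started_at: datetime.datetime | None = min(s or tomorrow for s in starts)
if started_at == tomorrow:
    started_at = None

assert started_at is None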

@@ -206,13 +205,13 @@ def compute_pipeline_stopped_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_stopped_at: datetime.datetime | None = max(
-        node_id_to_comp_task[node_id].end or TOMORROW for node_id in pipeline_dag.nodes
+        node_id_to_comp_task[node_id].end or tomorrow for node_id in pipeline_dag.nodes
     )
-    if pipeline_stopped_at == TOMORROW:
+    if pipeline_stopped_at == tomorrow:
         pipeline_stopped_at = None
     return pipeline_stopped_at

@@ -227,15 +226,15 @@ async def compute_pipeline_details(

     # NOTE: the latest progress is available in comp_tasks only
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
     pipeline_progress = None
     if len(pipeline_dag.nodes) > 0:
 
         pipeline_progress = sum(
             (node_id_to_comp_task[node_id].progress or 0) / len(pipeline_dag.nodes)
             for node_id in pipeline_dag.nodes
-            if node_id_to_comp_task[node_id].progress is not None
+            if node_id in node_id_to_comp_task
+            and node_id_to_comp_task[node_id].progress is not None
         )
         pipeline_progress = max(0.0, min(pipeline_progress, 1.0))

@@ -246,10 +245,15 @@
         node_id: NodeState(
             modified=node_data.get(kNODE_MODIFIED_STATE, False),
             dependencies=node_data.get(kNODE_DEPENDENCIES_TO_COMPUTE, set()),
-            current_status=node_id_to_comp_task[node_id].state,
+            current_status=(
+                node_id_to_comp_task[node_id].state
+                if node_id in node_id_to_comp_task
+                else RunningState.UNKNOWN
+            ),
             progress=(
                 node_id_to_comp_task[node_id].progress
-                if node_id_to_comp_task[node_id].progress is not None
+                if node_id in node_id_to_comp_task
+                and node_id_to_comp_task[node_id].progress is not None
                 else None
             ),
         )
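The membership guards above are the heart of this PR: a DAG node with no matching row in comp_tasks used to raise KeyError; now it is reported with RunningState.UNKNOWN and simply excluded from the progress sum. A reduced sketch of the same guard, using plain dicts in place of the real CompTaskAtDB/NodeState models (all names in the sketch are illustrative only):

# node_3 is in the DAG but has no corresponding task row.
dag_nodes = ["node_1", "node_2", "node_3"]
tasks = {
    "node_1": {"progress": 0.5, "state": "STARTED"},
    "node_2": {"progress": None, "state": "NOT_STARTED"},
}

# Guard membership before indexing: missing nodes no longer raise KeyError.
progress = sum(
    (tasks[n]["progress"] or 0) / len(dag_nodes)
    for n in dag_nodes
    if n in tasks and tasks[n]["progress"] is not None
)
states = {n: (tasks[n]["state"] if n in tasks else "UNKNOWN") for n in dag_nodes}

assert states["node_3"] == "UNKNOWN"
assert abs(progress - 0.5 / 3) < 1e-9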
@@ -261,12 +265,13 @@

 def find_computational_node_cycles(dag: nx.DiGraph) -> list[list[str]]:
     """returns a list of nodes part of a cycle and computational, which is currently forbidden."""
-    computational_node_cycles = []
 
     list_potential_cycles = nx.algorithms.cycles.simple_cycles(dag)
-    for cycle in list_potential_cycles:
+    return [
+        deepcopy(cycle)
+        for cycle in list_potential_cycles
         if any(
             dag.nodes[node_id]["node_class"] is NodeClass.COMPUTATIONAL
             for node_id in cycle
-        ):
-            computational_node_cycles.append(deepcopy(cycle))
-    return computational_node_cycles
+        )
+    ]
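The comprehension above is behavior-preserving relative to the old accumulator loop. A self-contained sketch of the cycle-filtering idea (the two-node graph and string node classes below are hypothetical stand-ins for NodeClass members):

from copy import deepcopy
import networkx as nx

dag = nx.DiGraph()
dag.add_node("a", node_class="COMPUTATIONAL")
dag.add_node("b", node_class="DYNAMIC")
dag.add_edge("a", "b")
dag.add_edge("b", "a")  # a <-> b forms a cycle

# Keep only cycles containing at least one computational node.
computational_cycles = [
    deepcopy(cycle)
    for cycle in nx.algorithms.cycles.simple_cycles(dag)
    if any(dag.nodes[n]["node_class"] == "COMPUTATIONAL" for n in cycle)
]

assert len(computational_cycles) == 1
assert set(computational_cycles[0]) == {"a", "b"}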
services/director-v2/tests/unit/test_utils_dags.py (97 additions, 5 deletions)
@@ -391,11 +391,11 @@ def pipeline_test_params(
     expected_pipeline_details_output: PipelineDetails,
 ) -> PipelineDetailsTestParams:
     # check the inputs make sense
-    assert len(set(dag_adjacency)) == len(node_keys) == len(list_comp_tasks)
-    assert dag_adjacency.keys() == node_keys.keys()
-    assert len(
-        {t.node_id for t in list_comp_tasks}.intersection(node_keys.keys())
-    ) == len(set(dag_adjacency))
+    # assert len(set(dag_adjacency)) == len(node_keys) == len(list_comp_tasks)
+    # assert dag_adjacency.keys() == node_keys.keys()
+    # assert len(
+    #     {t.node_id for t in list_comp_tasks}.intersection(node_keys.keys())
+    # ) == len(set(dag_adjacency))
 
     # resolve the naming
     node_name_to_uuid_map = {}
@@ -596,3 +596,95 @@ async def test_compute_pipeline_details(
         received_details.model_dump()
         == pipeline_test_params.expected_pipeline_details.model_dump()
     )
+
+
+@pytest.mark.parametrize(
+    "dag_adjacency, node_keys, list_comp_tasks, expected_pipeline_details_output",
+    [
+        pytest.param(
+            {"node_1": ["node_2", "node_3"], "node_2": ["node_3"], "node_3": []},
+            {
+                "node_1": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_2": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_3": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+            },
+            [
+                # NOTE: we use construct here to be able to use non uuid names to simplify test setup
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_1",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=2,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_2",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=3,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+            ],
+            PipelineDetails.model_construct(
+                adjacency_list={
+                    "node_1": ["node_2", "node_3"],
+                    "node_2": ["node_3"],
+                    "node_3": [],
+                },
+                progress=0.0,
+                node_states={
+                    "node_1": NodeState(modified=True, progress=None),
+                    "node_2": NodeState(modified=True, progress=None),
+                    "node_3": NodeState(
+                        modified=True,
+                        progress=None,
+                        current_status=RunningState.UNKNOWN,
+                    ),
+                },
+            ),
+            id="dag with missing tasks (node 3 is missing, so it is not skipped in the pipeline details)",
+        )
+    ],
+)
+@pytest.mark.acceptance_test(
+    "For https://github.com/ITISFoundation/osparc-simcore/issues/8172"
+)
+async def test_compute_pipeline_details_with_missing_tasks(
+    pipeline_test_params: PipelineDetailsTestParams,
+):
+    received_details = await compute_pipeline_details(
+        pipeline_test_params.complete_dag,
+        pipeline_test_params.pipeline_dag,
+        pipeline_test_params.comp_tasks,
+    )
+    assert (
+        received_details.model_dump()
+        == pipeline_test_params.expected_pipeline_details.model_dump()
+    )
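A note on the fixtures above: CompTaskAtDB.model_construct is used instead of normal instantiation because, in pydantic v2, model_construct skips validation entirely, which is what allows readable ids like "node_1" where a validated NodeID field would require a UUID. A tiny illustration with a hypothetical model:

from uuid import UUID
from pydantic import BaseModel

class Task(BaseModel):
    node_id: UUID

# model_construct performs no validation, so a non-UUID string is accepted.
task = Task.model_construct(node_id="node_1")
assert task.node_id == "node_1"

# Normal construction would raise a ValidationError instead:
# Task(node_id="node_1")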