Skip to content

Commit 1d9a5c1

Browse files
committed
mode PipelineInfo model into models module @sanderegg
1 parent 446bd66 commit 1d9a5c1

File tree

2 files changed

+11
-11
lines changed

2 files changed

+11
-11
lines changed

services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from contextlib import suppress
33
from typing import Annotated, Any
44

5+
import networkx as nx
56
from dask_task_models_library.container_tasks.protocol import ContainerEnvsDict
67
from models_library.api_schemas_directorv2.services import NodeRequirements
78
from models_library.basic_regex import SIMPLE_VERSION_RE
@@ -276,3 +277,9 @@ class ComputationTaskForRpcDBGet(BaseModel):
276277
image: dict[str, Any]
277278
started_at: dt.datetime | None
278279
ended_at: dt.datetime | None
280+
281+
282+
class PipelineInfo(BaseModel):
283+
pipeline_dag: nx.DiGraph
284+
all_tasks: list[CompTaskAtDB]
285+
filtered_tasks: list[CompTaskAtDB]

services/director-v2/src/simcore_service_director_v2/utils/computations_tasks.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,13 @@
1-
from typing import NamedTuple
2-
31
import networkx as nx
42
from models_library.projects import ProjectID
53
from simcore_service_director_v2.core.errors import PipelineTaskMissingError
64

75
from ..models.comp_pipelines import CompPipelineAtDB
8-
from ..models.comp_tasks import CompTaskAtDB
6+
from ..models.comp_tasks import CompTaskAtDB, PipelineInfo
97
from ..modules.db.repositories.comp_pipelines import CompPipelinesRepository
108
from ..modules.db.repositories.comp_tasks import CompTasksRepository
119

1210

13-
class PipelineInfo(NamedTuple):
14-
# NOTE: kept old names for legacy but should rename for clarity
15-
pipeline_dag: nx.DiGraph
16-
all_tasks: list[CompTaskAtDB] # all nodes in pipeline
17-
filtered_tasks: list[CompTaskAtDB] # nodes that actually run i.e. part of the dag
18-
19-
2011
async def _get_pipeline_info(
2112
*,
2213
project_id: ProjectID,
@@ -39,7 +30,9 @@ async def _get_pipeline_info(
3930
t for t in all_tasks if f"{t.node_id}" in set(pipeline_dag.nodes())
4031
]
4132

42-
return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks)
33+
return PipelineInfo(
34+
pipeline_dag=pipeline_dag, all_tasks=all_tasks, filtered_tasks=filtered_tasks
35+
)
4336

4437

4538
async def validate_pipeline(

0 commit comments

Comments
 (0)