Skip to content

Commit 7392a35

Browse files
committed
also validate pipeline in rpc case @sanderegg
1 parent 0ef5ad6 commit 7392a35

File tree

4 files changed

+43
-32
lines changed

4 files changed

+43
-32
lines changed

services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository
2525
from ...modules.db.repositories.comp_tasks import CompTasksRepository
2626
from ...utils import dask as dask_utils
27-
from ...utils.computations_tasks import PipelineInfo, get_pipeline_info
27+
from ...utils.computations_tasks import PipelineTaskMissingError, validate_pipeline
2828
from ..dependencies.database import get_repository
2929

3030
log = logging.getLogger(__name__)
@@ -35,31 +35,6 @@
3535
# HELPERS -------------------------------------------------------------------
3636

3737

38-
async def validate_pipeline(
39-
project_id: ProjectID,
40-
comp_pipelines_repo: CompPipelinesRepository,
41-
comp_tasks_repo: CompTasksRepository,
42-
) -> PipelineInfo:
43-
"""
44-
Loads and validates data from pipelines and tasks tables and
45-
reports it back as PipelineInfo
46-
"""
47-
48-
pipeline_info = await get_pipeline_info(
49-
project_id=project_id,
50-
comp_pipelines_repo=comp_pipelines_repo,
51-
comp_tasks_repo=comp_tasks_repo,
52-
)
53-
54-
# check that we have the expected tasks
55-
if len(pipeline_info.filtered_tasks) != len(pipeline_info.pipeline_dag):
56-
raise HTTPException(
57-
status_code=status.HTTP_409_CONFLICT,
58-
detail="The tasks referenced by the pipeline are missing",
59-
)
60-
return pipeline_info
61-
62-
6338
# ROUTES HANDLERS --------------------------------------------------------------
6439

6540

@@ -82,7 +57,13 @@ async def get_all_tasks_log_files(
8257
Each log is only available when the corresponding task is done
8358
"""
8459
# gets computation task ids
85-
info = await validate_pipeline(project_id, comp_pipelines_repo, comp_tasks_repo)
60+
try:
61+
info = await validate_pipeline(project_id, comp_pipelines_repo, comp_tasks_repo)
62+
except PipelineTaskMissingError as exc:
63+
raise HTTPException(
64+
status_code=status.HTTP_409_CONFLICT,
65+
detail="The tasks referenced by the pipeline are missing",
66+
) from exc
8667
iter_task_ids = (t.node_id for t in info.filtered_tasks)
8768

8869
tasks_logs_files: list[TaskLogFileGet] = await logged_gather(

services/director-v2/src/simcore_service_director_v2/api/rpc/_computations_tasks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
from simcore_sdk.node_ports_common import data_items_utils
1111

1212
from ...constants import LOGS_FILE_NAME
13-
from ...core.errors import PipelineNotFoundError
13+
from ...core.errors import PipelineNotFoundError, PipelineTaskMissingError
1414
from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository
1515
from ...modules.db.repositories.comp_tasks import CompTasksRepository
16-
from ...utils.computations_tasks import get_pipeline_info
16+
from ...utils.computations_tasks import validate_pipeline
1717

1818
router = RPCRouter()
1919

@@ -27,12 +27,12 @@ async def get_computation_task_log_file_ids(
2727
comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine)
2828

2929
try:
30-
info = await get_pipeline_info(
30+
info = await validate_pipeline(
3131
project_id=project_id,
3232
comp_pipelines_repo=comp_pipelines_repo,
3333
comp_tasks_repo=comp_tasks_repo,
3434
)
35-
except PipelineNotFoundError as exc:
35+
except (PipelineNotFoundError, PipelineTaskMissingError) as exc:
3636
raise ComputationalTaskMissingError(project_id=project_id) from exc
3737

3838
iter_task_ids = (t.node_id for t in info.filtered_tasks)

services/director-v2/src/simcore_service_director_v2/core/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class WalletNotEnoughCreditsError(DirectorError):
6767
msg_template = "Wallet '{wallet_name}' has {wallet_credit_amount} credits."
6868

6969

70+
class PipelineTaskMissingError(DirectorError):
71+
msg_template = "Pipeline associated with project_id {project_id} is missing task(s)"
72+
73+
7074
#
7175
# SCHEDULER ERRORS
7276
#

services/director-v2/src/simcore_service_director_v2/utils/computations_tasks.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import networkx as nx
44
from models_library.projects import ProjectID
5+
from simcore_service_director_v2.core.errors import PipelineTaskMissingError
56

67
from ..models.comp_pipelines import CompPipelineAtDB
78
from ..models.comp_tasks import CompTaskAtDB
@@ -16,7 +17,7 @@ class PipelineInfo(NamedTuple):
1617
filtered_tasks: list[CompTaskAtDB] # nodes that actually run i.e. part of the dag
1718

1819

19-
async def get_pipeline_info(
20+
async def _get_pipeline_info(
2021
*,
2122
project_id: ProjectID,
2223
comp_pipelines_repo: CompPipelinesRepository,
@@ -39,3 +40,28 @@ async def get_pipeline_info(
3940
]
4041

4142
return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks)
43+
44+
45+
async def validate_pipeline(
46+
project_id: ProjectID,
47+
comp_pipelines_repo: CompPipelinesRepository,
48+
comp_tasks_repo: CompTasksRepository,
49+
) -> PipelineInfo:
50+
"""
51+
Loads and validates data from pipelines and tasks tables and
52+
reports it back as PipelineInfo
53+
54+
raises PipelineTaskMissingError
55+
"""
56+
57+
pipeline_info = await _get_pipeline_info(
58+
project_id=project_id,
59+
comp_pipelines_repo=comp_pipelines_repo,
60+
comp_tasks_repo=comp_tasks_repo,
61+
)
62+
63+
# check that we have the expected tasks
64+
if len(pipeline_info.filtered_tasks) != len(pipeline_info.pipeline_dag):
65+
raise PipelineTaskMissingError(project_id=project_id)
66+
67+
return pipeline_info

0 commit comments

Comments
 (0)