Skip to content

Commit 9913368

Browse files
sandereggCopilot
andauthored
šŸ›ā™»ļøComputational pipelines: Align computation pipeline HTTP codes, fix dynamic cycles bug (#8963)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent ff5ffe4 commit 9913368

File tree

20 files changed

+535
-175
lines changed

20 files changed

+535
-175
lines changed

ā€Žapi/specs/web-server/_computations.pyā€Ž

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,16 @@ async def get_computation(_path: Annotated[ComputationPathParams, Depends()]): .
4545
@router.post(
4646
"/computations/{project_id}:start",
4747
response_model=Envelope[ComputationStarted],
48+
status_code=status.HTTP_201_CREATED,
4849
responses={
50+
status.HTTP_200_OK: {
51+
"description": "Pipeline is up-to-date, nothing was started",
52+
"model": Envelope[ComputationGet],
53+
},
4954
status.HTTP_402_PAYMENT_REQUIRED: {"description": "Insufficient credits to run computation"},
50-
status.HTTP_404_NOT_FOUND: {"description": "Project/wallet/pricing details were not found"},
51-
status.HTTP_406_NOT_ACCEPTABLE: {"description": "Cluster not found"},
52-
status.HTTP_409_CONFLICT: {"description": "Project already started"},
53-
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Configuration error"},
54-
status.HTTP_503_SERVICE_UNAVAILABLE: {"description": "Service not available"},
55+
status.HTTP_404_NOT_FOUND: {"description": "Project/wallet/pricing/cluster details were not found"},
56+
status.HTTP_409_CONFLICT: {"description": "Project already started or contains deprecated services"},
57+
status.HTTP_503_SERVICE_UNAVAILABLE: {"description": "Service not available or configuration error"},
5558
},
5659
)
5760
async def start_computation(

ā€Žpackages/pytest-simcore/src/pytest_simcore/simcore_dask_service.pyā€Ž

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# pylint:disable=unused-argument
33
# pylint:disable=redefined-outer-name
44

5+
import warnings
56
from collections.abc import Iterator
67
from dataclasses import dataclass
78
from pathlib import Path
@@ -90,7 +91,10 @@ def dask_client_security(
9091
def dask_client(dask_scheduler_service: str, dask_client_security: distributed.Security) -> Iterator[Client]:
9192
client = Client(dask_scheduler_service, security=dask_client_security)
9293
yield client
93-
client.close()
94+
# SEE https://github.com/dask/distributed/issues/2507
95+
with warnings.catch_warnings():
96+
warnings.simplefilter("ignore", RuntimeWarning)
97+
client.close()
9498

9599

96100
@pytest.fixture

ā€Žservices/api-server/src/simcore_service_api_server/exceptions/backend_errors.pyā€Ž

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,11 @@ class SolverOutputNotFoundError(BaseBackEndError):
8888
status_code = status.HTTP_404_NOT_FOUND
8989

9090

91-
class ClusterNotFoundError(BaseBackEndError):
92-
msg_template = user_message("The requested cluster could not be found.", _version=1)
93-
status_code = status.HTTP_406_NOT_ACCEPTABLE
9491

9592

9693
class ConfigurationError(BaseBackEndError):
9794
msg_template = user_message("A configuration error occurred.", _version=1)
98-
status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
95+
status_code = status.HTTP_503_SERVICE_UNAVAILABLE
9996

10097

10198
class ProductPriceNotFoundError(BaseBackEndError):
@@ -145,7 +142,8 @@ class InsufficientNumberOfSeatsError(BaseBackEndError):
145142

146143
class CanNotCheckoutServiceIsNotRunningError(BaseBackEndError):
147144
msg_template = user_message(
148-
"Unable to check out license item {licensed_item_id} because the dynamic service is not running. Current service ID: {service_run_id}.",
145+
"Unable to check out license item {licensed_item_id} because the dynamic service is not running. "
146+
"Current service ID: {service_run_id}.",
149147
_version=1,
150148
)
151149
status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
@@ -171,7 +169,8 @@ class CeleryTaskNotFoundError(BaseBackEndError):
171169

172170
class SolverJobOutputRequestButNotSucceededError(BaseBackEndError):
173171
msg_template = user_message(
174-
"Cannot retrieve output for solver job '{job_id}' because it has not completed successfully. Current state: {state}.",
172+
"Cannot retrieve output for solver job '{job_id}' because it has not completed successfully. "
173+
"Current state: {state}.",
175174
_version=1,
176175
)
177176
status_code = status.HTTP_409_CONFLICT
@@ -187,7 +186,8 @@ class SolverJobNotStoppedYetError(BaseBackEndError):
187186

188187
class StudyJobOutputRequestButNotSucceededError(BaseBackEndError):
189188
msg_template = user_message(
190-
"Cannot retrieve output for project job '{job_id}' because it has not completed successfully. Current state: {state}.",
189+
"Cannot retrieve output for project job '{job_id}' because it has not completed successfully. "
190+
"Current state: {state}.",
191191
_version=1,
192192
)
193193
status_code = status.HTTP_409_CONFLICT

ā€Žservices/api-server/src/simcore_service_api_server/services_http/webserver.pyā€Ž

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757

5858
from ..core.settings import WebServerSettings
5959
from ..exceptions.backend_errors import (
60-
ClusterNotFoundError,
6160
ConfigurationError,
6261
ForbiddenWalletError,
6362
JobNotFoundError,
@@ -265,7 +264,10 @@ async def _wait_for_long_running_task_results(self, lrt_response: httpx.Response
265264
task_status = Envelope[TaskStatus].model_validate_json(get_response.text).data
266265
assert task_status is not None # nosec
267266
if not task_status.done:
268-
msg = "Timed out creating project. TIP: Try again, or contact oSparc support if this is happening repeatedly"
267+
msg = (
268+
"Timed out creating project. "
269+
"TIP: Try again, or contact oSparc support if this is happening repeatedly"
270+
)
269271
raise TryAgain(msg)
270272

271273
result_response = await self.long_running_task_client.get(
@@ -492,8 +494,7 @@ async def connect_pricing_unit_to_project_node(
492494
http_status_map=_JOB_STATUS_MAP
493495
| {
494496
status.HTTP_409_CONFLICT: ProjectAlreadyStartedError,
495-
status.HTTP_406_NOT_ACCEPTABLE: ClusterNotFoundError,
496-
status.HTTP_422_UNPROCESSABLE_ENTITY: ConfigurationError,
497+
status.HTTP_503_SERVICE_UNAVAILABLE: ConfigurationError,
497498
}
498499
)
499500
async def start_project(

ā€Žservices/director-v2/openapi.jsonā€Ž

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,23 +89,23 @@
8989
}
9090
}
9191
},
92-
"404": {
93-
"description": "Project or pricing details not found"
92+
"200": {
93+
"description": "Pipeline is up-to-date, nothing was started"
9494
},
95-
"406": {
96-
"description": "Cluster not found"
95+
"404": {
96+
"description": "Project/cluster/pricing details not found"
9797
},
9898
"503": {
99-
"description": "Service not available"
100-
},
101-
"422": {
102-
"description": "Configuration error"
99+
"description": "Service not available or configuration error"
103100
},
104101
"402": {
105102
"description": "Payment required"
106103
},
107104
"409": {
108-
"description": "Project already started"
105+
"description": "Project already started or contains deprecated services"
106+
},
107+
"422": {
108+
"description": "Invalid computation request (e.g. missing collection_run_id)"
109109
}
110110
}
111111
}

ā€Žservices/director-v2/src/simcore_service_director_v2/api/routes/computations.pyā€Ž

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from typing import Annotated, Any, Final
2222

2323
import networkx as nx
24-
from fastapi import APIRouter, Depends, FastAPI, HTTPException
24+
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Response
2525
from models_library.api_schemas_directorv2.computations import (
2626
ComputationCreate,
2727
ComputationDelete,
@@ -53,6 +53,7 @@
5353
ComputationalRunNotFoundError,
5454
ComputationalSchedulerError,
5555
ConfigurationError,
56+
EC2InstanceTypeNotFoundError,
5657
PipelineTaskMissingError,
5758
PricingPlanUnitNotFoundError,
5859
ProjectNotFoundError,
@@ -119,7 +120,7 @@ async def _check_pipeline_startable(
119120
catalog_client,
120121
):
121122
raise HTTPException(
122-
status_code=status.HTTP_406_NOT_ACCEPTABLE,
123+
status_code=status.HTTP_409_CONFLICT,
123124
detail=f"Project {computation.project_id} cannot run since "
124125
f"it contains deprecated tasks {jsonable_encoder(deprecated_tasks)}",
125126
)
@@ -187,25 +188,14 @@ async def _try_start_pipeline(
187188
*,
188189
project_repo: ProjectsRepository,
189190
computation: ComputationCreate,
190-
complete_dag: nx.DiGraph,
191191
minimal_dag: nx.DiGraph,
192192
project: ProjectAtDB,
193193
users_repo: UsersRepository,
194194
projects_metadata_repo: ProjectsMetadataRepository,
195-
) -> None:
195+
) -> bool:
196196
if not minimal_dag.nodes():
197-
# 2 options here: either we have cycles in the graph or it's really done
198-
if find_computational_node_cycles(complete_dag):
199-
raise HTTPException(
200-
status_code=status.HTTP_409_CONFLICT,
201-
detail=f"Project {computation.project_id} contains cycles with "
202-
"computational services which are currently not supported! Please remove them.",
203-
)
204-
# there is nothing else to be run here, so we are done
205-
raise HTTPException(
206-
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
207-
detail=f"Project {computation.project_id} has no computational services",
208-
)
197+
# there is nothing else to be run here (up-to-date or no computational services)
198+
return False
209199

210200
# Billing info
211201
wallet_id = None
@@ -239,6 +229,7 @@ async def _try_start_pipeline(
239229
use_on_demand_clusters=computation.use_on_demand_clusters,
240230
collection_run_id=computation.collection_run_id,
241231
)
232+
return True
242233

243234

244235
@router.post(
@@ -247,20 +238,20 @@ async def _try_start_pipeline(
247238
response_model=ComputationGet,
248239
status_code=status.HTTP_201_CREATED,
249240
responses={
250-
status.HTTP_404_NOT_FOUND: {
251-
"description": "Project or pricing details not found",
241+
status.HTTP_200_OK: {
242+
"description": "Pipeline is up-to-date, nothing was started",
252243
},
253-
status.HTTP_406_NOT_ACCEPTABLE: {
254-
"description": "Cluster not found",
244+
status.HTTP_404_NOT_FOUND: {
245+
"description": "Project/cluster/pricing details not found",
255246
},
256247
status.HTTP_503_SERVICE_UNAVAILABLE: {
257-
"description": "Service not available",
248+
"description": "Service not available or configuration error",
258249
},
250+
status.HTTP_402_PAYMENT_REQUIRED: {"description": "Payment required"},
251+
status.HTTP_409_CONFLICT: {"description": "Project already started or contains deprecated services"},
259252
status.HTTP_422_UNPROCESSABLE_ENTITY: {
260-
"description": "Configuration error",
253+
"description": "Invalid computation request (e.g. missing collection_run_id)",
261254
},
262-
status.HTTP_402_PAYMENT_REQUIRED: {"description": "Payment required"},
263-
status.HTTP_409_CONFLICT: {"description": "Project already started"},
264255
},
265256
)
266257
# NOTE: in case of a burst of calls to that endpoint, we might end up in a weird state.
@@ -269,6 +260,7 @@ async def _try_start_pipeline(
269260
async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disable=too-many-positional-arguments
270261
computation: ComputationCreate,
271262
request: Request,
263+
response: Response,
272264
project_repo: Annotated[ProjectsRepository, Depends(get_repository(ProjectsRepository))],
273265
comp_pipelines_repo: Annotated[CompPipelinesRepository, Depends(get_repository(CompPipelinesRepository))],
274266
comp_tasks_repo: Annotated[CompTasksRepository, Depends(get_repository(CompTasksRepository))],
@@ -293,6 +285,19 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
293285

294286
# create the complete DAG graph
295287
complete_dag = create_complete_dag(project.workbench)
288+
289+
# reject cycles involving computational nodes early (before catalog checks)
290+
if computation.start_pipeline and (computational_cycles := find_computational_node_cycles(complete_dag)):
291+
cycle_descriptions = [
292+
[complete_dag.nodes[n].get("name", n) for n in cycle] for cycle in computational_cycles
293+
]
294+
raise HTTPException(
295+
status_code=status.HTTP_409_CONFLICT,
296+
detail=f"Project {computation.project_id} contains cycles with "
297+
"computational services which are currently not supported: "
298+
f"{cycle_descriptions}. Please remove them.",
299+
)
300+
296301
# find the minimal viable graph to be run
297302
minimal_computational_dag: nx.DiGraph = await create_minimal_computational_graph_based_on_selection(
298303
complete_dag=complete_dag,
@@ -322,17 +327,19 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
322327
rabbitmq_rpc_client=rpc_client,
323328
)
324329

330+
pipeline_started = False
325331
if computation.start_pipeline:
326-
await _try_start_pipeline(
332+
pipeline_started = await _try_start_pipeline(
327333
request.app,
328334
project_repo=project_repo,
329335
computation=computation,
330-
complete_dag=complete_dag,
331336
minimal_dag=minimal_computational_dag,
332337
project=project,
333338
users_repo=users_repo,
334339
projects_metadata_repo=projects_metadata_repo,
335340
)
341+
if not pipeline_started:
342+
response.status_code = status.HTTP_200_OK
336343

337344
# get run details if any
338345
last_run: CompRunsAtDB | None = None
@@ -352,7 +359,7 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
352359
TypeAdapter(AnyHttpUrl).validate_python(
353360
f"{request.url}/{computation.project_id}:stop?user_id={computation.user_id}",
354361
)
355-
if computation.start_pipeline
362+
if computation.start_pipeline and pipeline_started
356363
else None
357364
),
358365
iteration=last_run.iteration if last_run else None,
@@ -362,16 +369,20 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
362369
submitted=last_run.created if last_run else None,
363370
)
364371

365-
except ProjectNotFoundError as e:
366-
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{e}") from e
367-
except ClusterNotFoundError as e:
368-
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"{e}") from e
369-
except PricingPlanUnitNotFoundError as e:
372+
except (
373+
ProjectNotFoundError,
374+
ClusterNotFoundError,
375+
PricingPlanUnitNotFoundError,
376+
EC2InstanceTypeNotFoundError,
377+
) as e:
370378
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{e}") from e
371-
except ClustersKeeperNotAvailableError as e:
379+
380+
except (
381+
ClustersKeeperNotAvailableError,
382+
ConfigurationError,
383+
) as e:
372384
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"{e}") from e
373-
except ConfigurationError as e:
374-
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"{e}") from e
385+
375386
except WalletNotEnoughCreditsError as e:
376387
raise HTTPException(status_code=status.HTTP_402_PAYMENT_REQUIRED, detail=f"{e}") from e
377388

ā€Žservices/director-v2/src/simcore_service_director_v2/core/errors.pyā€Ž

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class PricingPlanUnitNotFoundError(DirectorError):
5151
msg_template: str = "pricing plan not found {msg}"
5252

5353

54+
class EC2InstanceTypeNotFoundError(DirectorError):
55+
msg_template: str = "invalid EC2 instance type selected {ec2_instance_types}. TIP: adjust product configuration"
56+
57+
5458
class PipelineNotFoundError(DirectorError):
5559
msg_template: str = "pipeline {pipeline_id} not found"
5660

@@ -115,7 +119,8 @@ class MissingComputationalResourcesError(TaskSchedulingError): # pylint: disabl
115119

116120
class InsufficientComputationalResourcesError(TaskSchedulingError): # pylint: disable=too-many-ancestors
117121
msg_template: str = (
118-
"Insufficient computational resources to run {service_name}:{service_version} with {service_requested_resources} on cluster."
122+
"Insufficient computational resources to run {service_name}:{service_version} "
123+
"with {service_requested_resources} on cluster."
119124
"Cluster available workers: {cluster_available_resources}"
120125
"TIP: Reduce service required resources or contact oSparc support"
121126
)

ā€Žservices/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.pyā€Ž

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
from .....core.errors import (
5454
ClustersKeeperNotAvailableError,
55-
ConfigurationError,
55+
EC2InstanceTypeNotFoundError,
5656
WalletNotEnoughCreditsError,
5757
)
5858
from .....models.comp_tasks import CompTaskAtDB, Image, NodeSchema
@@ -286,10 +286,11 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
286286
else:
287287
_logger.warning("Services resource override not implemented yet for multi-container services!!!")
288288
except StopIteration as exc:
289-
msg = (
290-
f"invalid EC2 type name selected {set(hardware_info.aws_ec2_instances)}. TIP: adjust product configuration"
291-
)
292-
raise ConfigurationError(msg=msg) from exc
289+
raise EC2InstanceTypeNotFoundError(
290+
ec2_instance_types=f"{set(hardware_info.aws_ec2_instances)}",
291+
node_id=f"{node_id}",
292+
project_id=f"{project_id}",
293+
) from exc
293294
except (
294295
RemoteMethodNotRegisteredError,
295296
RPCServerError,

ā€Žservices/director-v2/src/simcore_service_director_v2/utils/dags.pyā€Ž

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,16 @@ def _node_needs_computation(graph_data: nx.classes.reportviews.NodeDataView, nod
126126

127127
async def _set_computational_nodes_states(complete_dag: nx.DiGraph) -> None:
128128
graph_data: nx.classes.reportviews.NodeDataView = complete_dag.nodes.data()
129-
for node_id in nx.algorithms.dag.topological_sort(complete_dag):
130-
if graph_data[node_id]["node_class"] is NodeClass.COMPUTATIONAL:
131-
await _compute_node_states(graph_data, node_id)
129+
# Build a subgraph of only computational nodes for topological ordering.
130+
# The complete_dag may contain cycles among dynamic services (which is valid),
131+
# but those cycles prevent topological_sort on the full graph. Since we only
132+
# process computational nodes, we only need *their* ordering to be acyclic.
133+
# State lookups (hash, dependencies) still use graph_data from the complete DAG,
134+
# so dynamic-node data remains accessible.
135+
comp_nodes = [n for n, d in graph_data if d["node_class"] is NodeClass.COMPUTATIONAL]
136+
comp_subgraph = complete_dag.subgraph(comp_nodes)
137+
for node_id in nx.algorithms.dag.topological_sort(comp_subgraph):
138+
await _compute_node_states(graph_data, node_id)
132139

133140

134141
async def create_minimal_computational_graph_based_on_selection(

0 commit comments

Comments
Ā (0)