Skip to content

Commit 3b8a3ba

Browse files
Merge branch 'master' into is5646/use-project-nodes-table-instead-of-workbench
2 parents dbf42b6 + 68679ab commit 3b8a3ba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+254
-172
lines changed

packages/service-library/src/servicelib/aiohttp/rest_middlewares.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def handle_aiohttp_web_http_error(
118118
exception.content_type = MIMETYPE_APPLICATION_JSON
119119
if exception.reason:
120120
exception.set_status(
121-
exception.status, safe_status_message(message=exception.reason)
121+
exception.status, reason=safe_status_message(message=exception.reason)
122122
)
123123

124124
if not exception.text or not is_enveloped_from_text(exception.text):
@@ -165,7 +165,7 @@ def _handle_aiohttp_web_http_successful(
165165
exception.content_type = MIMETYPE_APPLICATION_JSON
166166
if exception.reason:
167167
exception.set_status(
168-
exception.status, safe_status_message(message=exception.reason)
168+
exception.status, reason=safe_status_message(message=exception.reason)
169169
)
170170

171171
if exception.text and not is_enveloped_from_text(exception.text):

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
ProjectNotFoundError,
5959
WalletNotEnoughCreditsError,
6060
)
61-
from ...models.comp_pipelines import CompPipelineAtDB
6261
from ...models.comp_runs import CompRunsAtDB, ProjectMetadataDict, RunMetadataDict
6362
from ...models.comp_tasks import CompTaskAtDB
6463
from ...modules.catalog import CatalogClient
@@ -537,10 +536,8 @@ async def stop_computation(
537536
# check the project exists
538537
await project_repo.get_project(project_id)
539538
# get the project pipeline
540-
pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline(
541-
project_id
542-
)
543-
pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph()
539+
pipeline_at_db = await comp_pipelines_repo.get_pipeline(project_id)
540+
pipeline_dag = pipeline_at_db.get_graph()
544541
# get the project task states
545542
tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id)
546543
# create the complete DAG graph

services/director-v2/src/simcore_service_director_v2/utils/dags.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def create_complete_dag(workbench: NodesDict) -> nx.DiGraph:
4343
)
4444
if node.input_nodes:
4545
for input_node_id in node.input_nodes:
46-
predecessor_node = workbench.get(NodeIDStr(input_node_id))
46+
predecessor_node = workbench.get(f"{input_node_id}")
4747
assert (
4848
predecessor_node
4949
), f"Node {input_node_id} not found in workbench" # nosec
@@ -99,19 +99,18 @@ async def get_node_io_payload_cb(node_id: NodeID) -> dict[str, Any]:
9999
return result
100100

101101
computed_hash = await compute_node_hash(node_id, get_node_io_payload_cb)
102-
if computed_hash != node["run_hash"]:
103-
return True
104-
return False
102+
return bool(computed_hash != node["run_hash"])
105103

106104

107105
async def _compute_node_dependencies_state(graph_data, node_id) -> set[NodeID]:
108106
node = graph_data[f"{node_id}"]
109107
# check if the previous node is outdated or waits for dependencies... in which case this one has to wait
110108
non_computed_dependencies: set[NodeID] = set()
111109
for input_port in node.get("inputs", {}).values():
112-
if isinstance(input_port, PortLink):
113-
if _node_needs_computation(graph_data, input_port.node_uuid):
114-
non_computed_dependencies.add(input_port.node_uuid)
110+
if isinstance(input_port, PortLink) and _node_needs_computation(
111+
graph_data, input_port.node_uuid
112+
):
113+
non_computed_dependencies.add(input_port.node_uuid)
115114
# all good. ready
116115
return non_computed_dependencies
117116

@@ -192,14 +191,14 @@ def compute_pipeline_started_timestamp(
192191
if not pipeline_dag.nodes:
193192
return None
194193
node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
195-
NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
194+
f"{task.node_id}": task for task in comp_tasks
196195
}
197-
TOMORROW = arrow.utcnow().shift(days=1).datetime
196+
tomorrow = arrow.utcnow().shift(days=1).datetime
198197
pipeline_started_at: datetime.datetime | None = min(
199-
node_id_to_comp_task[node_id].start or TOMORROW
198+
node_id_to_comp_task[node_id].start or tomorrow
200199
for node_id in pipeline_dag.nodes
201200
)
202-
if pipeline_started_at == TOMORROW:
201+
if pipeline_started_at == tomorrow:
203202
pipeline_started_at = None
204203
return pipeline_started_at
205204

@@ -210,13 +209,13 @@ def compute_pipeline_stopped_timestamp(
210209
if not pipeline_dag.nodes:
211210
return None
212211
node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
213-
NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
212+
f"{task.node_id}": task for task in comp_tasks
214213
}
215-
TOMORROW = arrow.utcnow().shift(days=1).datetime
214+
tomorrow = arrow.utcnow().shift(days=1).datetime
216215
pipeline_stopped_at: datetime.datetime | None = max(
217-
node_id_to_comp_task[node_id].end or TOMORROW for node_id in pipeline_dag.nodes
216+
node_id_to_comp_task[node_id].end or tomorrow for node_id in pipeline_dag.nodes
218217
)
219-
if pipeline_stopped_at == TOMORROW:
218+
if pipeline_stopped_at == tomorrow:
220219
pipeline_stopped_at = None
221220
return pipeline_stopped_at
222221

@@ -231,15 +230,15 @@ async def compute_pipeline_details(
231230

232231
# NOTE: the latest progress is available in comp_tasks only
233232
node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
234-
NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
233+
f"{task.node_id}": task for task in comp_tasks
235234
}
236235
pipeline_progress = None
237236
if len(pipeline_dag.nodes) > 0:
238-
239237
pipeline_progress = sum(
240238
(node_id_to_comp_task[node_id].progress or 0) / len(pipeline_dag.nodes)
241239
for node_id in pipeline_dag.nodes
242-
if node_id_to_comp_task[node_id].progress is not None
240+
if node_id in node_id_to_comp_task
241+
and node_id_to_comp_task[node_id].progress is not None
243242
)
244243
pipeline_progress = max(0.0, min(pipeline_progress, 1.0))
245244

@@ -250,10 +249,15 @@ async def compute_pipeline_details(
250249
node_id: NodeState(
251250
modified=node_data.get(kNODE_MODIFIED_STATE, False),
252251
dependencies=node_data.get(kNODE_DEPENDENCIES_TO_COMPUTE, set()),
253-
current_status=node_id_to_comp_task[node_id].state,
252+
current_status=(
253+
node_id_to_comp_task[node_id].state
254+
if node_id in node_id_to_comp_task
255+
else RunningState.UNKNOWN
256+
),
254257
progress=(
255258
node_id_to_comp_task[node_id].progress
256-
if node_id_to_comp_task[node_id].progress is not None
259+
if node_id in node_id_to_comp_task
260+
and node_id_to_comp_task[node_id].progress is not None
257261
else None
258262
),
259263
)
@@ -265,12 +269,13 @@ async def compute_pipeline_details(
265269

266270
def find_computational_node_cycles(dag: nx.DiGraph) -> list[list[str]]:
267271
"""returns a list of nodes part of a cycle and computational, which is currently forbidden."""
268-
computational_node_cycles = []
272+
269273
list_potential_cycles = nx.algorithms.cycles.simple_cycles(dag)
270-
for cycle in list_potential_cycles:
274+
return [
275+
deepcopy(cycle)
276+
for cycle in list_potential_cycles
271277
if any(
272278
dag.nodes[node_id]["node_class"] is NodeClass.COMPUTATIONAL
273279
for node_id in cycle
274-
):
275-
computational_node_cycles.append(deepcopy(cycle))
276-
return computational_node_cycles
280+
)
281+
]

services/director-v2/tests/unit/test_utils_dags.py

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -390,13 +390,6 @@ def pipeline_test_params(
390390
list_comp_tasks: list[CompTaskAtDB],
391391
expected_pipeline_details_output: PipelineDetails,
392392
) -> PipelineDetailsTestParams:
393-
# check the inputs make sense
394-
assert len(set(dag_adjacency)) == len(node_keys) == len(list_comp_tasks)
395-
assert dag_adjacency.keys() == node_keys.keys()
396-
assert len(
397-
{t.node_id for t in list_comp_tasks}.intersection(node_keys.keys())
398-
) == len(set(dag_adjacency))
399-
400393
# resolve the naming
401394
node_name_to_uuid_map = {}
402395
resolved_dag_adjacency: dict[str, list[str]] = {}
@@ -596,3 +589,95 @@ async def test_compute_pipeline_details(
596589
received_details.model_dump()
597590
== pipeline_test_params.expected_pipeline_details.model_dump()
598591
)
592+
593+
594+
@pytest.mark.parametrize(
595+
"dag_adjacency, node_keys, list_comp_tasks, expected_pipeline_details_output",
596+
[
597+
pytest.param(
598+
{"node_1": ["node_2", "node_3"], "node_2": ["node_3"], "node_3": []},
599+
{
600+
"node_1": {
601+
"key": "simcore/services/comp/fake",
602+
"node_class": NodeClass.COMPUTATIONAL,
603+
"state": RunningState.NOT_STARTED,
604+
"outputs": None,
605+
},
606+
"node_2": {
607+
"key": "simcore/services/comp/fake",
608+
"node_class": NodeClass.COMPUTATIONAL,
609+
"state": RunningState.NOT_STARTED,
610+
"outputs": None,
611+
},
612+
"node_3": {
613+
"key": "simcore/services/comp/fake",
614+
"node_class": NodeClass.COMPUTATIONAL,
615+
"state": RunningState.NOT_STARTED,
616+
"outputs": None,
617+
},
618+
},
619+
[
620+
# NOTE: we use construct here to be able to use non uuid names to simplify test setup
621+
CompTaskAtDB.model_construct(
622+
project_id=uuid4(),
623+
node_id="node_1",
624+
schema=NodeSchema(inputs={}, outputs={}),
625+
inputs=None,
626+
image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
627+
state=RunningState.NOT_STARTED,
628+
internal_id=2,
629+
node_class=NodeClass.COMPUTATIONAL,
630+
created=datetime.datetime.now(tz=datetime.UTC),
631+
modified=datetime.datetime.now(tz=datetime.UTC),
632+
last_heartbeat=None,
633+
),
634+
CompTaskAtDB.model_construct(
635+
project_id=uuid4(),
636+
node_id="node_2",
637+
schema=NodeSchema(inputs={}, outputs={}),
638+
inputs=None,
639+
image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
640+
state=RunningState.NOT_STARTED,
641+
internal_id=3,
642+
node_class=NodeClass.COMPUTATIONAL,
643+
created=datetime.datetime.now(tz=datetime.UTC),
644+
modified=datetime.datetime.now(tz=datetime.UTC),
645+
last_heartbeat=None,
646+
),
647+
],
648+
PipelineDetails.model_construct(
649+
adjacency_list={
650+
"node_1": ["node_2", "node_3"],
651+
"node_2": ["node_3"],
652+
"node_3": [],
653+
},
654+
progress=0.0,
655+
node_states={
656+
"node_1": NodeState(modified=True, progress=None),
657+
"node_2": NodeState(modified=True, progress=None),
658+
"node_3": NodeState(
659+
modified=True,
660+
progress=None,
661+
current_status=RunningState.UNKNOWN,
662+
),
663+
},
664+
),
665+
id="dag with missing tasks (node 3 is missing, so it is not skipped in the pipeline details)",
666+
)
667+
],
668+
)
669+
@pytest.mark.acceptance_test(
670+
"For https://github.com/ITISFoundation/osparc-simcore/issues/8172"
671+
)
672+
async def test_compute_pipeline_details_with_missing_tasks(
673+
pipeline_test_params: PipelineDetailsTestParams,
674+
):
675+
received_details = await compute_pipeline_details(
676+
pipeline_test_params.complete_dag,
677+
pipeline_test_params.pipeline_dag,
678+
pipeline_test_params.comp_tasks,
679+
)
680+
assert (
681+
received_details.model_dump()
682+
== pipeline_test_params.expected_pipeline_details.model_dump()
683+
)

services/web/server/src/simcore_service_webserver/api_keys/_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ async def get_api_key(
7777
if api_key is not None:
7878
return api_key
7979

80-
raise ApiKeyNotFoundError(api_key_id=api_key_id)
80+
raise ApiKeyNotFoundError(
81+
api_key_id=api_key_id, product_name=product_name, user_id=user_id
82+
)
8183

8284

8385
async def list_api_keys(
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from ..errors import WebServerBaseError
22

33

4-
class ApiKeysValueError(WebServerBaseError, ValueError):
5-
...
4+
class ApiKeysValueError(WebServerBaseError, ValueError): ...
65

76

87
class ApiKeyDuplicatedDisplayNameError(ApiKeysValueError):
9-
msg_template = "API Key with display name '{display_name}' already exists. {reason}"
8+
msg_template = (
9+
"API Key with display name '{display_name}' already exists: {details}"
10+
)
1011

1112

1213
class ApiKeyNotFoundError(ApiKeysValueError):
13-
msg_template = "API Key with ID '{api_key_id}' not found. {reason}"
14+
msg_template = "API Key with ID '{api_key_id}' not found: {details}"

services/web/server/src/simcore_service_webserver/director_v2/_client_base.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import aiohttp
55
from aiohttp import ClientSession, ClientTimeout, web
66
from servicelib.aiohttp import status
7+
from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON
78
from tenacity import retry
89
from tenacity.before_sleep import before_sleep_log
910
from tenacity.stop import stop_after_attempt
@@ -36,13 +37,14 @@
3637

3738

3839
def _get_exception_from(
39-
status_code: int, on_error: _StatusToExceptionMapping | None, reason: str, url: URL
40+
status_code: int, on_error: _StatusToExceptionMapping | None, details: str, url: URL
4041
):
4142
if on_error and status_code in on_error:
42-
exc, exc_ctx = on_error[status_code]
43-
return exc(**exc_ctx, status=status_code, reason=reason)
43+
exc_cls, exc_ctx = on_error[status_code]
44+
return exc_cls(**exc_ctx, status=status_code, details=details)
45+
4446
# default
45-
return DirectorV2ServiceError(status=status_code, reason=reason, url=url)
47+
return DirectorV2ServiceError(status=status_code, details=details, url=url)
4648

4749

4850
@retry(**DEFAULT_RETRY_POLICY)
@@ -61,13 +63,13 @@ async def _make_request(
6163
) as response:
6264
payload: dict[str, Any] | list[dict[str, Any]] | None | str = (
6365
await response.json()
64-
if response.content_type == "application/json"
66+
if response.content_type == MIMETYPE_APPLICATION_JSON
6567
else await response.text()
6668
)
6769

6870
if response.status != expected_status.status_code:
6971
raise _get_exception_from(
70-
response.status, on_error, reason=f"{payload}", url=url
72+
response.status, on_error, details=f"{payload}", url=url
7173
)
7274
return payload
7375

@@ -99,13 +101,13 @@ async def request_director_v2(
99101
except TimeoutError as err:
100102
raise DirectorV2ServiceError(
101103
status=status.HTTP_503_SERVICE_UNAVAILABLE,
102-
reason=f"request to director-v2 timed-out: {err}",
104+
details=f"request to director-v2 timed-out: {err}",
103105
url=url,
104106
) from err
105107

106108
except aiohttp.ClientError as err:
107109
raise DirectorV2ServiceError(
108110
status=status.HTTP_503_SERVICE_UNAVAILABLE,
109-
reason=f"request to director-v2 service unexpected error {err}",
111+
details=f"request to director-v2 service unexpected error {err}",
110112
url=url,
111113
) from err

services/web/server/src/simcore_service_webserver/director_v2/_controller/_rest_exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ async def _handler_director_service_error_as_503_or_4xx(
9595
WalletNotEnoughCreditsError: HttpErrorInfo(
9696
status.HTTP_402_PAYMENT_REQUIRED,
9797
user_message(
98-
"Your wallet does not have sufficient credits to run this computation. {reason}",
98+
"Your wallet does not have sufficient credits to run this computation: {details}",
9999
_version=1,
100100
),
101101
),

services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ async def start_computation(request: web.Request) -> web.Response:
9595
now = datetime.now(UTC)
9696
if now - created_at > timedelta(minutes=5):
9797
raise web.HTTPBadRequest(
98-
reason=(
98+
text=(
9999
"This client generated collection is not new, "
100100
"it was created more than 5 minutes ago. "
101101
"Therefore, the client is probably wrongly generating it."

services/web/server/src/simcore_service_webserver/director_v2/exceptions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
class DirectorV2ServiceError(WebServerBaseError, RuntimeError):
99
"""Basic exception for errors raised by director-v2"""
1010

11-
msg_template = "Unexpected error: director-v2 returned '{status}', reason '{reason}' after calling '{url}'"
11+
msg_template = "Unexpected error: director-v2 returned '{status}', details '{details}' after calling '{url}'"
1212

13-
def __init__(self, *, status: int, reason: str, **ctx: Any) -> None:
13+
def __init__(self, *, status: int, details: str, **ctx: Any) -> None:
1414
super().__init__(**ctx)
1515
self.status = status
16-
self.reason = reason
16+
self.details = details
1717

1818

1919
class ComputationNotFoundError(DirectorV2ServiceError):

0 commit comments

Comments
 (0)