
Commit e266b6f

Merge branch 'master' into feature/listenToProjectDocumentWS

2 parents c7071b8 + 5f20287

File tree

53 files changed: +279 −191 lines. (Large commits hide some content by default; only a subset of the changed files is shown below.)

packages/service-library/src/servicelib/aiohttp/rest_middlewares.py

Lines changed: 2 additions & 2 deletions
@@ -118,7 +118,7 @@ def handle_aiohttp_web_http_error(
     exception.content_type = MIMETYPE_APPLICATION_JSON
     if exception.reason:
         exception.set_status(
-            exception.status, safe_status_message(message=exception.reason)
+            exception.status, reason=safe_status_message(message=exception.reason)
         )
 
     if not exception.text or not is_enveloped_from_text(exception.text):

@@ -165,7 +165,7 @@ def _handle_aiohttp_web_http_successful(
     exception.content_type = MIMETYPE_APPLICATION_JSON
     if exception.reason:
         exception.set_status(
-            exception.status, safe_status_message(message=exception.reason)
+            exception.status, reason=safe_status_message(message=exception.reason)
         )
 
     if exception.text and not is_enveloped_from_text(exception.text):
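Note on the fix: the sanitized reason is now passed explicitly via the reason keyword instead of positionally. A minimal sketch of the pattern, assuming only aiohttp's public set_status(status, reason=...) signature (the truncation below is a stand-in for this repo's safe_status_message helper, not the real implementation):

    from aiohttp import web

    def demo() -> None:
        exc = web.HTTPBadRequest(reason="a very long, possibly multi-line reason")
        # stand-in for safe_status_message: keep the status line short and single-line
        sanitized = exc.reason.splitlines()[0][:50]
        # the keyword form makes the intent explicit at the call site
        exc.set_status(exc.status, reason=sanitized)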

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 2 additions & 5 deletions
@@ -58,7 +58,6 @@
     ProjectNotFoundError,
     WalletNotEnoughCreditsError,
 )
-from ...models.comp_pipelines import CompPipelineAtDB
 from ...models.comp_runs import CompRunsAtDB, ProjectMetadataDict, RunMetadataDict
 from ...models.comp_tasks import CompTaskAtDB
 from ...modules.catalog import CatalogClient

@@ -537,10 +536,8 @@ async def stop_computation(
     # check the project exists
     await project_repo.get_project(project_id)
     # get the project pipeline
-    pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline(
-        project_id
-    )
-    pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph()
+    pipeline_at_db = await comp_pipelines_repo.get_pipeline(project_id)
+    pipeline_dag = pipeline_at_db.get_graph()
     # get the project task states
     tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id)
     # create the complete DAG graph
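The now-unused CompPipelineAtDB import and the redundant call-site annotations are dropped; the repository method's own return annotation drives inference. A self-contained sketch of the idea (Pipeline and get_pipeline are illustrative stand-ins, not the repo's real classes):

    import asyncio
    from dataclasses import dataclass, field

    @dataclass
    class Pipeline:
        adjacency: dict[str, list[str]] = field(default_factory=dict)

        def get_graph(self) -> dict[str, list[str]]:
            return self.adjacency

    async def get_pipeline(project_id: str) -> Pipeline:
        # annotated once at the source, so callers need no local annotations
        return Pipeline()

    async def main() -> None:
        pipeline_at_db = await get_pipeline("project-1")  # inferred: Pipeline
        pipeline_dag = pipeline_at_db.get_graph()  # inferred: dict[str, list[str]]
        assert pipeline_dag == {}

    asyncio.run(main())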

services/director-v2/src/simcore_service_director_v2/utils/dags.py

Lines changed: 30 additions & 25 deletions
@@ -42,7 +42,7 @@ def create_complete_dag(workbench: NodesDict) -> nx.DiGraph:
         )
         if node.input_nodes:
             for input_node_id in node.input_nodes:
-                predecessor_node = workbench.get(NodeIDStr(input_node_id))
+                predecessor_node = workbench.get(f"{input_node_id}")
                 if predecessor_node:
                     dag_graph.add_edge(str(input_node_id), node_id)
 

@@ -95,19 +95,18 @@ async def get_node_io_payload_cb(node_id: NodeID) -> dict[str, Any]:
         return result
 
     computed_hash = await compute_node_hash(node_id, get_node_io_payload_cb)
-    if computed_hash != node["run_hash"]:
-        return True
-    return False
+    return bool(computed_hash != node["run_hash"])
 
 
 async def _compute_node_dependencies_state(graph_data, node_id) -> set[NodeID]:
     node = graph_data[f"{node_id}"]
     # check if the previous node is outdated or waits for dependencies... in which case this one has to wait
     non_computed_dependencies: set[NodeID] = set()
     for input_port in node.get("inputs", {}).values():
-        if isinstance(input_port, PortLink):
-            if _node_needs_computation(graph_data, input_port.node_uuid):
-                non_computed_dependencies.add(input_port.node_uuid)
+        if isinstance(input_port, PortLink) and _node_needs_computation(
+            graph_data, input_port.node_uuid
+        ):
+            non_computed_dependencies.add(input_port.node_uuid)
     # all good. ready
     return non_computed_dependencies

@@ -188,14 +187,14 @@ def compute_pipeline_started_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_started_at: datetime.datetime | None = min(
-        node_id_to_comp_task[node_id].start or TOMORROW
+        node_id_to_comp_task[node_id].start or tomorrow
         for node_id in pipeline_dag.nodes
     )
-    if pipeline_started_at == TOMORROW:
+    if pipeline_started_at == tomorrow:
         pipeline_started_at = None
     return pipeline_started_at
 

@@ -206,13 +205,13 @@ def compute_pipeline_stopped_timestamp(
     if not pipeline_dag.nodes:
         return None
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
-    TOMORROW = arrow.utcnow().shift(days=1).datetime
+    tomorrow = arrow.utcnow().shift(days=1).datetime
     pipeline_stopped_at: datetime.datetime | None = max(
-        node_id_to_comp_task[node_id].end or TOMORROW for node_id in pipeline_dag.nodes
+        node_id_to_comp_task[node_id].end or tomorrow for node_id in pipeline_dag.nodes
    )
-    if pipeline_stopped_at == TOMORROW:
+    if pipeline_stopped_at == tomorrow:
         pipeline_stopped_at = None
     return pipeline_stopped_at
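The started/stopped helpers rely on a sentinel one day in the future so that min()/max() over optional timestamps stays total: tasks that never ran contribute the sentinel, and if the sentinel wins, the pipeline has no start (or stop) time at all. A minimal sketch of the pattern, using only the arrow calls seen above:

    import datetime
    import arrow

    starts: list[datetime.datetime | None] = [None, None]  # no task ever started
    tomorrow = arrow.utcnow().shift(days=1).datetime

    started_at = min(s or tomorrow for s in starts)
    if started_at == tomorrow:  # the sentinel won: nothing actually started
        started_at = None
    assert started_at is None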
@@ -227,15 +226,15 @@ async def compute_pipeline_details(
 
     # NOTE: the latest progress is available in comp_tasks only
     node_id_to_comp_task: dict[NodeIDStr, CompTaskAtDB] = {
-        NodeIDStr(f"{task.node_id}"): task for task in comp_tasks
+        f"{task.node_id}": task for task in comp_tasks
     }
     pipeline_progress = None
     if len(pipeline_dag.nodes) > 0:
-
         pipeline_progress = sum(
             (node_id_to_comp_task[node_id].progress or 0) / len(pipeline_dag.nodes)
             for node_id in pipeline_dag.nodes
-            if node_id_to_comp_task[node_id].progress is not None
+            if node_id in node_id_to_comp_task
+            and node_id_to_comp_task[node_id].progress is not None
         )
         pipeline_progress = max(0.0, min(pipeline_progress, 1.0))
 

@@ -246,10 +245,15 @@ async def compute_pipeline_details(
         node_id: NodeState(
             modified=node_data.get(kNODE_MODIFIED_STATE, False),
             dependencies=node_data.get(kNODE_DEPENDENCIES_TO_COMPUTE, set()),
-            current_status=node_id_to_comp_task[node_id].state,
+            current_status=(
+                node_id_to_comp_task[node_id].state
+                if node_id in node_id_to_comp_task
+                else RunningState.UNKNOWN
+            ),
             progress=(
                 node_id_to_comp_task[node_id].progress
-                if node_id_to_comp_task[node_id].progress is not None
+                if node_id in node_id_to_comp_task
+                and node_id_to_comp_task[node_id].progress is not None
                 else None
             ),
         )

@@ -261,12 +265,13 @@ async def compute_pipeline_details(
 
 def find_computational_node_cycles(dag: nx.DiGraph) -> list[list[str]]:
     """returns a list of nodes part of a cycle and computational, which is currently forbidden."""
-    computational_node_cycles = []
+
     list_potential_cycles = nx.algorithms.cycles.simple_cycles(dag)
-    for cycle in list_potential_cycles:
+    return [
+        deepcopy(cycle)
+        for cycle in list_potential_cycles
         if any(
             dag.nodes[node_id]["node_class"] is NodeClass.COMPUTATIONAL
             for node_id in cycle
-        ):
-            computational_node_cycles.append(deepcopy(cycle))
-    return computational_node_cycles
+        )
+    ]
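find_computational_node_cycles is flattened into a single list comprehension over networkx's cycle generator. A self-contained sketch of the same filtering idea (the node_class strings are illustrative; the real code compares against the NodeClass enum):

    from copy import deepcopy

    import networkx as nx

    dag = nx.DiGraph()
    dag.add_node("a", node_class="COMPUTATIONAL")
    dag.add_node("b", node_class="DYNAMIC")
    dag.add_edges_from([("a", "b"), ("b", "a")])  # a two-node cycle

    computational_cycles = [
        deepcopy(cycle)
        for cycle in nx.algorithms.cycles.simple_cycles(dag)
        if any(dag.nodes[n]["node_class"] == "COMPUTATIONAL" for n in cycle)
    ]
    assert len(computational_cycles) == 1
    assert set(computational_cycles[0]) == {"a", "b"}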

services/director-v2/tests/unit/test_utils_dags.py

Lines changed: 92 additions & 7 deletions
@@ -390,13 +390,6 @@ def pipeline_test_params(
     list_comp_tasks: list[CompTaskAtDB],
     expected_pipeline_details_output: PipelineDetails,
 ) -> PipelineDetailsTestParams:
-    # check the inputs make sense
-    assert len(set(dag_adjacency)) == len(node_keys) == len(list_comp_tasks)
-    assert dag_adjacency.keys() == node_keys.keys()
-    assert len(
-        {t.node_id for t in list_comp_tasks}.intersection(node_keys.keys())
-    ) == len(set(dag_adjacency))
-
     # resolve the naming
     node_name_to_uuid_map = {}
     resolved_dag_adjacency: dict[str, list[str]] = {}

@@ -596,3 +589,95 @@ async def test_compute_pipeline_details(
         received_details.model_dump()
         == pipeline_test_params.expected_pipeline_details.model_dump()
     )
+
+
+@pytest.mark.parametrize(
+    "dag_adjacency, node_keys, list_comp_tasks, expected_pipeline_details_output",
+    [
+        pytest.param(
+            {"node_1": ["node_2", "node_3"], "node_2": ["node_3"], "node_3": []},
+            {
+                "node_1": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_2": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+                "node_3": {
+                    "key": "simcore/services/comp/fake",
+                    "node_class": NodeClass.COMPUTATIONAL,
+                    "state": RunningState.NOT_STARTED,
+                    "outputs": None,
+                },
+            },
+            [
+                # NOTE: we use construct here to be able to use non uuid names to simplify test setup
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_1",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=2,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+                CompTaskAtDB.model_construct(
+                    project_id=uuid4(),
+                    node_id="node_2",
+                    schema=NodeSchema(inputs={}, outputs={}),
+                    inputs=None,
+                    image=Image(name="simcore/services/comp/fake", tag="1.3.4"),
+                    state=RunningState.NOT_STARTED,
+                    internal_id=3,
+                    node_class=NodeClass.COMPUTATIONAL,
+                    created=datetime.datetime.now(tz=datetime.UTC),
+                    modified=datetime.datetime.now(tz=datetime.UTC),
+                    last_heartbeat=None,
+                ),
+            ],
+            PipelineDetails.model_construct(
+                adjacency_list={
+                    "node_1": ["node_2", "node_3"],
+                    "node_2": ["node_3"],
+                    "node_3": [],
+                },
+                progress=0.0,
+                node_states={
+                    "node_1": NodeState(modified=True, progress=None),
+                    "node_2": NodeState(modified=True, progress=None),
+                    "node_3": NodeState(
+                        modified=True,
+                        progress=None,
+                        current_status=RunningState.UNKNOWN,
+                    ),
+                },
+            ),
+            id="dag with missing tasks (node 3 is missing, so it is not skipped in the pipeline details)",
+        )
+    ],
+)
+@pytest.mark.acceptance_test(
+    "For https://github.com/ITISFoundation/osparc-simcore/issues/8172"
+)
+async def test_compute_pipeline_details_with_missing_tasks(
+    pipeline_test_params: PipelineDetailsTestParams,
+):
+    received_details = await compute_pipeline_details(
+        pipeline_test_params.complete_dag,
+        pipeline_test_params.pipeline_dag,
+        pipeline_test_params.comp_tasks,
+    )
+    assert (
+        received_details.model_dump()
+        == pipeline_test_params.expected_pipeline_details.model_dump()
+    )
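As the NOTE in the test says, CompTaskAtDB.model_construct builds the rows without validation, so readable names like "node_1" can stand in for real node UUIDs. A tiny illustration of the Pydantic v2 behavior (Task is a made-up model, not the repo's):

    from uuid import UUID

    from pydantic import BaseModel, ValidationError

    class Task(BaseModel):
        node_id: UUID

    try:
        Task(node_id="node_1")  # validated: rejected, "node_1" is not a UUID
    except ValidationError:
        pass

    task = Task.model_construct(node_id="node_1")  # unvalidated: accepted as-is
    assert task.node_id == "node_1"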

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_notifier.py

Lines changed: 7 additions & 5 deletions
@@ -11,7 +11,7 @@
 )
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
 from models_library.api_schemas_webserver.socketio import SocketIORoomStr
-from models_library.users import UserID
+from models_library.projects import ProjectID
 from servicelib.fastapi.app_state import SingletonInAppStateMixin
 from servicelib.services_utils import get_status_as_dict
 

@@ -23,20 +23,22 @@ def __init__(self, sio_manager: socketio.AsyncAioPikaManager):
         self._sio_manager = sio_manager
 
     async def notify_service_status(
-        self, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle
+        self, project_id: ProjectID, status: NodeGet | DynamicServiceGet | NodeGetIdle
     ) -> None:
         await self._sio_manager.emit(
             SOCKET_IO_SERVICE_STATUS_EVENT,
             data=jsonable_encoder(get_status_as_dict(status)),
-            room=SocketIORoomStr.from_user_id(user_id),
+            room=SocketIORoomStr.from_project_id(project_id),
         )
 
 
 async def notify_service_status_change(
-    app: FastAPI, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle
+    app: FastAPI,
+    project_id: ProjectID,
+    status: NodeGet | DynamicServiceGet | NodeGetIdle,
 ) -> None:
     notifier: Notifier = Notifier.get_from_app_state(app)
-    await notifier.notify_service_status(user_id=user_id, status=status)
+    await notifier.notify_service_status(project_id=project_id, status=status)
 
 
 async def lifespan(app: FastAPI) -> AsyncIterator[State]:
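Status events are now emitted to a per-project socket.io room instead of a per-user one, so every client that opened the project receives the update. A minimal sketch of room-based emission with python-socketio (the event name and room format are assumptions; the real ones come from SOCKET_IO_SERVICE_STATUS_EVENT and SocketIORoomStr.from_project_id):

    import socketio

    sio = socketio.AsyncServer(async_mode="aiohttp")

    async def notify_project(project_id: str, payload: dict) -> None:
        # every socket that joined the project's room receives the event,
        # regardless of which user it belongs to
        await sio.emit("serviceStatus", data=payload, room=f"project:{project_id}")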

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/__init__.py

Lines changed: 3 additions & 3 deletions
@@ -1,8 +1,8 @@
 from ._api import (
     NORMAL_RATE_POLL_INTERVAL,
     get_all_tracked_services,
+    get_project_id_for_service,
     get_tracked_service,
-    get_user_id_for_service,
     remove_tracked_service,
     set_frontend_notified_for_service,
     set_if_status_changed_for_service,

@@ -17,11 +17,11 @@
 
 __all__: tuple[str, ...] = (
     "get_all_tracked_services",
+    "get_project_id_for_service",
     "get_tracked_service",
-    "get_user_id_for_service",
-    "service_tracker_lifespan",
     "NORMAL_RATE_POLL_INTERVAL",
     "remove_tracked_service",
+    "service_tracker_lifespan",
     "set_frontend_notified_for_service",
     "set_if_status_changed_for_service",
     "set_request_as_running",

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_api.py

Lines changed: 9 additions & 5 deletions
@@ -11,12 +11,16 @@
     DynamicServiceStop,
 )
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
+from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.services_enums import ServiceState
-from models_library.users import UserID
 from servicelib.deferred_tasks import TaskUID
 
-from ._models import SchedulerServiceState, TrackedServiceModel, UserRequestedState
+from ._models import (
+    SchedulerServiceState,
+    TrackedServiceModel,
+    UserRequestedState,
+)
 from ._setup import get_tracker
 
 _logger = logging.getLogger(__name__)

@@ -242,7 +246,7 @@ async def get_all_tracked_services(app: FastAPI) -> dict[NodeID, TrackedServiceM
     return await get_tracker(app).all()
 
 
-async def get_user_id_for_service(app: FastAPI, node_id: NodeID) -> UserID | None:
-    """returns user_id for the service"""
+async def get_project_id_for_service(app: FastAPI, node_id: NodeID) -> ProjectID | None:
+    """returns project_id for the service"""
     model: TrackedServiceModel | None = await get_tracker(app).load(node_id)
-    return model.user_id if model else None
+    return model.project_id if model else None

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py

Lines changed: 6 additions & 6 deletions
@@ -7,8 +7,8 @@
     RunningDynamicServiceDetails,
 )
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
+from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
-from models_library.users import UserID
 from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID
 from servicelib.deferred_tasks._base_deferred_handler import DeferredContext
 

@@ -69,15 +69,15 @@ async def on_result(
         if await service_tracker.should_notify_frontend_for_service(
             app, node_id, status_changed=status_changed
         ):
-            user_id: UserID | None = await service_tracker.get_user_id_for_service(
-                app, node_id
+            project_id: ProjectID | None = (
+                await service_tracker.get_project_id_for_service(app, node_id)
             )
-            if user_id:
-                await notify_service_status_change(app, user_id, result)
+            if project_id:
+                await notify_service_status_change(app, project_id, result)
                 await service_tracker.set_frontend_notified_for_service(app, node_id)
             else:
                 _logger.info(
-                    "Did not find a user for '%s', skipping status delivery of: %s",
+                    "Did not find a project for '%s', skipping status delivery of: %s",
                     node_id,
                     result,
                 )
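On each status result the handler now resolves the service's project and notifies only when one is tracked, logging and skipping otherwise. A condensed sketch of that guard flow (the in-memory dict stands in for service_tracker, and _notify for notify_service_status_change):

    import logging

    _logger = logging.getLogger(__name__)
    _tracked: dict[str, str] = {"node-abc": "project-123"}  # node_id -> project_id

    async def _notify(project_id: str, result: dict) -> None:
        ...  # stand-in for notify_service_status_change(app, project_id, result)

    async def on_result(node_id: str, result: dict) -> None:
        project_id = _tracked.get(node_id)  # None when the service is not tracked
        if project_id:
            await _notify(project_id, result)
        else:
            _logger.info(
                "Did not find a project for '%s', skipping status delivery", node_id
            )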
