Skip to content

Commit 2e2779e

Browse files
add RPC communication
1 parent f887e3e commit 2e2779e

File tree

11 files changed

+110
-22
lines changed

11 files changed

+110
-22
lines changed

packages/models-library/src/models_library/rpc/webserver/projects/__init__.py

Whitespace-only changes.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from pydantic import BaseModel
2+
3+
4+
class ProjectNodeGet(BaseModel):
5+
key: str
6+
version: str
7+
label: str
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import logging
2+
3+
from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE
4+
from models_library.api_schemas_webserver.projects import ProjectID
5+
from models_library.projects_nodes import Node
6+
from models_library.rabbitmq_basic_types import RPCMethodName
7+
from models_library.rpc.webserver.projects.projects_nodes import ProjectNodeGet
8+
from pydantic import TypeAdapter
9+
from servicelib.logging_utils import log_decorator
10+
from servicelib.rabbitmq import RabbitMQRPCClient
11+
12+
_logger = logging.getLogger(__name__)
13+
14+
15+
@log_decorator(_logger, level=logging.DEBUG)
16+
async def list_project_nodes(
17+
rpc_client: RabbitMQRPCClient,
18+
*,
19+
project_uuid: ProjectID,
20+
) -> list[Node]:
21+
result: list[ProjectNodeGet] = await rpc_client.request(
22+
WEBSERVER_RPC_NAMESPACE,
23+
TypeAdapter(RPCMethodName).validate_python("list_project_nodes"),
24+
project_uuid=project_uuid,
25+
)
26+
return TypeAdapter(list[Node]).validate_python(result)

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
from servicelib.async_utils import run_sequentially_in_context
3838
from servicelib.logging_utils import log_decorator
3939
from servicelib.rabbitmq import RabbitMQRPCClient
40+
from servicelib.rabbitmq.rpc_interfaces.webserver.projects import (
41+
projects_nodes as projects_nodes_rpc,
42+
)
4043
from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
4144
from starlette import status
4245
from starlette.requests import Request
@@ -301,9 +304,14 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
301304

302305
# check if current state allow to modify the computation
303306
await _check_pipeline_not_running_or_raise_409(comp_tasks_repo, computation)
304-
307+
workbench = {
308+
node.key: node
309+
for node in await projects_nodes_rpc.list_project_nodes(
310+
rpc_client, project_uuid=computation.project_id
311+
)
312+
}
305313
# create the complete DAG graph
306-
complete_dag = create_complete_dag(project.workbench)
314+
complete_dag = create_complete_dag(workbench)
307315
# find the minimal viable graph to be run
308316
minimal_computational_dag: nx.DiGraph = (
309317
await create_minimal_computational_graph_based_on_selection(

services/director-v2/src/simcore_service_director_v2/utils/dags.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ def create_complete_dag(workbench: NodesDict) -> nx.DiGraph:
4242
)
4343
if node.input_nodes:
4444
for input_node_id in node.input_nodes:
45-
predecessor_node = workbench.get(NodeIDStr(input_node_id))
45+
predecessor_node = workbench.get(f"{input_node_id}")
4646
if predecessor_node:
47-
dag_graph.add_edge(str(input_node_id), node_id)
47+
dag_graph.add_edge(f"{input_node_id}", node_id)
4848

4949
return dag_graph
5050

services/web/server/src/simcore_service_webserver/projects/_projects_nodes_repository.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@
3636
]
3737

3838

39-
async def get_node(
39+
async def get_project_node(
4040
app: web.Application,
4141
connection: AsyncConnection | None = None,
4242
*,
43-
project_id: ProjectID,
43+
project_uuid: ProjectID,
4444
node_id: NodeID,
4545
) -> Node:
4646
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
4747
get_stmt = sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where(
48-
(projects_nodes.c.project_uuid == f"{project_id}")
48+
(projects_nodes.c.project_uuid == f"{project_uuid}")
4949
& (projects_nodes.c.node_id == f"{node_id}")
5050
)
5151

@@ -55,44 +55,48 @@ async def get_node(
5555
row = await result.first()
5656
if row is None:
5757
raise NodeNotFoundError(
58-
project_uuid=f"{project_id}", node_uuid=f"{node_id}"
58+
project_uuid=f"{project_uuid}", node_uuid=f"{node_id}"
5959
)
6060
assert row # nosec
6161
return Node.model_validate(row, from_attributes=True)
6262

6363

64-
async def list_nodes(
64+
async def list_project_nodes(
6565
app: web.Application,
6666
connection: AsyncConnection | None = None,
6767
*,
68-
project_id: ProjectID,
68+
project_uuid: ProjectID,
6969
) -> list[Node]:
7070
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
7171
result = await conn.stream(
7272
sa.select(*_SELECTION_PROJECTS_NODES_DB_ARGS).where(
73-
projects_nodes.c.project_uuid == f"{project_id}"
73+
projects_nodes.c.project_uuid == f"{project_uuid}"
7474
)
7575
)
7676
rows = await result.all() or []
7777
return TypeAdapter(list[Node]).validate_python(rows)
7878

7979

80-
async def update_node(
80+
async def update_project_node(
8181
app: web.Application,
8282
connection: AsyncConnection | None = None,
8383
*,
84-
project_id: ProjectID,
84+
project_uuid: ProjectID,
8585
node_id: NodeID,
8686
partial_node: PartialNode,
87-
) -> None:
87+
) -> Node:
8888
values = partial_node.model_dump(mode="json", exclude_unset=True)
8989

9090
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
91-
await conn.stream(
91+
result = await conn.stream(
9292
projects_nodes.update()
9393
.values(**values)
9494
.where(
95-
(projects_nodes.c.project_uuid == f"{project_id}")
95+
(projects_nodes.c.project_uuid == f"{project_uuid}")
9696
& (projects_nodes.c.node_id == f"{node_id}")
9797
)
98+
.returning(*_SELECTION_PROJECTS_NODES_DB_ARGS)
9899
)
100+
row = await result.first()
101+
assert row
102+
return Node.model_validate(row, from_attributes=True)

services/web/server/src/simcore_service_webserver/projects/api/__init__.py

Whitespace-only changes.

services/web/server/src/simcore_service_webserver/projects/api/rpc/__init__.py

Whitespace-only changes.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from aiohttp import web
2+
from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE
3+
from models_library.projects import ProjectID
4+
from models_library.projects_nodes import Node
5+
from models_library.rpc.webserver.projects.projects_nodes import ProjectNodeGet
6+
from pydantic import TypeAdapter
7+
from servicelib.rabbitmq import RPCRouter
8+
9+
from ....rabbitmq import get_rabbitmq_rpc_server
10+
from ... import projects_service
11+
12+
router = RPCRouter()
13+
14+
15+
@router.expose()
16+
async def list_project_nodes(
17+
app: web.Application,
18+
*,
19+
project_uuid: ProjectID,
20+
) -> list[ProjectNodeGet]:
21+
nodes: list[Node] = await projects_service.list_project_nodes(app, project_uuid)
22+
return TypeAdapter(list[ProjectNodeGet]).validate_python(nodes)
23+
24+
25+
async def register_rpc_routes(app: web.Application):
26+
rpc_server = get_rabbitmq_rpc_server(app)
27+
await rpc_server.register_router(router, WEBSERVER_RPC_NAMESPACE, app)

services/web/server/src/simcore_service_webserver/projects/plugin.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup
1010

1111
from .._constants import APP_SETTINGS_KEY
12+
from ..rabbitmq import setup_rabbitmq
1213
from . import (
1314
_comments_handlers,
1415
_crud_handlers,
@@ -26,6 +27,7 @@
2627
)
2728
from ._observer import setup_project_observer_events
2829
from ._projects_access import setup_projects_access
30+
from .api.rpc.routes import register_rpc_routes
2931
from .db import setup_projects_db
3032

3133
logger = logging.getLogger(__name__)
@@ -64,4 +66,9 @@ def setup_projects(app: web.Application) -> bool:
6466
app.router.add_routes(_workspaces_handlers.routes)
6567
app.router.add_routes(_trash_rest.routes)
6668

69+
# RPC
70+
setup_rabbitmq(app)
71+
if app[APP_SETTINGS_KEY].WEBSERVER_RABBITMQ:
72+
app.on_startup.append(register_rpc_routes)
73+
6774
return True

0 commit comments

Comments
 (0)