Skip to content

Commit 5881c10

Browse files
committed
notify
1 parent 4dd373c commit 5881c10

File tree

3 files changed

+46
-6
lines changed

3 files changed

+46
-6
lines changed

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from aiohttp import web
88
from models_library.groups import GroupID
9-
from models_library.projects_state import RunningState
9+
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
1010
from models_library.rabbitmq_messages import (
1111
ComputationalPipelineStatusMessage,
1212
EventRabbitMessage,
@@ -20,9 +20,9 @@
2020
from pydantic import TypeAdapter
2121
from servicelib.logging_utils import log_catch, log_context
2222
from servicelib.rabbitmq import RabbitMQClient
23-
from servicelib.utils import logged_gather
23+
from servicelib.utils import limited_gather, logged_gather
2424

25-
from ..projects import _projects_service
25+
from ..projects import _nodes_service, _projects_service
2626
from ..rabbitmq import get_rabbitmq_client
2727
from ..socketio.messages import (
2828
SOCKET_IO_EVENT,
@@ -81,6 +81,10 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
8181
return True
8282

8383

84+
def _is_computational_node(node_key: str) -> bool:
85+
return "/comp/" in node_key
86+
87+
8488
async def _computational_pipeline_status_message_parser(
8589
app: web.Application, data: bytes
8690
) -> bool:
@@ -91,6 +95,25 @@ async def _computational_pipeline_status_message_parser(
9195
rabbit_message.user_id,
9296
include_state=True,
9397
)
98+
if rabbit_message.run_result in RUNNING_STATE_COMPLETED_STATES:
99+
# the pipeline finished, the frontend needs to update all computational nodes
100+
computational_node_ids = (
101+
n.node_id
102+
for n in await _nodes_service.get_project_nodes(
103+
app, project_uuid=project["uuid"]
104+
)
105+
if _is_computational_node(n.key)
106+
)
107+
108+
await limited_gather(
109+
*[
110+
_projects_service.notify_project_node_update(
111+
app, project, n_id, errors=None
112+
)
113+
for n_id in computational_node_ids
114+
],
115+
limit=10, # notify 10 nodes at a time
116+
)
94117
await _projects_service.notify_project_state_update(app, project)
95118

96119
return True
Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
from aiohttp import web
22
from models_library.projects import ProjectID
33
from models_library.services_types import ServiceKey, ServiceVersion
4-
from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo
4+
from simcore_postgres_database.utils_projects_nodes import ProjectNode, ProjectNodesRepo
5+
from simcore_postgres_database.utils_repos import pass_or_acquire_connection
56

6-
from ..db.plugin import get_database_engine_legacy
7+
from ..db.plugin import get_asyncpg_engine
78

89

910
async def get_project_nodes_services(
1011
app: web.Application, *, project_uuid: ProjectID
1112
) -> list[tuple[ServiceKey, ServiceVersion]]:
1213
repo = ProjectNodesRepo(project_uuid=project_uuid)
1314

14-
async with get_database_engine_legacy(app).acquire() as conn:
15+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
1516
nodes = await repo.list(conn)
1617

1718
# removes duplicates by preserving order
1819
return list(dict.fromkeys((node.key, node.version) for node in nodes))
20+
21+
22+
async def get_project_nodes(
23+
app: web.Application, *, project_uuid: ProjectID
24+
) -> list[ProjectNode]:
25+
repo = ProjectNodesRepo(project_uuid=project_uuid)
26+
27+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
28+
return await repo.list(conn)

services/web/server/src/simcore_service_webserver/projects/_nodes_service.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
model_validator,
2525
)
2626
from servicelib.utils import logged_gather
27+
from simcore_postgres_database.utils_projects_nodes import ProjectNode
2728

2829
from ..application_settings import get_application_settings
2930
from ..storage.api import get_download_link, get_files_in_node_folder
@@ -81,6 +82,12 @@ async def get_project_nodes_services(
8182
)
8283

8384

85+
async def get_project_nodes(
86+
app: web.Application, *, project_uuid: ProjectID
87+
) -> list[ProjectNode]:
88+
return await _nodes_repository.get_project_nodes(app, project_uuid=project_uuid)
89+
90+
8491
#
8592
# PREVIEWS
8693
#

0 commit comments

Comments
 (0)