Skip to content

Commit 7881d83

Browse files
authored
🎨Send NodeUpdate when computational pipeline completes (#8250)
1 parent e873406 commit 7881d83

File tree

11 files changed

+142
-56
lines changed

11 files changed

+142
-56
lines changed

packages/models-library/src/models_library/projects_state.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
from enum import Enum, unique
6-
from typing import Annotated, Self, TypeAlias
6+
from typing import Annotated, Final, Self, TypeAlias
77

88
from pydantic import (
99
BaseModel,
@@ -65,6 +65,13 @@ def is_running(self) -> bool:
6565
return self in self.list_running_states()
6666

6767

68+
RUNNING_STATE_COMPLETED_STATES: Final[tuple[RunningState, ...]] = (
69+
RunningState.ABORTED,
70+
RunningState.FAILED,
71+
RunningState.SUCCESS,
72+
)
73+
74+
6875
@unique
6976
class DataState(str, Enum):
7077
UP_TO_DATE = "UPTODATE"

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,4 @@ class ComputationalPipelineStatusMessage(RabbitMessageBase, ProjectMessageBase):
325325
run_result: RunningState
326326

327327
def routing_key(self) -> str | None:
328-
return f"{self.project_id}"
328+
return f"{self.project_id}.all_nodes"

packages/pytest-simcore/src/pytest_simcore/docker_registry.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import docker
1616
import jsonschema
1717
import pytest
18+
import pytest_asyncio
1819
import tenacity
1920
from pytest_simcore.helpers.logging_tools import log_context
2021
from pytest_simcore.helpers.typing_env import EnvVarsDict
@@ -146,7 +147,7 @@ def wait_till_registry_is_responsive(url: str) -> bool:
146147
# ********************************************************* Services ***************************************
147148

148149

149-
def _pull_push_service(
150+
async def _pull_push_service(
150151
pull_key: str,
151152
tag: str,
152153
new_registry: str,
@@ -213,11 +214,13 @@ def _pull_push_service(
213214
assert image.tag(new_image_tag)
214215

215216
# push the image to the new location
216-
with log_context(
217-
logging.INFO,
218-
msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
219-
):
220-
client.images.push(new_image_tag)
217+
async with aiodocker.Docker() as client:
218+
await client.images.push(new_image_tag)
219+
# with log_context(
220+
# logging.INFO,
221+
# msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
222+
# ):
223+
# client.images.push(new_image_tag)
221224

222225
# return image io.simcore.* labels
223226
image_labels = dict(image.labels)
@@ -230,10 +233,10 @@ def _pull_push_service(
230233
}
231234

232235

233-
@pytest.fixture(scope="session")
236+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
234237
def docker_registry_image_injector(
235238
docker_registry: str, node_meta_schema: dict
236-
) -> Callable[..., dict[str, Any]]:
239+
) -> Callable[[str, str, str | None], Awaitable[dict[str, Any]]]:
237240
def inject_image(
238241
source_image_repo: str, source_image_tag: str, owner_email: str | None = None
239242
):
@@ -249,82 +252,86 @@ def inject_image(
249252

250253

251254
@pytest.fixture
252-
def osparc_service(
255+
async def osparc_service(
253256
docker_registry: str, node_meta_schema: dict, service_repo: str, service_tag: str
254257
) -> dict[str, Any]:
255258
"""pulls the service from service_repo:service_tag and pushes to docker_registry using the oSparc node meta schema
256259
NOTE: 'service_repo' and 'service_tag' defined as parametrization
257260
"""
258-
return _pull_push_service(
261+
return await _pull_push_service(
259262
service_repo, service_tag, docker_registry, node_meta_schema
260263
)
261264

262265

263-
@pytest.fixture(scope="session")
264-
def sleeper_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
266+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
267+
async def sleeper_service(
268+
docker_registry: str, node_meta_schema: dict
269+
) -> dict[str, Any]:
265270
"""Adds a itisfoundation/sleeper in docker registry"""
266-
return _pull_push_service(
271+
return await _pull_push_service(
267272
"itisfoundation/sleeper", "1.0.0", docker_registry, node_meta_schema
268273
)
269274

270275

271-
@pytest.fixture(scope="session")
272-
def jupyter_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
276+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
277+
async def jupyter_service(
278+
docker_registry: str, node_meta_schema: dict
279+
) -> dict[str, Any]:
273280
"""Adds a itisfoundation/jupyter-base-notebook in docker registry"""
274-
return _pull_push_service(
281+
return await _pull_push_service(
275282
"itisfoundation/jupyter-base-notebook",
276283
"2.13.0",
277284
docker_registry,
278285
node_meta_schema,
279286
)
280287

281288

282-
@pytest.fixture(scope="session", params=["2.0.7"])
289+
@pytest_asyncio.fixture(scope="session", loop_scope="session", params=["2.0.7"])
283290
def dy_static_file_server_version(request: pytest.FixtureRequest):
284291
return request.param
285292

286293

287-
@pytest.fixture(scope="session")
288-
def dy_static_file_server_service(
294+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
295+
async def dy_static_file_server_service(
289296
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
290297
) -> dict[str, Any]:
291298
"""
292299
Adds the below service in docker registry
293300
itisfoundation/dy-static-file-server
294301
"""
295-
return _pull_push_service(
302+
return await _pull_push_service(
296303
"itisfoundation/dy-static-file-server",
297304
dy_static_file_server_version,
298305
docker_registry,
299306
node_meta_schema,
300307
)
301308

302309

303-
@pytest.fixture(scope="session")
304-
def dy_static_file_server_dynamic_sidecar_service(
310+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
311+
async def dy_static_file_server_dynamic_sidecar_service(
305312
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
306313
) -> dict[str, Any]:
307314
"""
308315
Adds the below service in docker registry
309316
itisfoundation/dy-static-file-server-dynamic-sidecar
310317
"""
311-
return _pull_push_service(
318+
return await _pull_push_service(
312319
"itisfoundation/dy-static-file-server-dynamic-sidecar",
313320
dy_static_file_server_version,
314321
docker_registry,
315322
node_meta_schema,
316323
)
317324

318325

319-
@pytest.fixture(scope="session")
320-
def dy_static_file_server_dynamic_sidecar_compose_spec_service(
326+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
327+
async def dy_static_file_server_dynamic_sidecar_compose_spec_service(
321328
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
322329
) -> dict[str, Any]:
323330
"""
324331
Adds the below service in docker registry
325332
itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
326333
"""
327-
return _pull_push_service(
334+
return await _pull_push_service(
328335
"itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
329336
dy_static_file_server_version,
330337
docker_registry,

services/director-v2/src/simcore_service_director_v2/utils/computations.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Any
44

55
import arrow
6-
from models_library.projects_state import RunningState
6+
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES, RunningState
77
from models_library.services import ServiceKeyVersion
88
from models_library.services_regex import SERVICE_KEY_RE
99
from models_library.users import UserID
@@ -15,7 +15,7 @@
1515

1616
_logger = logging.getLogger(__name__)
1717

18-
_COMPLETED_STATES = (RunningState.ABORTED, RunningState.FAILED, RunningState.SUCCESS)
18+
1919
_RUNNING_STATES = (RunningState.STARTED,)
2020
_TASK_TO_PIPELINE_CONVERSIONS = {
2121
# tasks are initially in NOT_STARTED state, then they transition to published
@@ -50,16 +50,16 @@
5050
RunningState.NOT_STARTED,
5151
): RunningState.NOT_STARTED,
5252
# if there are only completed states with FAILED --> FAILED
53-
(*_COMPLETED_STATES,): RunningState.FAILED,
53+
(*RUNNING_STATE_COMPLETED_STATES,): RunningState.FAILED,
5454
# if there are only completed states with FAILED and not started ones --> NOT_STARTED
5555
(
56-
*_COMPLETED_STATES,
56+
*RUNNING_STATE_COMPLETED_STATES,
5757
RunningState.NOT_STARTED,
5858
): RunningState.NOT_STARTED,
5959
# the generic case where we have a combination of completed states, running states,
6060
# or published/pending tasks, not_started is a started pipeline
6161
(
62-
*_COMPLETED_STATES,
62+
*RUNNING_STATE_COMPLETED_STATES,
6363
*_RUNNING_STATES,
6464
RunningState.PUBLISHED,
6565
RunningState.PENDING,

services/director-v2/tests/integration/02/test_dynamic_services_routes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,11 +276,13 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]:
276276
async def key_version_expected(
277277
dy_static_file_server_dynamic_sidecar_service: dict,
278278
dy_static_file_server_service: dict,
279-
docker_registry_image_injector: Callable,
279+
docker_registry_image_injector: Callable[
280+
[str, str, str | None], Awaitable[dict[str, Any]]
281+
],
280282
) -> list[tuple[ServiceKeyVersion, bool]]:
281283
results: list[tuple[ServiceKeyVersion, bool]] = []
282284

283-
sleeper_service = docker_registry_image_injector(
285+
sleeper_service = await docker_registry_image_injector(
284286
"itisfoundation/sleeper", "2.1.1", "[email protected]"
285287
)
286288

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from aiohttp import web
88
from models_library.groups import GroupID
9+
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
910
from models_library.rabbitmq_messages import (
1011
ComputationalPipelineStatusMessage,
1112
EventRabbitMessage,
@@ -19,9 +20,9 @@
1920
from pydantic import TypeAdapter
2021
from servicelib.logging_utils import log_catch, log_context
2122
from servicelib.rabbitmq import RabbitMQClient
22-
from servicelib.utils import logged_gather
23+
from servicelib.utils import limited_gather, logged_gather
2324

24-
from ..projects import _projects_service
25+
from ..projects import _nodes_service, _projects_service
2526
from ..rabbitmq import get_rabbitmq_client
2627
from ..socketio.messages import (
2728
SOCKET_IO_EVENT,
@@ -80,6 +81,10 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
8081
return True
8182

8283

84+
def _is_computational_node(node_key: str) -> bool:
85+
return "/comp/" in node_key
86+
87+
8388
async def _computational_pipeline_status_message_parser(
8489
app: web.Application, data: bytes
8590
) -> bool:
@@ -90,6 +95,24 @@ async def _computational_pipeline_status_message_parser(
9095
rabbit_message.user_id,
9196
include_state=True,
9297
)
98+
if rabbit_message.run_result in RUNNING_STATE_COMPLETED_STATES:
99+
# the pipeline finished, the frontend needs to update all computational nodes
100+
computational_node_ids = (
101+
n.node_id
102+
for n in await _nodes_service.get_project_nodes(
103+
app, project_uuid=project["uuid"]
104+
)
105+
if _is_computational_node(n.key)
106+
)
107+
await limited_gather(
108+
*[
109+
_projects_service.notify_project_node_update(
110+
app, project, n_id, errors=None
111+
)
112+
for n_id in computational_node_ids
113+
],
114+
limit=10, # notify 10 nodes at a time
115+
)
93116
await _projects_service.notify_project_state_update(app, project)
94117

95118
return True
Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
from aiohttp import web
22
from models_library.projects import ProjectID
33
from models_library.services_types import ServiceKey, ServiceVersion
4-
from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo
4+
from simcore_postgres_database.utils_projects_nodes import ProjectNode, ProjectNodesRepo
5+
from simcore_postgres_database.utils_repos import pass_or_acquire_connection
56

6-
from ..db.plugin import get_database_engine_legacy
7+
from ..db.plugin import get_asyncpg_engine
78

89

910
async def get_project_nodes_services(
1011
app: web.Application, *, project_uuid: ProjectID
1112
) -> list[tuple[ServiceKey, ServiceVersion]]:
1213
repo = ProjectNodesRepo(project_uuid=project_uuid)
1314

14-
async with get_database_engine_legacy(app).acquire() as conn:
15+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
1516
nodes = await repo.list(conn)
1617

1718
# removes duplicates by preserving order
1819
return list(dict.fromkeys((node.key, node.version) for node in nodes))
20+
21+
22+
async def get_project_nodes(
23+
app: web.Application, *, project_uuid: ProjectID
24+
) -> list[ProjectNode]:
25+
repo = ProjectNodesRepo(project_uuid=project_uuid)
26+
27+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
28+
return await repo.list(conn)

services/web/server/src/simcore_service_webserver/projects/_nodes_service.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
model_validator,
2525
)
2626
from servicelib.utils import logged_gather
27+
from simcore_postgres_database.utils_projects_nodes import ProjectNode
2728

2829
from ..application_settings import get_application_settings
2930
from ..storage.api import get_download_link, get_files_in_node_folder
@@ -81,6 +82,12 @@ async def get_project_nodes_services(
8182
)
8283

8384

85+
async def get_project_nodes(
86+
app: web.Application, *, project_uuid: ProjectID
87+
) -> list[ProjectNode]:
88+
return await _nodes_repository.get_project_nodes(app, project_uuid=project_uuid)
89+
90+
8491
#
8592
# PREVIEWS
8693
#

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,9 +1367,10 @@ async def is_node_id_present_in_any_project_workbench(
13671367
async def _get_node_share_state(
13681368
app: web.Application,
13691369
*,
1370-
user_id: UserID,
13711370
project_uuid: ProjectID,
13721371
node_id: NodeID,
1372+
computational_pipeline_running: bool | None,
1373+
user_primrary_groupid: GroupID,
13731374
) -> NodeShareState:
13741375
node = await _projects_nodes_repository.get(
13751376
app, project_id=project_uuid, node_id=node_id
@@ -1405,11 +1406,11 @@ async def _get_node_share_state(
14051406
return NodeShareState(locked=False)
14061407

14071408
# if the service is computational and no pipeline is running it is not locked
1408-
if await director_v2_service.is_pipeline_running(app, user_id, project_uuid):
1409+
if computational_pipeline_running:
14091410
return NodeShareState(
14101411
locked=True,
14111412
current_user_groupids=[
1412-
await users_service.get_user_primary_group_id(app, user_id)
1413+
user_primrary_groupid,
14131414
],
14141415
status=NodeShareStatus.OPENED,
14151416
)
@@ -1913,6 +1914,10 @@ async def add_project_states_for_user(
19131914
)
19141915

19151916
# compose the node states
1917+
is_pipeline_running = await director_v2_service.is_pipeline_running(
1918+
app, user_id, project["uuid"]
1919+
)
1920+
user_primary_group_id = await users_service.get_user_primary_group_id(app, user_id)
19161921
for node_uuid, node in project["workbench"].items():
19171922
assert isinstance(node_uuid, str) # nosec
19181923
assert isinstance(node, dict) # nosec
@@ -1921,9 +1926,10 @@ async def add_project_states_for_user(
19211926
with contextlib.suppress(NodeShareStateCannotBeComputedError):
19221927
node_lock_state = await _get_node_share_state(
19231928
app,
1924-
user_id=user_id,
19251929
project_uuid=project["uuid"],
19261930
node_id=NodeID(node_uuid),
1931+
computational_pipeline_running=is_pipeline_running,
1932+
user_primrary_groupid=user_primary_group_id,
19271933
)
19281934
if NodeID(node_uuid) in computational_node_states:
19291935
node_state = computational_node_states[NodeID(node_uuid)].model_copy(

0 commit comments

Comments
 (0)