Skip to content

Commit 333d947

Browse files
author
Andrei Neagu
committed
Merge remote-tracking branch 'upstream/master' into pr-osparc-fix-orphan-task-removal
2 parents ac9ad80 + 7881d83 commit 333d947

File tree

13 files changed

+182
-73
lines changed

13 files changed

+182
-73
lines changed

packages/models-library/src/models_library/projects_state.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
from enum import Enum, unique
6-
from typing import Annotated, Self, TypeAlias
6+
from typing import Annotated, Final, Self, TypeAlias
77

88
from pydantic import (
99
BaseModel,
@@ -65,6 +65,13 @@ def is_running(self) -> bool:
6565
return self in self.list_running_states()
6666

6767

68+
RUNNING_STATE_COMPLETED_STATES: Final[tuple[RunningState, ...]] = (
69+
RunningState.ABORTED,
70+
RunningState.FAILED,
71+
RunningState.SUCCESS,
72+
)
73+
74+
6875
@unique
6976
class DataState(str, Enum):
7077
UP_TO_DATE = "UPTODATE"

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,4 @@ class ComputationalPipelineStatusMessage(RabbitMessageBase, ProjectMessageBase):
325325
run_result: RunningState
326326

327327
def routing_key(self) -> str | None:
328-
return f"{self.project_id}"
328+
return f"{self.project_id}.all_nodes"

packages/pytest-simcore/src/pytest_simcore/docker_registry.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import docker
1616
import jsonschema
1717
import pytest
18+
import pytest_asyncio
1819
import tenacity
1920
from pytest_simcore.helpers.logging_tools import log_context
2021
from pytest_simcore.helpers.typing_env import EnvVarsDict
@@ -146,7 +147,7 @@ def wait_till_registry_is_responsive(url: str) -> bool:
146147
# ********************************************************* Services ***************************************
147148

148149

149-
def _pull_push_service(
150+
async def _pull_push_service(
150151
pull_key: str,
151152
tag: str,
152153
new_registry: str,
@@ -213,11 +214,13 @@ def _pull_push_service(
213214
assert image.tag(new_image_tag)
214215

215216
# push the image to the new location
216-
with log_context(
217-
logging.INFO,
218-
msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
219-
):
220-
client.images.push(new_image_tag)
217+
async with aiodocker.Docker() as client:
218+
await client.images.push(new_image_tag)
219+
# with log_context(
220+
# logging.INFO,
221+
# msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
222+
# ):
223+
# client.images.push(new_image_tag)
221224

222225
# return image io.simcore.* labels
223226
image_labels = dict(image.labels)
@@ -230,10 +233,10 @@ def _pull_push_service(
230233
}
231234

232235

233-
@pytest.fixture(scope="session")
236+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
234237
def docker_registry_image_injector(
235238
docker_registry: str, node_meta_schema: dict
236-
) -> Callable[..., dict[str, Any]]:
239+
) -> Callable[[str, str, str | None], Awaitable[dict[str, Any]]]:
237240
def inject_image(
238241
source_image_repo: str, source_image_tag: str, owner_email: str | None = None
239242
):
@@ -249,82 +252,86 @@ def inject_image(
249252

250253

251254
@pytest.fixture
252-
def osparc_service(
255+
async def osparc_service(
253256
docker_registry: str, node_meta_schema: dict, service_repo: str, service_tag: str
254257
) -> dict[str, Any]:
255258
"""pulls the service from service_repo:service_tag and pushes to docker_registry using the oSparc node meta schema
256259
NOTE: 'service_repo' and 'service_tag' defined as parametrization
257260
"""
258-
return _pull_push_service(
261+
return await _pull_push_service(
259262
service_repo, service_tag, docker_registry, node_meta_schema
260263
)
261264

262265

263-
@pytest.fixture(scope="session")
264-
def sleeper_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
266+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
267+
async def sleeper_service(
268+
docker_registry: str, node_meta_schema: dict
269+
) -> dict[str, Any]:
265270
"""Adds a itisfoundation/sleeper in docker registry"""
266-
return _pull_push_service(
271+
return await _pull_push_service(
267272
"itisfoundation/sleeper", "1.0.0", docker_registry, node_meta_schema
268273
)
269274

270275

271-
@pytest.fixture(scope="session")
272-
def jupyter_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
276+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
277+
async def jupyter_service(
278+
docker_registry: str, node_meta_schema: dict
279+
) -> dict[str, Any]:
273280
"""Adds a itisfoundation/jupyter-base-notebook in docker registry"""
274-
return _pull_push_service(
281+
return await _pull_push_service(
275282
"itisfoundation/jupyter-base-notebook",
276283
"2.13.0",
277284
docker_registry,
278285
node_meta_schema,
279286
)
280287

281288

282-
@pytest.fixture(scope="session", params=["2.0.7"])
289+
@pytest_asyncio.fixture(scope="session", loop_scope="session", params=["2.0.7"])
283290
def dy_static_file_server_version(request: pytest.FixtureRequest):
284291
return request.param
285292

286293

287-
@pytest.fixture(scope="session")
288-
def dy_static_file_server_service(
294+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
295+
async def dy_static_file_server_service(
289296
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
290297
) -> dict[str, Any]:
291298
"""
292299
Adds the below service in docker registry
293300
itisfoundation/dy-static-file-server
294301
"""
295-
return _pull_push_service(
302+
return await _pull_push_service(
296303
"itisfoundation/dy-static-file-server",
297304
dy_static_file_server_version,
298305
docker_registry,
299306
node_meta_schema,
300307
)
301308

302309

303-
@pytest.fixture(scope="session")
304-
def dy_static_file_server_dynamic_sidecar_service(
310+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
311+
async def dy_static_file_server_dynamic_sidecar_service(
305312
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
306313
) -> dict[str, Any]:
307314
"""
308315
Adds the below service in docker registry
309316
itisfoundation/dy-static-file-server-dynamic-sidecar
310317
"""
311-
return _pull_push_service(
318+
return await _pull_push_service(
312319
"itisfoundation/dy-static-file-server-dynamic-sidecar",
313320
dy_static_file_server_version,
314321
docker_registry,
315322
node_meta_schema,
316323
)
317324

318325

319-
@pytest.fixture(scope="session")
320-
def dy_static_file_server_dynamic_sidecar_compose_spec_service(
326+
@pytest_asyncio.fixture(scope="session", loop_scope="session")
327+
async def dy_static_file_server_dynamic_sidecar_compose_spec_service(
321328
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
322329
) -> dict[str, Any]:
323330
"""
324331
Adds the below service in docker registry
325332
itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
326333
"""
327-
return _pull_push_service(
334+
return await _pull_push_service(
328335
"itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
329336
dy_static_file_server_version,
330337
docker_registry,

packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,29 @@ def __call__(self, message: str) -> bool:
302302
return False
303303

304304

305+
@dataclass
306+
class SocketIOWaitNodeForOutputs:
307+
logger: logging.Logger
308+
expected_number_of_outputs: int
309+
node_id: str
310+
311+
def __call__(self, message: str) -> bool:
312+
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
313+
decoded_message = decode_socketio_42_message(message)
314+
if decoded_message.name == _OSparcMessages.NODE_UPDATED:
315+
assert "data" in decoded_message.obj
316+
assert "node_id" in decoded_message.obj
317+
if decoded_message.obj["node_id"] == self.node_id:
318+
assert "outputs" in decoded_message.obj["data"]
319+
320+
return (
321+
len(decoded_message.obj["data"]["outputs"])
322+
== self.expected_number_of_outputs
323+
)
324+
325+
return False
326+
327+
305328
@dataclass
306329
class SocketIOOsparcMessagePrinter:
307330
include_logger_messages: bool = False

services/director-v2/src/simcore_service_director_v2/utils/computations.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Any
44

55
import arrow
6-
from models_library.projects_state import RunningState
6+
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES, RunningState
77
from models_library.services import ServiceKeyVersion
88
from models_library.services_regex import SERVICE_KEY_RE
99
from models_library.users import UserID
@@ -15,7 +15,7 @@
1515

1616
_logger = logging.getLogger(__name__)
1717

18-
_COMPLETED_STATES = (RunningState.ABORTED, RunningState.FAILED, RunningState.SUCCESS)
18+
1919
_RUNNING_STATES = (RunningState.STARTED,)
2020
_TASK_TO_PIPELINE_CONVERSIONS = {
2121
# tasks are initially in NOT_STARTED state, then they transition to published
@@ -50,16 +50,16 @@
5050
RunningState.NOT_STARTED,
5151
): RunningState.NOT_STARTED,
5252
# if there are only completed states with FAILED --> FAILED
53-
(*_COMPLETED_STATES,): RunningState.FAILED,
53+
(*RUNNING_STATE_COMPLETED_STATES,): RunningState.FAILED,
5454
# if there are only completed states with FAILED and not started ones --> NOT_STARTED
5555
(
56-
*_COMPLETED_STATES,
56+
*RUNNING_STATE_COMPLETED_STATES,
5757
RunningState.NOT_STARTED,
5858
): RunningState.NOT_STARTED,
5959
# the generic case where we have a combination of completed states, running states,
6060
# or published/pending tasks, not_started is a started pipeline
6161
(
62-
*_COMPLETED_STATES,
62+
*RUNNING_STATE_COMPLETED_STATES,
6363
*_RUNNING_STATES,
6464
RunningState.PUBLISHED,
6565
RunningState.PENDING,

services/director-v2/tests/integration/02/test_dynamic_services_routes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,11 +276,13 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]:
276276
async def key_version_expected(
277277
dy_static_file_server_dynamic_sidecar_service: dict,
278278
dy_static_file_server_service: dict,
279-
docker_registry_image_injector: Callable,
279+
docker_registry_image_injector: Callable[
280+
[str, str, str | None], Awaitable[dict[str, Any]]
281+
],
280282
) -> list[tuple[ServiceKeyVersion, bool]]:
281283
results: list[tuple[ServiceKeyVersion, bool]] = []
282284

283-
sleeper_service = docker_registry_image_injector(
285+
sleeper_service = await docker_registry_image_injector(
284286
"itisfoundation/sleeper", "2.1.1", "[email protected]"
285287
)
286288

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from aiohttp import web
88
from models_library.groups import GroupID
9+
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
910
from models_library.rabbitmq_messages import (
1011
ComputationalPipelineStatusMessage,
1112
EventRabbitMessage,
@@ -19,9 +20,9 @@
1920
from pydantic import TypeAdapter
2021
from servicelib.logging_utils import log_catch, log_context
2122
from servicelib.rabbitmq import RabbitMQClient
22-
from servicelib.utils import logged_gather
23+
from servicelib.utils import limited_gather, logged_gather
2324

24-
from ..projects import _projects_service
25+
from ..projects import _nodes_service, _projects_service
2526
from ..rabbitmq import get_rabbitmq_client
2627
from ..socketio.messages import (
2728
SOCKET_IO_EVENT,
@@ -80,6 +81,10 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
8081
return True
8182

8283

84+
def _is_computational_node(node_key: str) -> bool:
85+
return "/comp/" in node_key
86+
87+
8388
async def _computational_pipeline_status_message_parser(
8489
app: web.Application, data: bytes
8590
) -> bool:
@@ -90,6 +95,24 @@ async def _computational_pipeline_status_message_parser(
9095
rabbit_message.user_id,
9196
include_state=True,
9297
)
98+
if rabbit_message.run_result in RUNNING_STATE_COMPLETED_STATES:
99+
# the pipeline finished, the frontend needs to update all computational nodes
100+
computational_node_ids = (
101+
n.node_id
102+
for n in await _nodes_service.get_project_nodes(
103+
app, project_uuid=project["uuid"]
104+
)
105+
if _is_computational_node(n.key)
106+
)
107+
await limited_gather(
108+
*[
109+
_projects_service.notify_project_node_update(
110+
app, project, n_id, errors=None
111+
)
112+
for n_id in computational_node_ids
113+
],
114+
limit=10, # notify 10 nodes at a time
115+
)
93116
await _projects_service.notify_project_state_update(app, project)
94117

95118
return True
Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
from aiohttp import web
22
from models_library.projects import ProjectID
33
from models_library.services_types import ServiceKey, ServiceVersion
4-
from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo
4+
from simcore_postgres_database.utils_projects_nodes import ProjectNode, ProjectNodesRepo
5+
from simcore_postgres_database.utils_repos import pass_or_acquire_connection
56

6-
from ..db.plugin import get_database_engine_legacy
7+
from ..db.plugin import get_asyncpg_engine
78

89

910
async def get_project_nodes_services(
1011
app: web.Application, *, project_uuid: ProjectID
1112
) -> list[tuple[ServiceKey, ServiceVersion]]:
1213
repo = ProjectNodesRepo(project_uuid=project_uuid)
1314

14-
async with get_database_engine_legacy(app).acquire() as conn:
15+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
1516
nodes = await repo.list(conn)
1617

1718
# removes duplicates by preserving order
1819
return list(dict.fromkeys((node.key, node.version) for node in nodes))
20+
21+
22+
async def get_project_nodes(
23+
app: web.Application, *, project_uuid: ProjectID
24+
) -> list[ProjectNode]:
25+
repo = ProjectNodesRepo(project_uuid=project_uuid)
26+
27+
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
28+
return await repo.list(conn)

services/web/server/src/simcore_service_webserver/projects/_nodes_service.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
model_validator,
2525
)
2626
from servicelib.utils import logged_gather
27+
from simcore_postgres_database.utils_projects_nodes import ProjectNode
2728

2829
from ..application_settings import get_application_settings
2930
from ..storage.api import get_download_link, get_files_in_node_folder
@@ -81,6 +82,12 @@ async def get_project_nodes_services(
8182
)
8283

8384

85+
async def get_project_nodes(
86+
app: web.Application, *, project_uuid: ProjectID
87+
) -> list[ProjectNode]:
88+
return await _nodes_repository.get_project_nodes(app, project_uuid=project_uuid)
89+
90+
8491
#
8592
# PREVIEWS
8693
#

0 commit comments

Comments
 (0)