Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from enum import Enum, unique
from typing import Annotated, Self, TypeAlias
from typing import Annotated, Final, Self, TypeAlias

from pydantic import (
BaseModel,
Expand Down Expand Up @@ -65,6 +65,13 @@ def is_running(self) -> bool:
return self in self.list_running_states()


RUNNING_STATE_COMPLETED_STATES: Final[tuple[RunningState, ...]] = (
RunningState.ABORTED,
RunningState.FAILED,
RunningState.SUCCESS,
)


@unique
class DataState(str, Enum):
UP_TO_DATE = "UPTODATE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,4 @@ class ComputationalPipelineStatusMessage(RabbitMessageBase, ProjectMessageBase):
run_result: RunningState

def routing_key(self) -> str | None:
return f"{self.project_id}"
return f"{self.project_id}.all_nodes"
59 changes: 33 additions & 26 deletions packages/pytest-simcore/src/pytest_simcore/docker_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import docker
import jsonschema
import pytest
import pytest_asyncio
import tenacity
from pytest_simcore.helpers.logging_tools import log_context
from pytest_simcore.helpers.typing_env import EnvVarsDict
Expand Down Expand Up @@ -146,7 +147,7 @@ def wait_till_registry_is_responsive(url: str) -> bool:
# ********************************************************* Services ***************************************


def _pull_push_service(
async def _pull_push_service(
pull_key: str,
tag: str,
new_registry: str,
Expand Down Expand Up @@ -213,11 +214,13 @@ def _pull_push_service(
assert image.tag(new_image_tag)

# push the image to the new location
with log_context(
logging.INFO,
msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
):
client.images.push(new_image_tag)
async with aiodocker.Docker() as client:
await client.images.push(new_image_tag)
# with log_context(
# logging.INFO,
# msg=f"Pushing {pull_key}:{tag} -> {new_image_tag} ...",
# ):
# client.images.push(new_image_tag)

# return image io.simcore.* labels
image_labels = dict(image.labels)
Expand All @@ -230,10 +233,10 @@ def _pull_push_service(
}


@pytest.fixture(scope="session")
@pytest_asyncio.fixture(scope="session", loop_scope="session")
def docker_registry_image_injector(
docker_registry: str, node_meta_schema: dict
) -> Callable[..., dict[str, Any]]:
) -> Callable[[str, str, str | None], Awaitable[dict[str, Any]]]:
def inject_image(
source_image_repo: str, source_image_tag: str, owner_email: str | None = None
):
Expand All @@ -249,82 +252,86 @@ def inject_image(


@pytest.fixture
def osparc_service(
async def osparc_service(
docker_registry: str, node_meta_schema: dict, service_repo: str, service_tag: str
) -> dict[str, Any]:
"""pulls the service from service_repo:service_tag and pushes to docker_registry using the oSparc node meta schema
NOTE: 'service_repo' and 'service_tag' defined as parametrization
"""
return _pull_push_service(
return await _pull_push_service(
service_repo, service_tag, docker_registry, node_meta_schema
)


@pytest.fixture(scope="session")
def sleeper_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def sleeper_service(
docker_registry: str, node_meta_schema: dict
) -> dict[str, Any]:
"""Adds a itisfoundation/sleeper in docker registry"""
return _pull_push_service(
return await _pull_push_service(
"itisfoundation/sleeper", "1.0.0", docker_registry, node_meta_schema
)


@pytest.fixture(scope="session")
def jupyter_service(docker_registry: str, node_meta_schema: dict) -> dict[str, Any]:
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def jupyter_service(
docker_registry: str, node_meta_schema: dict
) -> dict[str, Any]:
"""Adds a itisfoundation/jupyter-base-notebook in docker registry"""
return _pull_push_service(
return await _pull_push_service(
"itisfoundation/jupyter-base-notebook",
"2.13.0",
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session", params=["2.0.7"])
@pytest_asyncio.fixture(scope="session", loop_scope="session", params=["2.0.7"])
def dy_static_file_server_version(request: pytest.FixtureRequest):
return request.param


@pytest.fixture(scope="session")
def dy_static_file_server_service(
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def dy_static_file_server_service(
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
) -> dict[str, Any]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server
"""
return _pull_push_service(
return await _pull_push_service(
"itisfoundation/dy-static-file-server",
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_service(
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def dy_static_file_server_dynamic_sidecar_service(
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
) -> dict[str, Any]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar
"""
return _pull_push_service(
return await _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar",
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_compose_spec_service(
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def dy_static_file_server_dynamic_sidecar_compose_spec_service(
docker_registry: str, node_meta_schema: dict, dy_static_file_server_version: str
) -> dict[str, Any]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
"""
return _pull_push_service(
return await _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
dy_static_file_server_version,
docker_registry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any

import arrow
from models_library.projects_state import RunningState
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES, RunningState
from models_library.services import ServiceKeyVersion
from models_library.services_regex import SERVICE_KEY_RE
from models_library.users import UserID
Expand All @@ -15,7 +15,7 @@

_logger = logging.getLogger(__name__)

_COMPLETED_STATES = (RunningState.ABORTED, RunningState.FAILED, RunningState.SUCCESS)

_RUNNING_STATES = (RunningState.STARTED,)
_TASK_TO_PIPELINE_CONVERSIONS = {
# tasks are initially in NOT_STARTED state, then they transition to published
Expand Down Expand Up @@ -50,16 +50,16 @@
RunningState.NOT_STARTED,
): RunningState.NOT_STARTED,
# if there are only completed states with FAILED --> FAILED
(*_COMPLETED_STATES,): RunningState.FAILED,
(*RUNNING_STATE_COMPLETED_STATES,): RunningState.FAILED,
# if there are only completed states with FAILED and not started ones --> NOT_STARTED
(
*_COMPLETED_STATES,
*RUNNING_STATE_COMPLETED_STATES,
RunningState.NOT_STARTED,
): RunningState.NOT_STARTED,
# the generic case where we have a combination of completed states, running states,
# or published/pending tasks, not_started is a started pipeline
(
*_COMPLETED_STATES,
*RUNNING_STATE_COMPLETED_STATES,
*_RUNNING_STATES,
RunningState.PUBLISHED,
RunningState.PENDING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,13 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]:
async def key_version_expected(
dy_static_file_server_dynamic_sidecar_service: dict,
dy_static_file_server_service: dict,
docker_registry_image_injector: Callable,
docker_registry_image_injector: Callable[
[str, str, str | None], Awaitable[dict[str, Any]]
],
) -> list[tuple[ServiceKeyVersion, bool]]:
results: list[tuple[ServiceKeyVersion, bool]] = []

sleeper_service = docker_registry_image_injector(
sleeper_service = await docker_registry_image_injector(
"itisfoundation/sleeper", "2.1.1", "[email protected]"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from aiohttp import web
from models_library.groups import GroupID
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
from models_library.rabbitmq_messages import (
ComputationalPipelineStatusMessage,
EventRabbitMessage,
Expand All @@ -19,9 +20,9 @@
from pydantic import TypeAdapter
from servicelib.logging_utils import log_catch, log_context
from servicelib.rabbitmq import RabbitMQClient
from servicelib.utils import logged_gather
from servicelib.utils import limited_gather, logged_gather

from ..projects import _projects_service
from ..projects import _nodes_service, _projects_service
from ..rabbitmq import get_rabbitmq_client
from ..socketio.messages import (
SOCKET_IO_EVENT,
Expand Down Expand Up @@ -80,6 +81,10 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
return True


def _is_computational_node(node_key: str) -> bool:
return "/comp/" in node_key


async def _computational_pipeline_status_message_parser(
app: web.Application, data: bytes
) -> bool:
Expand All @@ -90,6 +95,24 @@ async def _computational_pipeline_status_message_parser(
rabbit_message.user_id,
include_state=True,
)
if rabbit_message.run_result in RUNNING_STATE_COMPLETED_STATES:
# the pipeline finished, the frontend needs to update all computational nodes
computational_node_ids = (
n.node_id
for n in await _nodes_service.get_project_nodes(
app, project_uuid=project["uuid"]
)
if _is_computational_node(n.key)
)
await limited_gather(
*[
_projects_service.notify_project_node_update(
app, project, n_id, errors=None
)
for n_id in computational_node_ids
],
limit=10, # notify 10 nodes at a time
)
await _projects_service.notify_project_state_update(app, project)

return True
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from aiohttp import web
from models_library.projects import ProjectID
from models_library.services_types import ServiceKey, ServiceVersion
from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo
from simcore_postgres_database.utils_projects_nodes import ProjectNode, ProjectNodesRepo
from simcore_postgres_database.utils_repos import pass_or_acquire_connection

from ..db.plugin import get_database_engine_legacy
from ..db.plugin import get_asyncpg_engine


async def get_project_nodes_services(
app: web.Application, *, project_uuid: ProjectID
) -> list[tuple[ServiceKey, ServiceVersion]]:
repo = ProjectNodesRepo(project_uuid=project_uuid)

async with get_database_engine_legacy(app).acquire() as conn:
async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
nodes = await repo.list(conn)

# removes duplicates by preserving order
return list(dict.fromkeys((node.key, node.version) for node in nodes))


async def get_project_nodes(
app: web.Application, *, project_uuid: ProjectID
) -> list[ProjectNode]:
repo = ProjectNodesRepo(project_uuid=project_uuid)

async with pass_or_acquire_connection(get_asyncpg_engine(app)) as conn:
return await repo.list(conn)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
model_validator,
)
from servicelib.utils import logged_gather
from simcore_postgres_database.utils_projects_nodes import ProjectNode

from ..application_settings import get_application_settings
from ..storage.api import get_download_link, get_files_in_node_folder
Expand Down Expand Up @@ -81,6 +82,12 @@ async def get_project_nodes_services(
)


async def get_project_nodes(
app: web.Application, *, project_uuid: ProjectID
) -> list[ProjectNode]:
return await _nodes_repository.get_project_nodes(app, project_uuid=project_uuid)


#
# PREVIEWS
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1367,9 +1367,10 @@ async def is_node_id_present_in_any_project_workbench(
async def _get_node_share_state(
app: web.Application,
*,
user_id: UserID,
project_uuid: ProjectID,
node_id: NodeID,
computational_pipeline_running: bool | None,
user_primrary_groupid: GroupID,
) -> NodeShareState:
node = await _projects_nodes_repository.get(
app, project_id=project_uuid, node_id=node_id
Expand Down Expand Up @@ -1405,11 +1406,11 @@ async def _get_node_share_state(
return NodeShareState(locked=False)

# if the service is computational and no pipeline is running it is not locked
if await director_v2_service.is_pipeline_running(app, user_id, project_uuid):
if computational_pipeline_running:
return NodeShareState(
locked=True,
current_user_groupids=[
await users_service.get_user_primary_group_id(app, user_id)
user_primrary_groupid,
],
status=NodeShareStatus.OPENED,
)
Expand Down Expand Up @@ -1913,6 +1914,10 @@ async def add_project_states_for_user(
)

# compose the node states
is_pipeline_running = await director_v2_service.is_pipeline_running(
app, user_id, project["uuid"]
)
user_primary_group_id = await users_service.get_user_primary_group_id(app, user_id)
for node_uuid, node in project["workbench"].items():
assert isinstance(node_uuid, str) # nosec
assert isinstance(node, dict) # nosec
Expand All @@ -1921,9 +1926,10 @@ async def add_project_states_for_user(
with contextlib.suppress(NodeShareStateCannotBeComputedError):
node_lock_state = await _get_node_share_state(
app,
user_id=user_id,
project_uuid=project["uuid"],
node_id=NodeID(node_uuid),
computational_pipeline_running=is_pipeline_running,
user_primrary_groupid=user_primary_group_id,
)
if NodeID(node_uuid) in computational_node_states:
node_state = computational_node_states[NodeID(node_uuid)].model_copy(
Expand Down
Loading
Loading