diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index d6c8135eb99..8ea54f2e9e5 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -3,9 +3,10 @@ from abc import abstractmethod from decimal import Decimal from enum import Enum, IntEnum, auto -from typing import Any, Literal, TypeAlias +from typing import Annotated, Any, Literal, TypeAlias import arrow +from common_library.basic_types import DEFAULT_FACTORY from pydantic import BaseModel, Field from .products import ProductName @@ -77,6 +78,21 @@ def routing_key(self) -> str | None: return None +class WebserverInternalEventRabbitMessageAction(StrAutoEnum): + UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE = auto() + + +class WebserverInternalEventRabbitMessage(RabbitMessageBase): + channel_name: Literal["simcore.services.webserver_internal_events"] = ( + "simcore.services.webserver_internal_events" + ) + action: WebserverInternalEventRabbitMessageAction + data: Annotated[dict[str, Any], Field(default_factory=dict)] = DEFAULT_FACTORY + + def routing_key(self) -> str | None: + return None + + class ProgressType(StrAutoEnum): COMPUTATION_RUNNING = auto() # NOTE: this is the original only progress report diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/webserver_parametrizations.py b/packages/pytest-simcore/src/pytest_simcore/helpers/webserver_parametrizations.py index 6422122f4f4..1a2c6367e86 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/webserver_parametrizations.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/webserver_parametrizations.py @@ -1,4 +1,4 @@ -from typing import NamedTuple +from typing import NamedTuple, TypedDict from unittest import mock from servicelib.aiohttp import status @@ -120,3 +120,7 @@ class MockedStorageSubsystem(NamedTuple): delete_project: mock.MagicMock delete_node: mock.MagicMock get_project_total_size_simcore_s3: mock.MagicMock + + +class SocketHandlers(TypedDict): + SOCKET_IO_PROJECT_UPDATED_EVENT: mock.Mock diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index c53c290f3bb..0026e332c82 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -6,6 +6,7 @@ from aiohttp import web from models_library.groups import GroupID +from models_library.projects import ProjectID from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES from models_library.rabbitmq_messages import ( ComputationalPipelineStatusMessage, @@ -15,6 +16,8 @@ ProgressRabbitMessageProject, ProgressType, WalletCreditsMessage, + WebserverInternalEventRabbitMessage, + WebserverInternalEventRabbitMessageAction, ) from models_library.socketio import SocketMessageDict from pydantic import TypeAdapter @@ -35,6 +38,7 @@ ) from ..socketio.models import WebSocketNodeProgress, WebSocketProjectProgress from ..wallets import api as wallets_service +from . import project_logs from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq _logger = logging.getLogger(__name__) @@ -161,6 +165,44 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool: return True +async def _webserver_internal_events_message_parser( + app: web.Application, data: bytes +) -> bool: + """ + Handles internal webserver events that need to be propagated to other webserver replicas + + Ex. Log unsubscription is triggered by user closing a project, which is a REST API call + that can reach any webserver replica. Then this event is propagated to all replicas + so that the one holding the websocket connection can unsubscribe from the logs queue. + """ + + rabbit_message = WebserverInternalEventRabbitMessage.model_validate_json(data) + + if ( + rabbit_message.action + == WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE + ): + _project_id = rabbit_message.data.get("project_id") + + if _project_id: + _logger.debug( + "Received UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event for project %s", + _project_id, + ) + await project_logs.unsubscribe(app, ProjectID(_project_id)) + else: + _logger.error( + "Missing project_id in UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event, this should never happen, investigate!" + ) + + else: + _logger.warning( + "Unknown webserver internal event message action %s", rabbit_message.action + ) + + return True + + async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = TypeAdapter(WalletCreditsMessage).validate_json(data) wallet_groups = await wallets_service.list_wallet_groups_with_read_access_by_wallet( @@ -201,6 +243,11 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b _events_message_parser, {}, ), + SubcribeArgumentsTuple( + WebserverInternalEventRabbitMessage.get_channel_name(), + _webserver_internal_events_message_parser, + {}, + ), SubcribeArgumentsTuple( WalletCreditsMessage.get_channel_name(), _osparc_credits_message_parser, diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py index a1a8411a8f9..6d55df236ec 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py @@ -20,7 +20,7 @@ managed_resource, ) from .._projects_service import ( - conditionally_unsubscribe_from_project_logs, + conditionally_unsubscribe_project_logs_across_replicas, retrieve_and_notify_project_locked_state, ) @@ -83,7 +83,7 @@ async def _on_user_disconnected( ) for _project_id in projects: # At the moment, only 1 is expected - await conditionally_unsubscribe_from_project_logs( + await conditionally_unsubscribe_project_logs_across_replicas( app, ProjectID(_project_id), user_id ) diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py index bec0a0017f1..24dbacc3e1e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py @@ -34,7 +34,6 @@ from ...users import users_service from ...utils_aiohttp import envelope_json_response, get_api_base_url from .. import _projects_service, projects_wallets_service -from .._projects_service import conditionally_unsubscribe_from_project_logs from ..exceptions import ProjectStartsTooManyDynamicNodesError from ._rest_exceptions import handle_plugin_requests_exceptions from ._rest_schemas import AuthenticatedRequestContext, ProjectPathParams @@ -223,10 +222,6 @@ async def close_project(request: web.Request) -> web.Response: ), ) - await conditionally_unsubscribe_from_project_logs( - request.app, path_params.project_id, req_ctx.user_id - ) - return web.json_response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index 72711e7b7dd..9bd620d3d23 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -62,6 +62,10 @@ ProjectStatus, RunningState, ) +from models_library.rabbitmq_messages import ( + WebserverInternalEventRabbitMessage, + WebserverInternalEventRabbitMessageAction, +) from models_library.resource_tracker import ( HardwareInfo, PricingAndHardwareInfoTuple, @@ -115,9 +119,8 @@ from ..director_v2 import director_v2_service from ..dynamic_scheduler import api as dynamic_scheduler_service from ..models import ClientSessionID -from ..notifications import project_logs from ..products import products_web -from ..rabbitmq import get_rabbitmq_rpc_client +from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client from ..redis import get_redis_lock_manager_client_sdk from ..resource_manager.models import UserSession from ..resource_manager.registry import get_registry @@ -187,19 +190,43 @@ _logger = logging.getLogger(__name__) -async def conditionally_unsubscribe_from_project_logs( +async def _publish_unsubscribe_from_project_logs_event( + app: web.Application, project_id: ProjectID, user_id: UserID +) -> None: + """Publishes an unsubscribe event for project logs to all webserver replicas.""" + rabbitmq_client = get_rabbitmq_client(app) + message = WebserverInternalEventRabbitMessage( + action=WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE, + data={"project_id": f"{project_id}"}, + ) + _logger.debug( + "No active socket connections detected for project %s by user %s. Sending unsubscribe event to all replicas.", + project_id, + user_id, + ) + await rabbitmq_client.publish(message.channel_name, message) + + +async def conditionally_unsubscribe_project_logs_across_replicas( app: web.Application, project_id: ProjectID, user_id: UserID ) -> None: """ - Unsubscribes from project logs only if no active socket connections remain for the project. + Unsubscribes from project logs only if no active socket connections remain for the project (across all users). This function checks for actual socket connections rather than just user sessions, - ensuring logs are only unsubscribed when truly no users are connected. + ensuring logs are only unsubscribed when truly no users are connected. When no active + connections are detected, an unsubscribe event is sent via RabbitMQ to all webserver + replicas to coordinate the unsubscription across the entire cluster. + + Note: With multiple webserver replicas, this ensures we don't unsubscribe until + the last socket is closed. However, another replica may still maintain an active + subscription even if no users are connected to it, as the RabbitMQ event ensures + all replicas receive the unsubscription notification. Args: app: The web application instance - project_id: The project ID to check - user_id: Optional user ID to use for the resource session (defaults to 0 if None) + project_id: The project ID to check for active connections + user_id: User ID to use for the resource session management """ redis_resource_registry = get_registry(app) with managed_resource(user_id, None, app) as user_session: @@ -221,7 +248,7 @@ async def conditionally_unsubscribe_from_project_logs( # the last socket is closed, though another replica may still maintain an active # subscription even if no users are connected to it. if actually_used_sockets_on_project == 0: - await project_logs.unsubscribe(app, project_id) + await _publish_unsubscribe_from_project_logs_event(app, project_id, user_id) async def patch_project_and_notify_users( @@ -1569,7 +1596,7 @@ async def _leave_project_room( sio = get_socket_server(app) await sio.leave_room(socket_id, SocketIORoomStr.from_project_id(project_uuid)) else: - _logger.error( + _logger.warning( "User %s/%s has no socket_id, cannot leave project room %s", user_id, client_session_id, @@ -1787,6 +1814,10 @@ async def close_project_for_user( ) await notify_project_state_update(app, project) + await conditionally_unsubscribe_project_logs_across_replicas( + app, project_uuid, user_id + ) + async def _get_project_share_state( user_id: int, diff --git a/services/web/server/tests/unit/with_dbs/02/conftest.py b/services/web/server/tests/unit/with_dbs/02/conftest.py index 714614c9d95..a7503be096c 100644 --- a/services/web/server/tests/unit/with_dbs/02/conftest.py +++ b/services/web/server/tests/unit/with_dbs/02/conftest.py @@ -3,6 +3,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable +import asyncio import contextlib import re from collections.abc import AsyncIterator, Awaitable, Callable @@ -14,7 +15,8 @@ from unittest import mock import pytest -from aiohttp.test_utils import TestClient +import socketio +from aiohttp.test_utils import TestClient, TestServer from aioresponses import aioresponses from common_library.json_serialization import json_dumps from faker import Faker @@ -30,12 +32,14 @@ from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.webserver_parametrizations import SocketHandlers from pytest_simcore.helpers.webserver_projects import NewProject, delete_all_projects from pytest_simcore.helpers.webserver_users import UserInfoDict from settings_library.catalog import CatalogSettings from simcore_service_webserver.application_settings import get_application_settings from simcore_service_webserver.catalog.settings import get_plugin_settings from simcore_service_webserver.projects.models import ProjectDict +from simcore_service_webserver.socketio.messages import SOCKET_IO_PROJECT_UPDATED_EVENT @pytest.fixture @@ -533,3 +537,71 @@ def with_enabled_rtc_collaboration_limited_to_1_user( ) }, ) + + +@pytest.fixture +async def client_on_running_server_factory( + client: TestClient, +) -> AsyncIterator[Callable[[], TestClient]]: + # Creates clients connected to the same server as the reference client + # + # Implemented as aihttp_client but creates a client using a running server, + # i.e. avoid client.start_server + + assert isinstance(client.server, TestServer) + + clients = [] + + def go() -> TestClient: + cli = TestClient(client.server, loop=asyncio.get_event_loop()) + assert client.server.started + # AVOIDS client.start_server + clients.append(cli) + return cli + + yield go + + async def close_client_but_not_server(cli: TestClient) -> None: + # pylint: disable=protected-access + if not cli._closed: # noqa: SLF001 + for resp in cli._responses: # noqa: SLF001 + resp.close() + for ws in cli._websockets: # noqa: SLF001 + await ws.close() + await cli._session.close() # noqa: SLF001 + cli._closed = True # noqa: SLF001 + + async def finalize(): + while clients: + await close_client_but_not_server(clients.pop()) + + await finalize() + + +@pytest.fixture +async def create_socketio_connection_with_handlers( + create_socketio_connection: Callable[ + [str | None, TestClient | None], Awaitable[tuple[socketio.AsyncClient, str]] + ], + mocker: MockerFixture, +) -> Callable[ + [str | None, TestClient], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], +]: + async def _( + client_session_id: str | None, client: TestClient + ) -> tuple[socketio.AsyncClient, str, SocketHandlers]: + sio, received_client_id = await create_socketio_connection( + client_session_id, client + ) + assert sio.sid + + event_handlers = SocketHandlers( + **{SOCKET_IO_PROJECT_UPDATED_EVENT: mocker.Mock()} + ) + + for event, handler in event_handlers.items(): + sio.on(event, handler=handler) + return sio, received_client_id, event_handlers + + return _ diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_service_conditionally_unsubscribe_from_project_logs.py b/services/web/server/tests/unit/with_dbs/02/test_projects_service_conditionally_unsubscribe_from_project_logs.py new file mode 100644 index 00000000000..460a55ca3ce --- /dev/null +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_service_conditionally_unsubscribe_from_project_logs.py @@ -0,0 +1,115 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=too-many-positional-arguments +# pylint: disable=too-many-statements +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=protected-access + + +import contextlib +from collections.abc import Awaitable, Callable + +import pytest +import socketio +from aiohttp.test_utils import TestClient +from pytest_mock import MockerFixture, MockType +from pytest_simcore.helpers.webserver_login import log_client_in +from pytest_simcore.helpers.webserver_parametrizations import ( + ExpectedResponse, + SocketHandlers, +) +from servicelib.aiohttp import status +from simcore_service_webserver.db.models import UserRole +from test_projects_states_handlers import _close_project, _open_project + + +@pytest.fixture +def max_number_of_user_sessions() -> int: + return 3 + + +@pytest.fixture +def mocked_publish_unsubscribe_from_project_logs_event( + mocker: MockerFixture, +) -> MockType: + import simcore_service_webserver.projects._projects_service # noqa: PLC0415 + + return mocker.patch.object( + simcore_service_webserver.projects._projects_service, # noqa: SLF001 + "_publish_unsubscribe_from_project_logs_event", + autospec=True, + ) + + +@pytest.mark.parametrize( + "user_role,expected", + [ + (UserRole.USER, status.HTTP_200_OK), + ], +) +async def test_conditionally_unsubscribe_from_project_logs( + max_number_of_user_sessions: int, + with_enabled_rtc_collaboration: None, + client: TestClient, + client_on_running_server_factory: Callable[[], TestClient], + logged_user: dict, + shared_project: dict, + user_role: UserRole, + expected: ExpectedResponse, + exit_stack: contextlib.AsyncExitStack, + create_socketio_connection_with_handlers: Callable[ + [str | None, TestClient], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], + ], + mocked_dynamic_services_interface: dict[str, MockType], + mocked_publish_unsubscribe_from_project_logs_event: MockType, + mock_catalog_api: dict[str, MockType], + mocker: MockerFixture, +): + # Use-case: 2 users open the same shared project, then close it + # Only when the last user closes the project, the unsubscribe from project logs is done + # (this is important to avoid loosing logs when multiple users are working on the same project) + + client_1 = client + client_2 = client_on_running_server_factory() + + # 1. user 1 opens project + sio1, client_id1, sio1_handlers = await create_socketio_connection_with_handlers( + None, client_1 + ) + await _open_project( + client_1, + client_id1, + shared_project, + status.HTTP_200_OK, + ) + + # 2. create a separate client now and log in user2, open the same shared project + user_2 = await log_client_in( + client_2, + {"role": user_role.name}, + enable_check=True, + exit_stack=exit_stack, + ) + sio2, client_id2, sio2_handlers = await create_socketio_connection_with_handlers( + None, client_2 + ) + await _open_project( + client_2, + client_id2, + shared_project, + status.HTTP_200_OK, + ) + + # 3. user 1 closes the project (As user 2 is still connected, no unsubscribe should happen) + await _close_project( + client_1, client_id1, shared_project, status.HTTP_204_NO_CONTENT + ) + assert not mocked_publish_unsubscribe_from_project_logs_event.called + + # 4. user 2 closes the project (now unsubscribe should happen) + await _close_project( + client_2, client_id2, shared_project, status.HTTP_204_NO_CONTENT + ) + assert mocked_publish_unsubscribe_from_project_logs_event.called diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py b/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py index cfa61a8e5df..0f4f4e0405e 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py @@ -8,12 +8,12 @@ import asyncio import contextlib import logging -from collections.abc import AsyncIterator, Awaitable, Callable, Iterator +from collections.abc import Awaitable, Callable, Iterator from copy import deepcopy from datetime import UTC, datetime, timedelta from decimal import Decimal from http import HTTPStatus -from typing import Any, TypedDict +from typing import Any from unittest import mock from unittest.mock import call @@ -21,7 +21,7 @@ import socketio import sqlalchemy as sa from aiohttp import ClientResponse -from aiohttp.test_utils import TestClient, TestServer +from aiohttp.test_utils import TestClient from deepdiff import DeepDiff # type: ignore[attr-defined] from faker import Faker from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet @@ -49,6 +49,7 @@ ServiceResourcesDict, ServiceResourcesDictHelpers, ) +from models_library.users import UserID from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import TypeAdapter from pytest_mock import MockerFixture @@ -57,6 +58,7 @@ from pytest_simcore.helpers.webserver_login import LoggedUser, log_client_in from pytest_simcore.helpers.webserver_parametrizations import ( ExpectedResponse, + SocketHandlers, standard_role_response, standard_user_role_response, ) @@ -151,39 +153,6 @@ async def _replace_project( return data -class _SocketHandlers(TypedDict): - SOCKET_IO_PROJECT_UPDATED_EVENT: mock.Mock - - -@pytest.fixture -async def create_socketio_connection_with_handlers( - create_socketio_connection: Callable[ - [str | None, TestClient | None], Awaitable[tuple[socketio.AsyncClient, str]] - ], - mocker: MockerFixture, -) -> Callable[ - [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], -]: - async def _( - client_session_id: str | None, client: TestClient - ) -> tuple[socketio.AsyncClient, str, _SocketHandlers]: - sio, received_client_id = await create_socketio_connection( - client_session_id, client - ) - assert sio.sid - - event_handlers = _SocketHandlers( - **{SOCKET_IO_PROJECT_UPDATED_EVENT: mocker.Mock()} - ) - - for event, handler in event_handlers.items(): - sio.on(event, handler=handler) - return sio, received_client_id, event_handlers - - return _ - - async def _open_project( client: TestClient, client_id: str, @@ -471,7 +440,7 @@ async def test_open_project( user_project: ProjectDict, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: HTTPStatus, save_state: bool, @@ -564,7 +533,7 @@ async def test_open_project__in_debt( user_project: ProjectDict, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: HTTPStatus, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -628,7 +597,7 @@ async def test_open_template_project_for_edition( create_template_project: Callable[..., Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: HTTPStatus, save_state: bool, @@ -715,7 +684,7 @@ async def test_open_template_project_for_edition_with_missing_write_rights( create_template_project: Callable[..., Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: HTTPStatus, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -749,7 +718,7 @@ async def test_open_project_with_small_amount_of_dynamic_services_starts_them_au user_project_with_num_dynamic_services: Callable[[int], Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: ExpectedResponse, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -802,7 +771,7 @@ async def test_open_project_with_disable_service_auto_start_set_overrides_behavi user_project_with_num_dynamic_services: Callable[[int], Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: ExpectedResponse, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -855,7 +824,7 @@ async def test_open_project_with_large_amount_of_dynamic_services_does_not_start user_project_with_num_dynamic_services: Callable[[int], Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: ExpectedResponse, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -909,7 +878,7 @@ async def test_open_project_with_large_amount_of_dynamic_services_starts_them_if user_project_with_num_dynamic_services: Callable[[int], Awaitable[ProjectDict]], create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: ExpectedResponse, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -964,7 +933,7 @@ async def test_open_project_with_deprecated_services_ok_but_does_not_start_dynam user_project, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], expected: ExpectedResponse, mocked_dynamic_services_interface: dict[str, mock.Mock], @@ -1023,7 +992,7 @@ async def test_open_project_more_than_limitation_of_max_studies_open_per_user( logged_user, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], user_project: ProjectDict, shared_project: ProjectDict, @@ -1072,6 +1041,7 @@ async def test_close_project( fake_services: Callable[..., Awaitable[list[DynamicServiceGet]]], mock_dynamic_scheduler_rabbitmq: None, mocked_notifications_plugin: dict[str, mock.Mock], + mocked_conditionally_unsubscribe_project_logs: mock.Mock, ): # POST /v0/projects/{project_id}:close fake_dynamic_services = await fake_services(number_services=5) @@ -1109,8 +1079,10 @@ async def test_close_project( await assert_status(resp, expected.no_content) if resp.status == status.HTTP_204_NO_CONTENT: - mocked_notifications_plugin["unsubscribe"].assert_called_once_with( - client.app, ProjectID(user_project["uuid"]) + mocked_conditionally_unsubscribe_project_logs.assert_called_once_with( + client.app, + project_id=ProjectID(user_project["uuid"]), + user_id=UserID(user_id), ) # These checks are after a fire&forget, so we wait a moment await asyncio.sleep(2) @@ -1408,45 +1380,6 @@ async def test_project_node_lifetime( # noqa: PLR0915 mock_storage_api_delete_data_folders_of_project_node.assert_not_called() -@pytest.fixture -async def client_on_running_server_factory( - client: TestClient, -) -> AsyncIterator[Callable[[], TestClient]]: - # Creates clients connected to the same server as the reference client - # - # Implemented as aihttp_client but creates a client using a running server, - # i.e. avoid client.start_server - - assert isinstance(client.server, TestServer) - - clients = [] - - def go() -> TestClient: - cli = TestClient(client.server, loop=asyncio.get_event_loop()) - assert client.server.started - # AVOIDS client.start_server - clients.append(cli) - return cli - - yield go - - async def close_client_but_not_server(cli: TestClient) -> None: - # pylint: disable=protected-access - if not cli._closed: # noqa: SLF001 - for resp in cli._responses: # noqa: SLF001 - resp.close() - for ws in cli._websockets: # noqa: SLF001 - await ws.close() - await cli._session.close() # noqa: SLF001 - cli._closed = True # noqa: SLF001 - - async def finalize(): - while clients: - await close_client_but_not_server(clients.pop()) - - await finalize() - - @pytest.fixture def clean_redis_table(redis_client) -> None: """this just ensures the redis table is cleaned up between test runs""" @@ -1465,7 +1398,7 @@ async def test_open_shared_project_multiple_users( exit_stack: contextlib.AsyncExitStack, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], mocked_dynamic_services_interface: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], @@ -1506,7 +1439,7 @@ async def test_open_shared_project_multiple_users( # now we create more users and open the same project until we reach the maximum number of user sessions other_users: list[ - tuple[UserInfoDict, TestClient, str, socketio.AsyncClient, _SocketHandlers] + tuple[UserInfoDict, TestClient, str, socketio.AsyncClient, SocketHandlers] ] = [] for user_session in range(1, max_number_of_user_sessions): client_i = client_on_running_server_factory() @@ -1635,7 +1568,7 @@ async def test_refreshing_tab_of_opened_project_multiple_users( expected: ExpectedResponse, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], mocked_dynamic_services_interface: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], @@ -1729,7 +1662,7 @@ async def test_closing_and_reopening_tab_of_opened_project_multiple_users( exit_stack: contextlib.AsyncExitStack, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], mocked_dynamic_services_interface: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], @@ -1819,7 +1752,7 @@ async def test_open_shared_project_2_users_locked_remove_once_rtc_collaboration_ exit_stack: contextlib.AsyncExitStack, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], ): # Use-case: user 1 opens a shared project, user 2 tries to open it as well @@ -2060,7 +1993,7 @@ async def test_open_shared_project_at_same_time( exit_stack: contextlib.AsyncExitStack, create_socketio_connection_with_handlers: Callable[ [str | None, TestClient], - Awaitable[tuple[socketio.AsyncClient, str, _SocketHandlers]], + Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]], ], ): NUMBER_OF_ADDITIONAL_CLIENTS = 10 diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index b5040eb6455..085f8ea02eb 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -2,6 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable # pylint: disable=too-many-arguments +# pylint: disable=protected-access import random import sys @@ -44,7 +45,7 @@ from models_library.users import UserID from pydantic import ByteSize, TypeAdapter from pytest_docker.plugin import Services -from pytest_mock import MockerFixture +from pytest_mock import MockerFixture, MockType from pytest_simcore.helpers import postgres_tools from pytest_simcore.helpers.faker_factories import random_product from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict @@ -707,6 +708,19 @@ def mocked_notifications_plugin(mocker: MockerFixture) -> dict[str, mock.Mock]: return {"subscribe": mocked_subscribe, "unsubscribe": mocked_unsubscribe} +@pytest.fixture +def mocked_conditionally_unsubscribe_project_logs( + mocker: MockerFixture, +) -> MockType: + import simcore_service_webserver.projects._projects_service # noqa: PLC0415 + + return mocker.patch.object( + simcore_service_webserver.projects._projects_service, # noqa: SLF001 + "conditionally_unsubscribe_project_logs_across_replicas", + autospec=True, + ) + + @pytest.fixture async def user_project( client: TestClient,