diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index 438dff9ef1fe..cbf26e95d3ad 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -29,6 +29,7 @@ SOCKET_IO_LOG_EVENT, SOCKET_IO_NODE_UPDATED_EVENT, SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT, + send_message_to_project_room, send_message_to_standard_group, send_message_to_user, ) @@ -90,9 +91,9 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool: ).to_socket_dict() if message: - await send_message_to_user( + await send_message_to_project_room( app, - rabbit_message.user_id, + project_id=rabbit_message.project_id, message=message, ) return True diff --git a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py index 2b1c2ca7dd3d..9f01181b6f79 100644 --- a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py @@ -15,6 +15,7 @@ import sqlalchemy as sa from aiohttp.test_utils import TestClient from faker import Faker +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.progress_bar import ProgressReport from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID @@ -54,6 +55,7 @@ from simcore_service_webserver.rest.plugin import setup_rest from simcore_service_webserver.security.plugin import setup_security from simcore_service_webserver.session.plugin import setup_session +from simcore_service_webserver.socketio._utils import get_socket_server from simcore_service_webserver.socketio.messages import ( SOCKET_IO_EVENT, SOCKET_IO_LOG_EVENT, @@ -73,7 +75,7 @@ "redis", ] -pytest_simcore_ops_services_selection = [] +pytest_simcore_ops_services_selection = ["redis-commander"] _STABLE_DELAY_S = 2 @@ -113,7 +115,7 @@ async def _assert_handler_called_with_json( handler: mock.Mock, expected_call: dict[str, Any] ) -> None: async for attempt in AsyncRetrying( - wait=wait_fixed(0.1), + wait=wait_fixed(0.2), stop=stop_after_delay(10), retry=retry_if_exception_type(AssertionError), reraise=True, @@ -186,13 +188,13 @@ def user_project_id(user_project: ProjectDict) -> ProjectID: @pytest.fixture def user_id(logged_user: UserInfoDict) -> UserID: - return UserID(logged_user["id"]) + return logged_user["id"] @pytest.fixture def sender_user_id(user_id: UserID, sender_same_user_id: bool, faker: Faker) -> UserID: if sender_same_user_id is False: - return UserID(faker.pyint(min_value=user_id + 1)) + return faker.pyint(min_value=user_id + 1) return user_id @@ -333,8 +335,10 @@ async def test_progress_non_computational_workflow( """ socket_io_conn, *_ = await create_socketio_connection(None, client) + # the project must be opened here mock_progress_handler = mocker.MagicMock() + socket_io_conn.on( WebSocketNodeProgress.get_event_type(), handler=mock_progress_handler ) @@ -343,6 +347,10 @@ async def test_progress_non_computational_workflow( assert client.app await project_logs.subscribe(client.app, user_project_id) + # this simulates the user openning the project + await get_socket_server(client.app).enter_room( + socket_io_conn.get_sid(), SocketIORoomStr.from_project_id(user_project_id) + ) progress_message = ProgressRabbitMessageNode( user_id=sender_user_id, project_id=user_project_id, @@ -352,7 +360,7 @@ async def test_progress_non_computational_workflow( ) await rabbitmq_publisher.publish(progress_message.channel_name, progress_message) - call_expected = sender_same_user_id and subscribe_to_logs + call_expected = subscribe_to_logs if call_expected: expected_call = WebSocketNodeProgress.from_rabbit_message( progress_message @@ -399,6 +407,10 @@ async def test_progress_computational_workflow( if subscribe_to_logs: assert client.app await project_logs.subscribe(client.app, user_project_id) + # this simulates the user openning the project + await get_socket_server(client.app).enter_room( + socket_io_conn.get_sid(), SocketIORoomStr.from_project_id(user_project_id) + ) progress_message = ProgressRabbitMessageNode( user_id=sender_user_id, project_id=user_project_id, diff --git a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py index 20d471553393..02c2d34dcbfa 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py @@ -80,17 +80,17 @@ async def test_regression_progress_message_parser( mocker: MockerFixture, raw_data: bytes, expected_socket_message: SocketMessageDict ): - send_messages_to_user_mock = mocker.patch( - "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_message_to_user", + send_message_to_project_room_mock = mocker.patch( + "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_message_to_project_room", autospec=True, ) app = AsyncMock() assert await _progress_message_parser(app, raw_data) - # tests how send_message_to_user is called - assert send_messages_to_user_mock.call_count == 1 - message = send_messages_to_user_mock.call_args.kwargs["message"] + # tests how send_message_to_project_room is called + assert send_message_to_project_room_mock.call_count == 1 + message = send_message_to_project_room_mock.call_args.kwargs["message"] # check that all fields are sent as expected assert message["data"] == expected_socket_message["data"]