Skip to content

Commit 35f4330

Browse files
authored
Merge branch 'master' into fix_duplicate_map_inputs
2 parents dba8dd7 + c35f874 commit 35f4330

File tree

10 files changed

+340
-113
lines changed

10 files changed

+340
-113
lines changed

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
from abc import abstractmethod
44
from decimal import Decimal
55
from enum import Enum, IntEnum, auto
6-
from typing import Any, Literal, TypeAlias
6+
from typing import Annotated, Any, Literal, TypeAlias
77

88
import arrow
9+
from common_library.basic_types import DEFAULT_FACTORY
910
from pydantic import BaseModel, Field
1011

1112
from .products import ProductName
@@ -77,6 +78,21 @@ def routing_key(self) -> str | None:
7778
return None
7879

7980

81+
class WebserverInternalEventRabbitMessageAction(StrAutoEnum):
82+
UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE = auto()
83+
84+
85+
class WebserverInternalEventRabbitMessage(RabbitMessageBase):
86+
channel_name: Literal["simcore.services.webserver_internal_events"] = (
87+
"simcore.services.webserver_internal_events"
88+
)
89+
action: WebserverInternalEventRabbitMessageAction
90+
data: Annotated[dict[str, Any], Field(default_factory=dict)] = DEFAULT_FACTORY
91+
92+
def routing_key(self) -> str | None:
93+
return None
94+
95+
8096
class ProgressType(StrAutoEnum):
8197
COMPUTATION_RUNNING = auto() # NOTE: this is the original only progress report
8298

packages/pytest-simcore/src/pytest_simcore/helpers/webserver_parametrizations.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import NamedTuple
1+
from typing import NamedTuple, TypedDict
22
from unittest import mock
33

44
from servicelib.aiohttp import status
@@ -120,3 +120,7 @@ class MockedStorageSubsystem(NamedTuple):
120120
delete_project: mock.MagicMock
121121
delete_node: mock.MagicMock
122122
get_project_total_size_simcore_s3: mock.MagicMock
123+
124+
125+
class SocketHandlers(TypedDict):
126+
SOCKET_IO_PROJECT_UPDATED_EVENT: mock.Mock

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from aiohttp import web
88
from models_library.groups import GroupID
9+
from models_library.projects import ProjectID
910
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
1011
from models_library.rabbitmq_messages import (
1112
ComputationalPipelineStatusMessage,
@@ -15,6 +16,8 @@
1516
ProgressRabbitMessageProject,
1617
ProgressType,
1718
WalletCreditsMessage,
19+
WebserverInternalEventRabbitMessage,
20+
WebserverInternalEventRabbitMessageAction,
1821
)
1922
from models_library.socketio import SocketMessageDict
2023
from pydantic import TypeAdapter
@@ -35,6 +38,7 @@
3538
)
3639
from ..socketio.models import WebSocketNodeProgress, WebSocketProjectProgress
3740
from ..wallets import api as wallets_service
41+
from . import project_logs
3842
from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq
3943

4044
_logger = logging.getLogger(__name__)
@@ -161,6 +165,44 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool:
161165
return True
162166

163167

168+
async def _webserver_internal_events_message_parser(
169+
app: web.Application, data: bytes
170+
) -> bool:
171+
"""
172+
Handles internal webserver events that need to be propagated to other webserver replicas
173+
174+
Ex. Log unsubscription is triggered by user closing a project, which is a REST API call
175+
that can reach any webserver replica. Then this event is propagated to all replicas
176+
so that the one holding the websocket connection can unsubscribe from the logs queue.
177+
"""
178+
179+
rabbit_message = WebserverInternalEventRabbitMessage.model_validate_json(data)
180+
181+
if (
182+
rabbit_message.action
183+
== WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE
184+
):
185+
_project_id = rabbit_message.data.get("project_id")
186+
187+
if _project_id:
188+
_logger.debug(
189+
"Received UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event for project %s",
190+
_project_id,
191+
)
192+
await project_logs.unsubscribe(app, ProjectID(_project_id))
193+
else:
194+
_logger.error(
195+
"Missing project_id in UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event, this should never happen, investigate!"
196+
)
197+
198+
else:
199+
_logger.warning(
200+
"Unknown webserver internal event message action %s", rabbit_message.action
201+
)
202+
203+
return True
204+
205+
164206
async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> bool:
165207
rabbit_message = TypeAdapter(WalletCreditsMessage).validate_json(data)
166208
wallet_groups = await wallets_service.list_wallet_groups_with_read_access_by_wallet(
@@ -201,6 +243,11 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b
201243
_events_message_parser,
202244
{},
203245
),
246+
SubcribeArgumentsTuple(
247+
WebserverInternalEventRabbitMessage.get_channel_name(),
248+
_webserver_internal_events_message_parser,
249+
{},
250+
),
204251
SubcribeArgumentsTuple(
205252
WalletCreditsMessage.get_channel_name(),
206253
_osparc_credits_message_parser,

services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
managed_resource,
2121
)
2222
from .._projects_service import (
23-
conditionally_unsubscribe_from_project_logs,
23+
conditionally_unsubscribe_project_logs_across_replicas,
2424
retrieve_and_notify_project_locked_state,
2525
)
2626

@@ -83,7 +83,7 @@ async def _on_user_disconnected(
8383
)
8484

8585
for _project_id in projects: # At the moment, only 1 is expected
86-
await conditionally_unsubscribe_from_project_logs(
86+
await conditionally_unsubscribe_project_logs_across_replicas(
8787
app, ProjectID(_project_id), user_id
8888
)
8989

services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
from ...users import users_service
3535
from ...utils_aiohttp import envelope_json_response, get_api_base_url
3636
from .. import _projects_service, projects_wallets_service
37-
from .._projects_service import conditionally_unsubscribe_from_project_logs
3837
from ..exceptions import ProjectStartsTooManyDynamicNodesError
3938
from ._rest_exceptions import handle_plugin_requests_exceptions
4039
from ._rest_schemas import AuthenticatedRequestContext, ProjectPathParams
@@ -223,10 +222,6 @@ async def close_project(request: web.Request) -> web.Response:
223222
),
224223
)
225224

226-
await conditionally_unsubscribe_from_project_logs(
227-
request.app, path_params.project_id, req_ctx.user_id
228-
)
229-
230225
return web.json_response(status=status.HTTP_204_NO_CONTENT)
231226

232227

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@
6262
ProjectStatus,
6363
RunningState,
6464
)
65+
from models_library.rabbitmq_messages import (
66+
WebserverInternalEventRabbitMessage,
67+
WebserverInternalEventRabbitMessageAction,
68+
)
6569
from models_library.resource_tracker import (
6670
HardwareInfo,
6771
PricingAndHardwareInfoTuple,
@@ -115,9 +119,8 @@
115119
from ..director_v2 import director_v2_service
116120
from ..dynamic_scheduler import api as dynamic_scheduler_service
117121
from ..models import ClientSessionID
118-
from ..notifications import project_logs
119122
from ..products import products_web
120-
from ..rabbitmq import get_rabbitmq_rpc_client
123+
from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client
121124
from ..redis import get_redis_lock_manager_client_sdk
122125
from ..resource_manager.models import UserSession
123126
from ..resource_manager.registry import get_registry
@@ -187,19 +190,43 @@
187190
_logger = logging.getLogger(__name__)
188191

189192

190-
async def conditionally_unsubscribe_from_project_logs(
193+
async def _publish_unsubscribe_from_project_logs_event(
194+
app: web.Application, project_id: ProjectID, user_id: UserID
195+
) -> None:
196+
"""Publishes an unsubscribe event for project logs to all webserver replicas."""
197+
rabbitmq_client = get_rabbitmq_client(app)
198+
message = WebserverInternalEventRabbitMessage(
199+
action=WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE,
200+
data={"project_id": f"{project_id}"},
201+
)
202+
_logger.debug(
203+
"No active socket connections detected for project %s by user %s. Sending unsubscribe event to all replicas.",
204+
project_id,
205+
user_id,
206+
)
207+
await rabbitmq_client.publish(message.channel_name, message)
208+
209+
210+
async def conditionally_unsubscribe_project_logs_across_replicas(
191211
app: web.Application, project_id: ProjectID, user_id: UserID
192212
) -> None:
193213
"""
194-
Unsubscribes from project logs only if no active socket connections remain for the project.
214+
Unsubscribes from project logs only if no active socket connections remain for the project (across all users).
195215
196216
This function checks for actual socket connections rather than just user sessions,
197-
ensuring logs are only unsubscribed when truly no users are connected.
217+
ensuring logs are only unsubscribed when truly no users are connected. When no active
218+
connections are detected, an unsubscribe event is sent via RabbitMQ to all webserver
219+
replicas to coordinate the unsubscription across the entire cluster.
220+
221+
Note: With multiple webserver replicas, this ensures we don't unsubscribe until
222+
the last socket is closed. However, another replica may still maintain an active
223+
subscription even if no users are connected to it, as the RabbitMQ event ensures
224+
all replicas receive the unsubscription notification.
198225
199226
Args:
200227
app: The web application instance
201-
project_id: The project ID to check
202-
user_id: Optional user ID to use for the resource session (defaults to 0 if None)
228+
project_id: The project ID to check for active connections
229+
user_id: User ID to use for the resource session management
203230
"""
204231
redis_resource_registry = get_registry(app)
205232
with managed_resource(user_id, None, app) as user_session:
@@ -221,7 +248,7 @@ async def conditionally_unsubscribe_from_project_logs(
221248
# the last socket is closed, though another replica may still maintain an active
222249
# subscription even if no users are connected to it.
223250
if actually_used_sockets_on_project == 0:
224-
await project_logs.unsubscribe(app, project_id)
251+
await _publish_unsubscribe_from_project_logs_event(app, project_id, user_id)
225252

226253

227254
async def patch_project_and_notify_users(
@@ -1569,7 +1596,7 @@ async def _leave_project_room(
15691596
sio = get_socket_server(app)
15701597
await sio.leave_room(socket_id, SocketIORoomStr.from_project_id(project_uuid))
15711598
else:
1572-
_logger.error(
1599+
_logger.warning(
15731600
"User %s/%s has no socket_id, cannot leave project room %s",
15741601
user_id,
15751602
client_session_id,
@@ -1787,6 +1814,10 @@ async def close_project_for_user(
17871814
)
17881815
await notify_project_state_update(app, project)
17891816

1817+
await conditionally_unsubscribe_project_logs_across_replicas(
1818+
app, project_uuid, user_id
1819+
)
1820+
17901821

17911822
async def _get_project_share_state(
17921823
user_id: int,

services/web/server/tests/unit/with_dbs/02/conftest.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# pylint: disable=unused-argument
44
# pylint: disable=unused-variable
55

6+
import asyncio
67
import contextlib
78
import re
89
from collections.abc import AsyncIterator, Awaitable, Callable
@@ -14,7 +15,8 @@
1415
from unittest import mock
1516

1617
import pytest
17-
from aiohttp.test_utils import TestClient
18+
import socketio
19+
from aiohttp.test_utils import TestClient, TestServer
1820
from aioresponses import aioresponses
1921
from common_library.json_serialization import json_dumps
2022
from faker import Faker
@@ -30,12 +32,14 @@
3032
from pytest_simcore.helpers.assert_checks import assert_status
3133
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
3234
from pytest_simcore.helpers.typing_env import EnvVarsDict
35+
from pytest_simcore.helpers.webserver_parametrizations import SocketHandlers
3336
from pytest_simcore.helpers.webserver_projects import NewProject, delete_all_projects
3437
from pytest_simcore.helpers.webserver_users import UserInfoDict
3538
from settings_library.catalog import CatalogSettings
3639
from simcore_service_webserver.application_settings import get_application_settings
3740
from simcore_service_webserver.catalog.settings import get_plugin_settings
3841
from simcore_service_webserver.projects.models import ProjectDict
42+
from simcore_service_webserver.socketio.messages import SOCKET_IO_PROJECT_UPDATED_EVENT
3943

4044

4145
@pytest.fixture
@@ -533,3 +537,71 @@ def with_enabled_rtc_collaboration_limited_to_1_user(
533537
)
534538
},
535539
)
540+
541+
542+
@pytest.fixture
543+
async def client_on_running_server_factory(
544+
client: TestClient,
545+
) -> AsyncIterator[Callable[[], TestClient]]:
546+
# Creates clients connected to the same server as the reference client
547+
#
548+
# Implemented as aihttp_client but creates a client using a running server,
549+
# i.e. avoid client.start_server
550+
551+
assert isinstance(client.server, TestServer)
552+
553+
clients = []
554+
555+
def go() -> TestClient:
556+
cli = TestClient(client.server, loop=asyncio.get_event_loop())
557+
assert client.server.started
558+
# AVOIDS client.start_server
559+
clients.append(cli)
560+
return cli
561+
562+
yield go
563+
564+
async def close_client_but_not_server(cli: TestClient) -> None:
565+
# pylint: disable=protected-access
566+
if not cli._closed: # noqa: SLF001
567+
for resp in cli._responses: # noqa: SLF001
568+
resp.close()
569+
for ws in cli._websockets: # noqa: SLF001
570+
await ws.close()
571+
await cli._session.close() # noqa: SLF001
572+
cli._closed = True # noqa: SLF001
573+
574+
async def finalize():
575+
while clients:
576+
await close_client_but_not_server(clients.pop())
577+
578+
await finalize()
579+
580+
581+
@pytest.fixture
582+
async def create_socketio_connection_with_handlers(
583+
create_socketio_connection: Callable[
584+
[str | None, TestClient | None], Awaitable[tuple[socketio.AsyncClient, str]]
585+
],
586+
mocker: MockerFixture,
587+
) -> Callable[
588+
[str | None, TestClient],
589+
Awaitable[tuple[socketio.AsyncClient, str, SocketHandlers]],
590+
]:
591+
async def _(
592+
client_session_id: str | None, client: TestClient
593+
) -> tuple[socketio.AsyncClient, str, SocketHandlers]:
594+
sio, received_client_id = await create_socketio_connection(
595+
client_session_id, client
596+
)
597+
assert sio.sid
598+
599+
event_handlers = SocketHandlers(
600+
**{SOCKET_IO_PROJECT_UPDATED_EVENT: mocker.Mock()}
601+
)
602+
603+
for event, handler in event_handlers.items():
604+
sio.on(event, handler=handler)
605+
return sio, received_client_id, event_handlers
606+
607+
return _

0 commit comments

Comments
 (0)