Skip to content

Commit 24e5b88

Browse files
add additional fixes
1 parent 115ebd6 commit 24e5b88

File tree

4 files changed

+64
-27
lines changed

4 files changed

+64
-27
lines changed

services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515
from servicelib.utils import logged_gather
1616

1717
from ...notifications import project_logs
18-
from ...resource_manager.user_sessions import PROJECT_ID_KEY, managed_resource
19-
from .._projects_service import retrieve_and_notify_project_locked_state
18+
from ...resource_manager.user_sessions import (
19+
PROJECT_ID_KEY,
20+
managed_resource,
21+
)
22+
from .._projects_service import (
23+
conditionally_unsubscribe_from_project_logs,
24+
retrieve_and_notify_project_locked_state,
25+
)
2026

2127
_logger = logging.getLogger(__name__)
2228

@@ -67,15 +73,6 @@ async def _on_user_disconnected(
6773

6874
assert len(projects) <= 1, "At the moment, at most one project per session" # nosec
6975

70-
with log_context(
71-
_logger,
72-
logging.DEBUG,
73-
msg=f"user disconnects and unsubscribes from following {projects=}",
74-
):
75-
await logged_gather(
76-
*[project_logs.unsubscribe(app, ProjectID(prj)) for prj in projects]
77-
)
78-
7976
await logged_gather(
8077
*[
8178
retrieve_and_notify_project_locked_state(
@@ -85,6 +82,11 @@ async def _on_user_disconnected(
8582
]
8683
)
8784

85+
for _project_id in projects: # At the moment, only 1 is expected
86+
await conditionally_unsubscribe_from_project_logs(
87+
app, ProjectID(_project_id), user_id
88+
)
89+
8890

8991
def setup_project_observer_events(app: web.Application) -> None:
9092
setup_observer_registry(app)

services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@
2828
from ...notifications import project_logs
2929
from ...products import products_web
3030
from ...products.models import Product
31-
from ...resource_manager.user_sessions import PROJECT_ID_KEY, managed_resource
31+
from ...resource_manager.user_sessions import managed_resource
3232
from ...security.decorators import permission_required
3333
from ...socketio.server import get_socket_server
3434
from ...users import users_service
3535
from ...utils_aiohttp import envelope_json_response, get_api_base_url
3636
from .. import _projects_service, projects_wallets_service
37+
from .._projects_service import conditionally_unsubscribe_from_project_logs
3738
from ..exceptions import ProjectStartsTooManyDynamicNodesError
3839
from ._rest_exceptions import handle_plugin_requests_exceptions
3940
from ._rest_schemas import AuthenticatedRequestContext, ProjectPathParams
@@ -222,15 +223,9 @@ async def close_project(request: web.Request) -> web.Response:
222223
),
223224
)
224225

225-
with managed_resource(
226-
req_ctx.user_id, client_session_id, request.app
227-
) as user_session:
228-
all_user_sessions_with_project = await user_session.find_users_of_resource(
229-
request.app, key=PROJECT_ID_KEY, value=f"{path_params.project_id}"
230-
)
231-
# Only unsubscribe from logs if there is no other occurrence of the open project
232-
if len(all_user_sessions_with_project) == 0:
233-
await project_logs.unsubscribe(request.app, path_params.project_id)
226+
await conditionally_unsubscribe_from_project_logs(
227+
request.app, path_params.project_id, req_ctx.user_id
228+
)
234229

235230
return web.json_response(status=status.HTTP_204_NO_CONTENT)
236231

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,15 @@
115115
from ..director_v2 import director_v2_service
116116
from ..dynamic_scheduler import api as dynamic_scheduler_service
117117
from ..models import ClientSessionID
118+
from ..notifications import project_logs
118119
from ..products import products_web
119120
from ..rabbitmq import get_rabbitmq_rpc_client
120121
from ..redis import get_redis_lock_manager_client_sdk
121122
from ..resource_manager.models import UserSession
123+
from ..resource_manager.registry import get_registry
122124
from ..resource_manager.user_sessions import (
123125
PROJECT_ID_KEY,
126+
SOCKET_ID_FIELDNAME,
124127
managed_resource,
125128
)
126129
from ..resource_usage import service as rut_api
@@ -184,6 +187,43 @@
184187
_logger = logging.getLogger(__name__)
185188

186189

190+
async def conditionally_unsubscribe_from_project_logs(
191+
app: web.Application, project_id: ProjectID, user_id: UserID | None = None
192+
) -> None:
193+
"""
194+
Unsubscribes from project logs only if no active socket connections remain for the project.
195+
196+
This function checks for actual socket connections rather than just user sessions,
197+
ensuring logs are only unsubscribed when truly no users are connected.
198+
199+
Args:
200+
app: The web application instance
201+
project_id: The project ID to check
202+
user_id: Optional user ID to use for the resource session (defaults to 0 if None)
203+
"""
204+
redis_resource_registry = get_registry(app)
205+
with managed_resource(user_id or 0, None, app) as user_session:
206+
all_user_sessions_with_project = await user_session.find_users_of_resource(
207+
app, key=PROJECT_ID_KEY, value=f"{project_id}"
208+
)
209+
210+
# Check for each user session if it has an active socket_id
211+
actually_used_sockets_on_project = 0
212+
for user_session_key in all_user_sessions_with_project:
213+
output = await redis_resource_registry.find_resources(
214+
key=user_session_key, resource_name=SOCKET_ID_FIELDNAME
215+
)
216+
if output:
217+
actually_used_sockets_on_project += 1
218+
219+
# Only unsubscribe from logs if there are no active socket connections to the project.
220+
# NOTE: With multiple webserver replicas, this ensures we don't unsubscribe until
221+
# the last socket is closed, though another replica may still maintain an active
222+
# subscription even if no users are connected to it.
223+
if actually_used_sockets_on_project <= 1:
224+
await project_logs.unsubscribe(app, project_id)
225+
226+
187227
async def patch_project_and_notify_users(
188228
app: web.Application,
189229
*,

services/web/server/src/simcore_service_webserver/resource_manager/user_sessions.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
_logger = logging.getLogger(__name__)
2020

2121

22-
_SOCKET_ID_FIELDNAME: Final[str] = "socket_id"
22+
SOCKET_ID_FIELDNAME: Final[str] = "socket_id"
2323
PROJECT_ID_KEY: Final[str] = "project_id"
2424

25-
assert _SOCKET_ID_FIELDNAME in ResourcesDict.__annotations__ # nosec
25+
assert SOCKET_ID_FIELDNAME in ResourcesDict.__annotations__ # nosec
2626
assert PROJECT_ID_KEY in ResourcesDict.__annotations__ # nosec
2727

2828

@@ -80,7 +80,7 @@ async def set_socket_id(self, socket_id: str) -> None:
8080
)
8181

8282
await self._registry.set_resource(
83-
self.resource_key, (_SOCKET_ID_FIELDNAME, socket_id)
83+
self.resource_key, (SOCKET_ID_FIELDNAME, socket_id)
8484
)
8585
# NOTE: hearthbeat is not emulated in tests, make sure that with very small GC intervals
8686
# the resources do not expire; this value is usually in the order of minutes
@@ -112,7 +112,7 @@ async def remove_socket_id(self) -> None:
112112
extra=get_log_record_extra(user_id=self.user_id),
113113
)
114114

115-
await self._registry.remove_resource(self.resource_key, _SOCKET_ID_FIELDNAME)
115+
await self._registry.remove_resource(self.resource_key, SOCKET_ID_FIELDNAME)
116116
await self._registry.set_key_alive(
117117
self.resource_key,
118118
expiration_time=_get_service_deletion_timeout(self.app),
@@ -131,13 +131,13 @@ async def find_socket_ids(self) -> list[str]:
131131
"user %s/tab %s finding %s from registry...",
132132
self.user_id,
133133
self.client_session_id,
134-
_SOCKET_ID_FIELDNAME,
134+
SOCKET_ID_FIELDNAME,
135135
extra=get_log_record_extra(user_id=self.user_id),
136136
)
137137

138138
return await self._registry.find_resources(
139139
UserSession(user_id=self.user_id, client_session_id="*"),
140-
_SOCKET_ID_FIELDNAME,
140+
SOCKET_ID_FIELDNAME,
141141
)
142142

143143
async def find_all_resources_of_user(self, key: str) -> list[str]:

0 commit comments

Comments
 (0)