Skip to content

Commit 25f2302

Browse files
🐛 fix loss of logs (on project close/websocket disconnect) & add wallet permission check on wallet (on project open) (#8426)
1 parent 15df617 commit 25f2302

File tree

6 files changed

+89
-28
lines changed

6 files changed

+89
-28
lines changed

services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515
from servicelib.utils import logged_gather
1616

1717
from ...notifications import project_logs
18-
from ...resource_manager.user_sessions import PROJECT_ID_KEY, managed_resource
19-
from .._projects_service import retrieve_and_notify_project_locked_state
18+
from ...resource_manager.user_sessions import (
19+
PROJECT_ID_KEY,
20+
managed_resource,
21+
)
22+
from .._projects_service import (
23+
conditionally_unsubscribe_from_project_logs,
24+
retrieve_and_notify_project_locked_state,
25+
)
2026

2127
_logger = logging.getLogger(__name__)
2228

@@ -67,15 +73,6 @@ async def _on_user_disconnected(
6773

6874
assert len(projects) <= 1, "At the moment, at most one project per session" # nosec
6975

70-
with log_context(
71-
_logger,
72-
logging.DEBUG,
73-
msg=f"user disconnects and unsubscribes from following {projects=}",
74-
):
75-
await logged_gather(
76-
*[project_logs.unsubscribe(app, ProjectID(prj)) for prj in projects]
77-
)
78-
7976
await logged_gather(
8077
*[
8178
retrieve_and_notify_project_locked_state(
@@ -85,6 +82,11 @@ async def _on_user_disconnected(
8582
]
8683
)
8784

85+
for _project_id in projects: # At the moment, only 1 is expected
86+
await conditionally_unsubscribe_from_project_logs(
87+
app, ProjectID(_project_id), user_id
88+
)
89+
8890

8991
def setup_project_observer_events(app: web.Application) -> None:
9092
setup_observer_registry(app)

services/web/server/src/simcore_service_webserver/projects/_controller/projects_states_rest.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from ...users import users_service
3535
from ...utils_aiohttp import envelope_json_response, get_api_base_url
3636
from .. import _projects_service, projects_wallets_service
37+
from .._projects_service import conditionally_unsubscribe_from_project_logs
3738
from ..exceptions import ProjectStartsTooManyDynamicNodesError
3839
from ._rest_exceptions import handle_plugin_requests_exceptions
3940
from ._rest_schemas import AuthenticatedRequestContext, ProjectPathParams
@@ -91,9 +92,10 @@ async def open_project(request: web.Request) -> web.Response:
9192
),
9293
)
9394

94-
await projects_wallets_service.check_project_financial_status(
95+
await projects_wallets_service.check_project_financial_status_and_wallet_access(
9596
request.app,
9697
project_id=path_params.project_id,
98+
user_id=req_ctx.user_id,
9799
product_name=req_ctx.product_name,
98100
)
99101

@@ -220,7 +222,11 @@ async def close_project(request: web.Request) -> web.Response:
220222
X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
221223
),
222224
)
223-
await project_logs.unsubscribe(request.app, path_params.project_id)
225+
226+
await conditionally_unsubscribe_from_project_logs(
227+
request.app, path_params.project_id, req_ctx.user_id
228+
)
229+
224230
return web.json_response(status=status.HTTP_204_NO_CONTENT)
225231

226232

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,15 @@
115115
from ..director_v2 import director_v2_service
116116
from ..dynamic_scheduler import api as dynamic_scheduler_service
117117
from ..models import ClientSessionID
118+
from ..notifications import project_logs
118119
from ..products import products_web
119120
from ..rabbitmq import get_rabbitmq_rpc_client
120121
from ..redis import get_redis_lock_manager_client_sdk
121122
from ..resource_manager.models import UserSession
123+
from ..resource_manager.registry import get_registry
122124
from ..resource_manager.user_sessions import (
123125
PROJECT_ID_KEY,
126+
SOCKET_ID_FIELDNAME,
124127
managed_resource,
125128
)
126129
from ..resource_usage import service as rut_api
@@ -184,6 +187,43 @@
184187
_logger = logging.getLogger(__name__)
185188

186189

190+
async def conditionally_unsubscribe_from_project_logs(
    app: web.Application, project_id: ProjectID, user_id: UserID
) -> None:
    """Unsubscribe from project logs only if no socket is left on the project.

    Looks up every user session registered on ``project_id`` and checks
    whether any of them still holds a ``SOCKET_ID_FIELDNAME`` entry in the
    resource registry. The log subscription is dropped only when none does.

    Args:
        app: The web application instance.
        project_id: The project whose log subscription may be dropped.
        user_id: The user whose resource session is used to query the
            registry. It is required; the session is opened without a
            client session id.
    """
    redis_resource_registry = get_registry(app)
    with managed_resource(user_id, None, app) as user_session:
        all_user_sessions_with_project = await user_session.find_users_of_resource(
            app, key=PROJECT_ID_KEY, value=f"{project_id}"
        )

    # A user session may outlive its socket, so check for an actual socket_id
    # on each session instead of relying on the session's existence alone.
    for user_session_key in all_user_sessions_with_project:
        socket_ids = await redis_resource_registry.find_resources(
            key=user_session_key, resource_name=SOCKET_ID_FIELDNAME
        )
        if socket_ids:
            # One live socket is enough to keep the subscription: stop here
            # and skip the remaining registry lookups.
            return

    # Only unsubscribe from logs if there are no active socket connections to the project.
    # NOTE: With multiple webserver replicas, this ensures we don't unsubscribe until
    # the last socket is closed, though another replica may still maintain an active
    # subscription even if no users are connected to it.
    await project_logs.unsubscribe(app, project_id)
225+
226+
187227
async def patch_project_and_notify_users(
188228
app: web.Application,
189229
*,

services/web/server/src/simcore_service_webserver/projects/_wallets_service.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,23 @@ async def get_project_wallet(app, project_id: ProjectID) -> WalletGet | None:
3636
return wallet
3737

3838

39-
async def check_project_financial_status(
40-
app, *, project_id: ProjectID, product_name: ProductName
39+
async def check_project_financial_status_and_wallet_access(
40+
app, *, project_id: ProjectID, user_id: UserID, product_name: ProductName
4141
):
4242
db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(app)
4343

4444
current_project_wallet = await db.get_project_wallet(project_uuid=project_id)
4545
rpc_client = get_rabbitmq_rpc_client(app)
4646

4747
if current_project_wallet:
48+
# ensure the wallet can be used by the user
49+
await wallets_service.get_wallet_by_user(
50+
app,
51+
user_id=user_id,
52+
wallet_id=current_project_wallet.wallet_id,
53+
product_name=product_name,
54+
)
55+
4856
# Do not allow to open project if the project connected wallet is in DEBT!
4957
project_wallet_credits_in_debt = (
5058
await credit_transactions.get_project_wallet_total_credits(

services/web/server/src/simcore_service_webserver/projects/projects_wallets_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from ._wallets_service import (
2-
check_project_financial_status,
2+
check_project_financial_status_and_wallet_access,
33
connect_wallet_to_project,
44
get_project_wallet,
55
)
66

77
__all__: tuple[str, ...] = (
8-
"check_project_financial_status",
8+
"check_project_financial_status_and_wallet_access",
99
"connect_wallet_to_project",
1010
"get_project_wallet",
1111
)

services/web/server/src/simcore_service_webserver/resource_manager/user_sessions.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from collections.abc import Iterator
33
from contextlib import contextmanager
44
from dataclasses import dataclass
5-
from functools import cached_property
65
from typing import Final
76

87
from aiohttp import web
@@ -20,10 +19,10 @@
2019
_logger = logging.getLogger(__name__)
2120

2221

23-
_SOCKET_ID_FIELDNAME: Final[str] = "socket_id"
22+
SOCKET_ID_FIELDNAME: Final[str] = "socket_id"
2423
PROJECT_ID_KEY: Final[str] = "project_id"
2524

26-
assert _SOCKET_ID_FIELDNAME in ResourcesDict.__annotations__ # nosec
25+
assert SOCKET_ID_FIELDNAME in ResourcesDict.__annotations__ # nosec
2726
assert PROJECT_ID_KEY in ResourcesDict.__annotations__ # nosec
2827

2928

@@ -64,7 +63,7 @@ class UserSessionResourcesRegistry:
6463
def _registry(self) -> RedisResourceRegistry:
6564
return get_registry(self.app)
6665

67-
@cached_property
66+
@property
6867
def resource_key(self) -> UserSession:
6968
return UserSession(
7069
user_id=self.user_id,
@@ -81,7 +80,7 @@ async def set_socket_id(self, socket_id: str) -> None:
8180
)
8281

8382
await self._registry.set_resource(
84-
self.resource_key, (_SOCKET_ID_FIELDNAME, socket_id)
83+
self.resource_key, (SOCKET_ID_FIELDNAME, socket_id)
8584
)
8685
# NOTE: heartbeat is not emulated in tests, make sure that with very small GC intervals
8786
# the resources do not expire; this value is usually in the order of minutes
@@ -113,7 +112,7 @@ async def remove_socket_id(self) -> None:
113112
extra=get_log_record_extra(user_id=self.user_id),
114113
)
115114

116-
await self._registry.remove_resource(self.resource_key, _SOCKET_ID_FIELDNAME)
115+
await self._registry.remove_resource(self.resource_key, SOCKET_ID_FIELDNAME)
117116
await self._registry.set_key_alive(
118117
self.resource_key,
119118
expiration_time=_get_service_deletion_timeout(self.app),
@@ -132,13 +131,13 @@ async def find_socket_ids(self) -> list[str]:
132131
"user %s/tab %s finding %s from registry...",
133132
self.user_id,
134133
self.client_session_id,
135-
_SOCKET_ID_FIELDNAME,
134+
SOCKET_ID_FIELDNAME,
136135
extra=get_log_record_extra(user_id=self.user_id),
137136
)
138137

139138
return await self._registry.find_resources(
140139
UserSession(user_id=self.user_id, client_session_id="*"),
141-
_SOCKET_ID_FIELDNAME,
140+
SOCKET_ID_FIELDNAME,
142141
)
143142

144143
async def find_all_resources_of_user(self, key: str) -> list[str]:
@@ -148,8 +147,11 @@ async def find_all_resources_of_user(self, key: str) -> list[str]:
148147
msg=f"{self.user_id=} finding all {key} from registry",
149148
extra=get_log_record_extra(user_id=self.user_id),
150149
):
151-
return await get_registry(self.app).find_resources(
152-
UserSession(user_id=self.user_id, client_session_id="*"), key
150+
return await self._registry.find_resources(
151+
UserSession(
152+
user_id=self.user_id, client_session_id="*"
153+
), # <-- this one checks for all user tabs
154+
key,
153155
)
154156

155157
async def find(self, resource_name: str) -> list[str]:
@@ -161,7 +163,10 @@ async def find(self, resource_name: str) -> list[str]:
161163
extra=get_log_record_extra(user_id=self.user_id),
162164
)
163165

164-
return await self._registry.find_resources(self.resource_key, resource_name)
166+
return await self._registry.find_resources(
167+
self.resource_key,
168+
resource_name, # <-- when initialized with specific tab (client_session_id), checks only that tab otherwise all tabs
169+
)
165170

166171
async def add(self, key: str, value: str) -> None:
167172
_logger.debug(

0 commit comments

Comments
 (0)