Skip to content

Commit 378c79e

Browse files
committed
ensure connection is restored
1 parent 961c173 commit 378c79e

File tree

3 files changed

+48
-11
lines changed

3 files changed

+48
-11
lines changed

services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from aiohttp import web
66
from models_library.products import ProductName
77
from models_library.projects import ProjectID
8+
from models_library.users import UserID
89
from servicelib.aiohttp.observer import (
910
registed_observers_report,
1011
register_observer,
@@ -20,8 +21,40 @@
2021
_logger = logging.getLogger(__name__)
2122

2223

24+
async def _on_user_connected(
25+
user_id: UserID,
26+
app: web.Application,
27+
product_name: ProductName,
28+
client_session_id: str,
29+
) -> None:
30+
assert product_name # nosec
31+
# check if there is a project resource
32+
with managed_resource(user_id, client_session_id, app) as user_session:
33+
projects: list[str] = await user_session.find(PROJECT_ID_KEY)
34+
assert len(projects) <= 1, "At the moment, at most one project per session" # nosec
35+
36+
if projects:
37+
with log_context(
38+
_logger,
39+
logging.DEBUG,
40+
msg=f"user connects and subscribes to following {projects=}",
41+
):
42+
await logged_gather(
43+
*[project_logs.subscribe(app, ProjectID(prj)) for prj in projects]
44+
)
45+
46+
await logged_gather(
47+
*[
48+
retrieve_and_notify_project_locked_state(
49+
user_id, prj, app, notify_only_prj_user=True
50+
)
51+
for prj in projects
52+
]
53+
)
54+
55+
2356
async def _on_user_disconnected(
24-
user_id: int,
57+
user_id: UserID,
2558
client_session_id: str,
2659
app: web.Application,
2760
product_name: ProductName,
@@ -56,6 +89,7 @@ async def _on_user_disconnected(
5689
def setup_project_observer_events(app: web.Application) -> None:
5790
setup_observer_registry(app)
5891

92+
register_observer(app, _on_user_connected, event="SIGNAL_USER_CONNECTED")
5993
register_observer(app, _on_user_disconnected, event="SIGNAL_USER_DISCONNECTED")
6094

6195
_logger.info(

services/web/server/src/simcore_service_webserver/resource_usage/_observer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from aiohttp import web
66
from models_library.products import ProductName
7+
from models_library.users import UserID
78
from servicelib.aiohttp.observer import (
89
registed_observers_report,
910
register_observer,
@@ -18,7 +19,7 @@
1819

1920

2021
async def _on_user_disconnected(
21-
user_id: int,
22+
user_id: UserID,
2223
client_session_id: str,
2324
app: web.Application,
2425
product_name: ProductName,
@@ -38,8 +39,12 @@ async def _on_user_disconnected(
3839

3940

4041
async def _on_user_connected(
41-
user_id: int, app: web.Application, product_name: str
42+
user_id: UserID,
43+
app: web.Application,
44+
product_name: ProductName,
45+
client_session_id: str,
4246
) -> None:
47+
assert client_session_id # nosec
4348
# Get all user wallets and subscribe
4449
user_wallet = await wallets_service.list_wallets_for_user(
4550
app, user_id=user_id, product_name=product_name

services/web/server/src/simcore_service_webserver/socketio/_handlers.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545
def auth_user_factory(socket_id: SocketID):
4646
@login_required
47-
async def _handler(request: web.Request) -> tuple[UserID, ProductName]:
47+
async def _handler(request: web.Request) -> tuple[UserID, ProductName, str]:
4848
"""
4949
Raises:
5050
web.HTTPUnauthorized: when the user is not recognized. Keeps the original request
@@ -81,7 +81,7 @@ async def _handler(request: web.Request) -> tuple[UserID, ProductName]:
8181
)
8282
await resource_registry.set_socket_id(socket_id)
8383

84-
return user_id, product.name
84+
return user_id, product.name, client_session_id
8585

8686
return _handler
8787

@@ -126,18 +126,16 @@ async def connect(
126126

127127
try:
128128
auth_user_handler = auth_user_factory(socket_id)
129-
user_id, product_name = await auth_user_handler(environ["aiohttp.request"])
129+
user_id, product_name, client_session_id = await auth_user_handler(
130+
environ["aiohttp.request"]
131+
)
130132

131133
await _set_user_in_group_rooms(app, user_id, socket_id)
132134

133135
_logger.info("Sending set_heartbeat_emit_interval with %s", _EMIT_INTERVAL_S)
134136

135137
await emit(
136-
app,
137-
"SIGNAL_USER_CONNECTED",
138-
user_id,
139-
app,
140-
product_name,
138+
app, "SIGNAL_USER_CONNECTED", user_id, app, product_name, client_session_id
141139
)
142140

143141
await send_message_to_user(

0 commit comments

Comments
 (0)