diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py index 8537fbc66168..55b08c396925 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/projects_slot.py @@ -5,6 +5,7 @@ from aiohttp import web from models_library.products import ProductName from models_library.projects import ProjectID +from models_library.users import UserID from servicelib.aiohttp.observer import ( registed_observers_report, register_observer, @@ -20,8 +21,40 @@ _logger = logging.getLogger(__name__) +async def _on_user_connected( + user_id: UserID, + app: web.Application, + product_name: ProductName, + client_session_id: str, +) -> None: + assert product_name # nosec + # check if there is a project resource + with managed_resource(user_id, client_session_id, app) as user_session: + projects: list[str] = await user_session.find(PROJECT_ID_KEY) + assert len(projects) <= 1, "At the moment, at most one project per session" # nosec + + if projects: + with log_context( + _logger, + logging.DEBUG, + msg=f"user connects and subscribes to following {projects=}", + ): + await logged_gather( + *[project_logs.subscribe(app, ProjectID(prj)) for prj in projects] + ) + + await logged_gather( + *[ + retrieve_and_notify_project_locked_state( + user_id, prj, app, notify_only_prj_user=True + ) + for prj in projects + ] + ) + + async def _on_user_disconnected( - user_id: int, + user_id: UserID, client_session_id: str, app: web.Application, product_name: ProductName, @@ -56,6 +89,7 @@ async def _on_user_disconnected( def setup_project_observer_events(app: web.Application) -> None: setup_observer_registry(app) + register_observer(app, _on_user_connected, event="SIGNAL_USER_CONNECTED") register_observer(app, _on_user_disconnected, event="SIGNAL_USER_DISCONNECTED") _logger.info( diff --git a/services/web/server/src/simcore_service_webserver/resource_usage/_observer.py b/services/web/server/src/simcore_service_webserver/resource_usage/_observer.py index 114bc1df2982..d205fc43f668 100644 --- a/services/web/server/src/simcore_service_webserver/resource_usage/_observer.py +++ b/services/web/server/src/simcore_service_webserver/resource_usage/_observer.py @@ -4,6 +4,7 @@ from aiohttp import web from models_library.products import ProductName +from models_library.users import UserID from servicelib.aiohttp.observer import ( registed_observers_report, register_observer, @@ -18,7 +19,7 @@ async def _on_user_disconnected( - user_id: int, + user_id: UserID, client_session_id: str, app: web.Application, product_name: ProductName, @@ -38,8 +39,12 @@ async def _on_user_disconnected( async def _on_user_connected( - user_id: int, app: web.Application, product_name: str + user_id: UserID, + app: web.Application, + product_name: ProductName, + client_session_id: str, ) -> None: + assert client_session_id # nosec # Get all user wallets and subscribe user_wallet = await wallets_service.list_wallets_for_user( app, user_id=user_id, product_name=product_name diff --git a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py index c864a735d52d..181a24199c16 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py @@ -44,7 +44,7 @@ def auth_user_factory(socket_id: SocketID): @login_required - async def _handler(request: web.Request) -> tuple[UserID, ProductName]: + async def _handler(request: web.Request) -> tuple[UserID, ProductName, str]: """ Raises: web.HTTPUnauthorized: when the user is not recognized. Keeps the original request @@ -81,7 +81,7 @@ async def _handler(request: web.Request) -> tuple[UserID, ProductName]: ) await resource_registry.set_socket_id(socket_id) - return user_id, product.name + return user_id, product.name, client_session_id return _handler @@ -126,18 +126,16 @@ async def connect( try: auth_user_handler = auth_user_factory(socket_id) - user_id, product_name = await auth_user_handler(environ["aiohttp.request"]) + user_id, product_name, client_session_id = await auth_user_handler( + environ["aiohttp.request"] + ) await _set_user_in_group_rooms(app, user_id, socket_id) _logger.info("Sending set_heartbeat_emit_interval with %s", _EMIT_INTERVAL_S) await emit( - app, - "SIGNAL_USER_CONNECTED", - user_id, - app, - product_name, + app, "SIGNAL_USER_CONNECTED", user_id, app, product_name, client_session_id ) await send_message_to_user(