Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aiohttp import web
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.users import UserID
from servicelib.aiohttp.observer import (
registed_observers_report,
register_observer,
Expand All @@ -20,8 +21,40 @@
_logger = logging.getLogger(__name__)


async def _on_user_connected(
user_id: UserID,
app: web.Application,
product_name: ProductName,
client_session_id: str,
) -> None:
assert product_name # nosec
# check if there is a project resource
with managed_resource(user_id, client_session_id, app) as user_session:
projects: list[str] = await user_session.find(PROJECT_ID_KEY)
assert len(projects) <= 1, "At the moment, at most one project per session" # nosec

if projects:
with log_context(
_logger,
logging.DEBUG,
msg=f"user connects and subscribes to following {projects=}",
):
await logged_gather(
*[project_logs.subscribe(app, ProjectID(prj)) for prj in projects]
)

await logged_gather(
*[
retrieve_and_notify_project_locked_state(
user_id, prj, app, notify_only_prj_user=True
)
for prj in projects
]
)


async def _on_user_disconnected(
user_id: int,
user_id: UserID,
client_session_id: str,
app: web.Application,
product_name: ProductName,
Expand Down Expand Up @@ -56,6 +89,7 @@ async def _on_user_disconnected(
def setup_project_observer_events(app: web.Application) -> None:
setup_observer_registry(app)

register_observer(app, _on_user_connected, event="SIGNAL_USER_CONNECTED")
register_observer(app, _on_user_disconnected, event="SIGNAL_USER_DISCONNECTED")

_logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from aiohttp import web
from models_library.products import ProductName
from models_library.users import UserID
from servicelib.aiohttp.observer import (
registed_observers_report,
register_observer,
Expand All @@ -18,7 +19,7 @@


async def _on_user_disconnected(
user_id: int,
user_id: UserID,
client_session_id: str,
app: web.Application,
product_name: ProductName,
Expand All @@ -38,8 +39,12 @@ async def _on_user_disconnected(


async def _on_user_connected(
user_id: int, app: web.Application, product_name: str
user_id: UserID,
app: web.Application,
product_name: ProductName,
client_session_id: str,
) -> None:
assert client_session_id # nosec
# Get all user wallets and subscribe
user_wallet = await wallets_service.list_wallets_for_user(
app, user_id=user_id, product_name=product_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

def auth_user_factory(socket_id: SocketID):
@login_required
async def _handler(request: web.Request) -> tuple[UserID, ProductName]:
async def _handler(request: web.Request) -> tuple[UserID, ProductName, str]:
"""
Raises:
web.HTTPUnauthorized: when the user is not recognized. Keeps the original request
Expand Down Expand Up @@ -81,7 +81,7 @@ async def _handler(request: web.Request) -> tuple[UserID, ProductName]:
)
await resource_registry.set_socket_id(socket_id)

return user_id, product.name
return user_id, product.name, client_session_id

return _handler

Expand Down Expand Up @@ -126,18 +126,16 @@ async def connect(

try:
auth_user_handler = auth_user_factory(socket_id)
user_id, product_name = await auth_user_handler(environ["aiohttp.request"])
user_id, product_name, client_session_id = await auth_user_handler(
environ["aiohttp.request"]
)

await _set_user_in_group_rooms(app, user_id, socket_id)

_logger.info("Sending set_heartbeat_emit_interval with %s", _EMIT_INTERVAL_S)

await emit(
app,
"SIGNAL_USER_CONNECTED",
user_id,
app,
product_name,
app, "SIGNAL_USER_CONNECTED", user_id, app, product_name, client_session_id
)

await send_message_to_user(
Expand Down
Loading