44import json
55import logging
66from datetime import datetime
7- from typing import Any , Dict , Generic , TypeVar , Union
7+ from typing import Any , Dict , TypeVar , Union
88
99from fastapi import APIRouter , WebSocket , WebSocketDisconnect
1010from sqlmodel import select
1313from murfey .server .murfey_db import get_murfey_db_session
1414from murfey .util import sanitise
1515from murfey .util .db import ClientEnvironment
16- from murfey .util .state import State , global_state
1716
1817T = TypeVar ("T" )
1918
2019ws = APIRouter (prefix = "/ws" , tags = ["websocket" ])
2120log = logging .getLogger ("murfey.server.websocket" )
2221
2322
24- class ConnectionManager ( Generic [ T ]) :
25- def __init__ (self , state : State [ T ] ):
23+ class ConnectionManager :
24+ def __init__ (self ):
2625 self .active_connections : Dict [int | str , WebSocket ] = {}
27- self ._state = state
28- self ._state .subscribe (self ._broadcast_state_update )
2926
3027 async def connect (
3128 self , websocket : WebSocket , client_id : int | str , register_client : bool = True
@@ -38,7 +35,6 @@ async def connect(
3835 "To register a client the client ID must be an integer"
3936 )
4037 self ._register_new_client (client_id )
41- await websocket .send_json ({"message" : "state-full" , "state" : self ._state .data })
4238
4339 @staticmethod
4440 def _register_new_client (client_id : int ):
@@ -48,9 +44,7 @@ def _register_new_client(client_id: int):
4844 murfey_db .commit ()
4945 murfey_db .close ()
5046
51- def disconnect (
52- self , websocket : WebSocket , client_id : int | str , unregister_client : bool = True
53- ):
47+ def disconnect (self , client_id : int | str , unregister_client : bool = True ):
5448 self .active_connections .pop (client_id )
5549 if unregister_client :
5650 murfey_db = next (get_murfey_db_session ())
@@ -67,33 +61,14 @@ async def broadcast(self, message: str):
6761 for connection in self .active_connections :
6862 await self .active_connections [connection ].send_text (message )
6963
70- async def _broadcast_state_update (
71- self , attribute : str , value : T | None , message : str = "state-update"
72- ):
73- for connection in self .active_connections :
74- await self .active_connections [connection ].send_json (
75- {"message" : message , "attribute" : attribute , "value" : value }
76- )
77-
78- async def set_state (self , attribute : str , value : T ):
79- log .info (
80- f"State attribute { sanitise (attribute )!r} set to { sanitise (str (value ))!r} "
81- )
82- await self ._state .set (attribute , value )
83-
84- async def delete_state (self , attribute : str ):
85- log .info (f"State attribute { sanitise (attribute )!r} removed" )
86- await self ._state .delete (attribute )
87-
8864
89- manager = ConnectionManager (global_state )
65+ manager = ConnectionManager ()
9066
9167
9268@ws .websocket ("/test/{client_id}" )
9369async def websocket_endpoint (websocket : WebSocket , client_id : int ):
9470 await manager .connect (websocket , client_id )
9571 await manager .broadcast (f"Client { client_id } joined" )
96- await manager .set_state (f"Client { client_id } " , "joined" )
9772 try :
9873 while True :
9974 data = await websocket .receive_text ()
@@ -111,9 +86,8 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
11186 select (ClientEnvironment ).where (ClientEnvironment .client_id == client_id )
11287 ).one ()
11388 prom .monitoring_switch .labels (visit = client_env .visit ).set (0 )
114- manager .disconnect (websocket , client_id )
89+ manager .disconnect (client_id )
11590 await manager .broadcast (f"Client #{ client_id } disconnected" )
116- await manager .delete_state (f"Client { client_id } " )
11791
11892
11993@ws .websocket ("/connect/{client_id}" )
@@ -122,7 +96,6 @@ async def websocket_connection_endpoint(
12296):
12397 await manager .connect (websocket , client_id , register_client = False )
12498 await manager .broadcast (f"Client { client_id } joined" )
125- await manager .set_state (f"Client { client_id } " , "joined" )
12699 try :
127100 while True :
128101 data = await websocket .receive_text ()
@@ -138,9 +111,8 @@ async def websocket_connection_endpoint(
138111 await manager .broadcast (f"Client #{ client_id } sent message { data } " )
139112 except WebSocketDisconnect :
140113 log .info (f"Disconnecting Client { sanitise (str (client_id ))} " )
141- manager .disconnect (websocket , client_id , unregister_client = False )
114+ manager .disconnect (client_id , unregister_client = False )
142115 await manager .broadcast (f"Client #{ client_id } disconnected" )
143- await manager .delete_state (f"Client { client_id } " )
144116
145117
146118async def check_connections (active_connections ):
@@ -178,7 +150,7 @@ async def close_ws_connection(client_id: int):
178150 murfey_db .close ()
179151 client_id_str = str (client_id ).replace ("\r \n " , "" ).replace ("\n " , "" )
180152 log .info (f"Disconnecting { client_id_str } " )
181- manager .disconnect (manager . active_connections [ client_id ], client_id )
153+ manager .disconnect (client_id )
182154 prom .monitoring_switch .labels (visit = visit_name ).set (0 )
183155 await manager .broadcast (f"Client #{ client_id } disconnected" )
184156
@@ -187,5 +159,5 @@ async def close_ws_connection(client_id: int):
187159async def close_unrecorded_ws_connection (client_id : Union [int , str ]):
188160 client_id_str = str (client_id ).replace ("\r \n " , "" ).replace ("\n " , "" )
189161 log .info (f"Disconnecting { client_id_str } " )
190- manager .disconnect (manager . active_connections [ client_id ], client_id )
162+ manager .disconnect (client_id )
191163 await manager .broadcast (f"Client #{ client_id } disconnected" )
0 commit comments