11import asyncio
2+ import logging
23
34from fastapi import WebSocket
45
56from src .config import settings
67
8+ logger = logging .getLogger (__name__ )
9+
710
811class ConnectionManager :
912 def __init__ (self ):
@@ -22,6 +25,7 @@ def disconnect(self, ws: WebSocket):
2225 self .tag_to_ws .get (tag_id , set ()).discard (ws )
2326 if tag_id in self .tag_to_ws and not self .tag_to_ws [tag_id ]:
2427 del self .tag_to_ws [tag_id ]
28+
2529 self .ws_to_tags .pop (ws , None )
2630 self .ws_to_queue .pop (ws , None )
2731 task = self .ws_to_flush_task .pop (ws , None )
@@ -34,14 +38,17 @@ def subscribe(self, ws: WebSocket, tag_ids: list[int]):
3438 self .ws_to_tags [ws ].add (tag_id )
3539 if tag_id not in self .tag_to_ws :
3640 self .tag_to_ws [tag_id ] = set ()
41+
3742 self .tag_to_ws [tag_id ].add (ws )
3843
3944 async def broadcast_to_tag (self , tag_id : int , message : dict ):
4045 if tag_id not in self .tag_to_ws :
4146 return
47+
4248 for ws in list (self .tag_to_ws [tag_id ]):
4349 if ws not in self .ws_to_queue :
4450 self .ws_to_queue [ws ] = []
51+
4552 self .ws_to_queue [ws ].append (message )
4653
4754 if ws not in self .ws_to_flush_task or self .ws_to_flush_task [ws ].done ():
@@ -51,17 +58,18 @@ async def _flush(self, ws: WebSocket):
5158 await asyncio .sleep (settings .BATCH_INTERVAL_MS / 1000.0 )
5259 if ws in self .ws_to_queue and self .ws_to_queue [ws ]:
5360 batch = self .ws_to_queue [ws ]
54- del self .ws_to_queue [ws ]
5561 try :
5662 await ws .send_json (batch )
57- except Exception :
58- pass
63+ except Exception as e :
64+ logger .exception (e )
65+ else :
66+ del self .ws_to_queue [ws ]
5967
6068 async def send_immediate (self , ws : WebSocket , message : dict ):
6169 try :
6270 await ws .send_json ([message ])
63- except Exception :
64- pass
71+ except Exception as e :
72+ logger . exception ( e )
6573
6674
6775manager = ConnectionManager ()
0 commit comments