1515 from python .helpers .websocket_manager import WebSocketManager
1616
1717
18+ ConnectionIdentity = tuple [str , str ] # (namespace, sid)
19+
20+
1821@dataclass
1922class ConnectionProjection :
23+ namespace : str
2024 sid : str
2125 active_context_id : str | None = None
2226 log_from : int = 0
@@ -40,9 +44,9 @@ class StateMonitor:
4044 def __init__ (self , debounce_seconds : float = 0.025 ) -> None :
4145 self .debounce_seconds = float (debounce_seconds )
4246 self ._lock = threading .RLock ()
43- self ._projections : dict [str , ConnectionProjection ] = {}
44- self ._debounce_handles : dict [str , asyncio .TimerHandle ] = {}
45- self ._push_tasks : dict [str , asyncio .Task [None ]] = {}
47+ self ._projections : dict [ConnectionIdentity , ConnectionProjection ] = {}
48+ self ._debounce_handles : dict [ConnectionIdentity , asyncio .TimerHandle ] = {}
49+ self ._push_tasks : dict [ConnectionIdentity , asyncio .Task [None ]] = {}
4650 self ._manager : WebSocketManager | None = None
4751 self ._emit_handler_id : str | None = None
4852 self ._dispatcher_loop : asyncio .AbstractEventLoop | None = None
@@ -61,23 +65,29 @@ def bind_manager(self, manager: "WebSocketManager", *, handler_id: str | None =
6165 f"[StateMonitor] bind_manager handler_id={ handler_id or self ._emit_handler_id } "
6266 )
6367
64- def register_sid (self , sid : str ) -> None :
68+ def register_sid (self , namespace : str , sid : str ) -> None :
69+ identity : ConnectionIdentity = (namespace , sid )
6570 with self ._lock :
66- self ._projections .setdefault (sid , ConnectionProjection (sid = sid ))
71+ self ._projections .setdefault (
72+ identity , ConnectionProjection (namespace = namespace , sid = sid )
73+ )
6774 if runtime .is_development ():
68- PrintStyle .debug (f"[StateMonitor] register_sid sid={ sid } " )
75+ PrintStyle .debug (f"[StateMonitor] register_sid namespace= { namespace } sid={ sid } " )
6976
70- def unregister_sid (self , sid : str ) -> None :
77+ def unregister_sid (self , namespace : str , sid : str ) -> None :
78+ identity : ConnectionIdentity = (namespace , sid )
7179 with self ._lock :
72- handle = self ._debounce_handles .pop (sid , None )
80+ handle = self ._debounce_handles .pop (identity , None )
7381 if handle is not None :
7482 handle .cancel ()
75- task = self ._push_tasks .pop (sid , None )
83+ task = self ._push_tasks .pop (identity , None )
7684 if task is not None :
7785 task .cancel ()
78- self ._projections .pop (sid , None )
86+ self ._projections .pop (identity , None )
7987 if runtime .is_development ():
80- PrintStyle .debug (f"[StateMonitor] unregister_sid sid={ sid } " )
88+ PrintStyle .debug (
89+ f"[StateMonitor] unregister_sid namespace={ namespace } sid={ sid } "
90+ )
8191
8292 def mark_dirty_all (self , * , reason : str | None = None ) -> None :
8393 wave_id = None
@@ -86,9 +96,9 @@ def mark_dirty_all(self, *, reason: str | None = None) -> None:
8696 self ._dirty_wave_seq += 1
8797 wave_id = f"all_{ self ._dirty_wave_seq } "
8898 with self ._lock :
89- sids = list (self ._projections .keys ())
90- for sid in sids :
91- self .mark_dirty (sid , reason = reason , wave_id = wave_id )
99+ identities = list (self ._projections .keys ())
100+ for namespace , sid in identities :
101+ self .mark_dirty (namespace , sid , reason = reason , wave_id = wave_id )
92102
93103 def mark_dirty_for_context (self , context_id : str , * , reason : str | None = None ) -> None :
94104 if not isinstance (context_id , str ) or not context_id .strip ():
@@ -100,16 +110,17 @@ def mark_dirty_for_context(self, context_id: str, *, reason: str | None = None)
100110 self ._dirty_wave_seq += 1
101111 wave_id = f"ctx_{ self ._dirty_wave_seq } "
102112 with self ._lock :
103- sids = [
104- sid
105- for sid , projection in self ._projections .items ()
113+ identities = [
114+ identity
115+ for identity , projection in self ._projections .items ()
106116 if projection .active_context_id == target
107117 ]
108- for sid in sids :
109- self .mark_dirty (sid , reason = reason , wave_id = wave_id )
118+ for namespace , sid in identities :
119+ self .mark_dirty (namespace , sid , reason = reason , wave_id = wave_id )
110120
111121 def update_projection (
112122 self ,
123+ namespace : str ,
113124 sid : str ,
114125 * ,
115126 context : str | None ,
@@ -118,9 +129,10 @@ def update_projection(
118129 timezone : str ,
119130 seq_base : int ,
120131 ) -> None :
132+ identity : ConnectionIdentity = (namespace , sid )
121133 with self ._lock :
122134 projection = self ._projections .setdefault (
123- sid , ConnectionProjection (sid = sid )
135+ identity , ConnectionProjection (namespace = namespace , sid = sid )
124136 )
125137 projection .active_context_id = context
126138 projection .log_from = log_from
@@ -130,18 +142,20 @@ def update_projection(
130142 projection .seq = seq_base
131143 if runtime .is_development ():
132144 PrintStyle .debug (
133- f"[StateMonitor] update_projection sid={ sid } context={ context !r} "
145+ f"[StateMonitor] update_projection namespace= { namespace } sid={ sid } context={ context !r} "
134146 f"log_from={ log_from } notifications_from={ notifications_from } "
135147 f"timezone={ timezone !r} seq_base={ seq_base } "
136148 )
137149
138150 def mark_dirty (
139151 self ,
152+ namespace : str ,
140153 sid : str ,
141154 * ,
142155 reason : str | None = None ,
143156 wave_id : str | None = None ,
144157 ) -> None :
158+ identity : ConnectionIdentity = (namespace , sid )
145159 loop = self ._dispatcher_loop
146160 if loop is None or loop .is_closed ():
147161 try :
@@ -155,19 +169,19 @@ def mark_dirty(
155169 running_loop = None
156170
157171 if running_loop is loop :
158- self ._mark_dirty_on_loop (sid , reason = reason , wave_id = wave_id )
172+ self ._mark_dirty_on_loop (identity , reason = reason , wave_id = wave_id )
159173 return
160174
161- loop .call_soon_threadsafe (self ._mark_dirty_on_loop , sid , reason , wave_id )
175+ loop .call_soon_threadsafe (self ._mark_dirty_on_loop , identity , reason , wave_id )
162176
163177 def _mark_dirty_on_loop (
164178 self ,
165- sid : str ,
179+ identity : ConnectionIdentity ,
166180 reason : str | None = None ,
167181 wave_id : str | None = None ,
168182 ) -> None :
169183 with self ._lock :
170- projection = self ._projections .get (sid )
184+ projection = self ._projections .get (identity )
171185 if projection is None :
172186 return
173187 projection .dirty_version += 1
@@ -178,12 +192,12 @@ def _mark_dirty_on_loop(
178192 else "unknown"
179193 )
180194 projection .dirty_wave_id = wave_id
181- self ._schedule_debounce_on_loop (sid )
195+ self ._schedule_debounce_on_loop (identity )
182196
183- def _schedule_debounce_on_loop (self , sid : str ) -> None :
197+ def _schedule_debounce_on_loop (self , identity : ConnectionIdentity ) -> None :
184198 loop = asyncio .get_running_loop ()
185199 with self ._lock :
186- projection = self ._projections .get (sid )
200+ projection = self ._projections .get (identity )
187201 if projection is None :
188202 return
189203 # INVARIANT.STATE.GATING: do not schedule pushes until a successful state_request
@@ -194,40 +208,44 @@ def _schedule_debounce_on_loop(self, sid: str) -> None:
194208 # Throttled coalescing: schedule at most one push per debounce window.
195209 # Do not postpone the scheduled push on subsequent dirties; this keeps
196210 # streaming updates smooth while still capping to <= 1 push / 100ms / sid.
197- existing = self ._debounce_handles .get (sid )
211+ existing = self ._debounce_handles .get (identity )
198212 if existing is not None and not existing .cancelled ():
199213 return
200214
201- running = self ._push_tasks .get (sid )
215+ running = self ._push_tasks .get (identity )
202216 if running is not None and not running .done ():
203217 return
204218
205- handle = loop .call_later (self .debounce_seconds , self ._on_debounce_fire , sid )
206- self ._debounce_handles [sid ] = handle
219+ handle = loop .call_later (
220+ self .debounce_seconds , self ._on_debounce_fire , identity
221+ )
222+ self ._debounce_handles [identity ] = handle
207223 if runtime .is_development ():
208224 PrintStyle .debug (
209- f"[StateMonitor] schedule_push sid={ sid } delay_s={ self .debounce_seconds } "
225+ f"[StateMonitor] schedule_push namespace={ projection .namespace } sid={ projection .sid } "
226+ f"delay_s={ self .debounce_seconds } "
210227 f"dirty={ projection .dirty_version } pushed={ projection .pushed_version } "
211228 f"reason={ projection .dirty_reason !r} wave={ projection .dirty_wave_id !r} "
212229 )
213230
214- def _on_debounce_fire (self , sid : str ) -> None :
231+ def _on_debounce_fire (self , identity : ConnectionIdentity ) -> None :
215232 with self ._lock :
216- self ._debounce_handles .pop (sid , None )
217- existing = self ._push_tasks .get (sid )
233+ self ._debounce_handles .pop (identity , None )
234+ existing = self ._push_tasks .get (identity )
218235 if existing is not None and not existing .done ():
219236 return
220- task = asyncio .create_task (self ._flush_push (sid ))
221- self ._push_tasks [sid ] = task
237+ task = asyncio .create_task (self ._flush_push (identity ))
238+ self ._push_tasks [identity ] = task
222239
223- async def _flush_push (self , sid : str ) -> None :
240+ async def _flush_push (self , identity : ConnectionIdentity ) -> None :
241+ namespace , sid = identity
224242 task = asyncio .current_task ()
225243 base_version = 0
226244 dirty_reason : str | None = None
227245 dirty_wave_id : str | None = None
228246 try :
229247 with self ._lock :
230- projection = self ._projections .get (sid )
248+ projection = self ._projections .get (identity )
231249 manager = self ._manager
232250 handler_id = self ._emit_handler_id
233251
@@ -257,7 +275,7 @@ async def _flush_push(self, sid: str) -> None:
257275 )
258276
259277 with self ._lock :
260- projection = self ._projections .get (sid )
278+ projection = self ._projections .get (identity )
261279 if projection is None :
262280 return
263281
@@ -297,11 +315,12 @@ async def _flush_push(self, sid: str) -> None:
297315 else None
298316 )
299317 PrintStyle .debug (
300- f"[StateMonitor] emit state_push sid={ sid } seq={ seq } "
318+ f"[StateMonitor] emit state_push namespace= { namespace } sid={ sid } seq={ seq } "
301319 f"context={ active_context_id !r} logs_len={ logs_len } "
302320 f"reason={ dirty_reason !r} wave={ dirty_wave_id !r} "
303321 )
304322 await manager .emit_to (
323+ namespace ,
305324 sid ,
306325 "state_push" ,
307326 payload ,
@@ -310,21 +329,25 @@ async def _flush_push(self, sid: str) -> None:
310329 except ConnectionNotFoundError :
311330 # Sid was removed before the emit; treat as benign.
312331 if runtime .is_development ():
313- PrintStyle .debug (f"[StateMonitor] emit skipped: sid not found sid={ sid } " )
332+ PrintStyle .debug (
333+ f"[StateMonitor] emit skipped: sid not found namespace={ namespace } sid={ sid } "
334+ )
314335 return
315336 except RuntimeError :
316337 # Dispatcher loop may be closing (e.g., during shutdown or test teardown).
317338 if runtime .is_development ():
318- PrintStyle .debug (f"[StateMonitor] emit skipped: dispatcher closing sid={ sid } " )
339+ PrintStyle .debug (
340+ f"[StateMonitor] emit skipped: dispatcher closing namespace={ namespace } sid={ sid } "
341+ )
319342 return
320343 finally :
321344 follow_up = False
322345 dirty_version = 0
323346 pushed_version = 0
324347 with self ._lock :
325- if task is not None and self ._push_tasks .get (sid ) is task :
326- self ._push_tasks .pop (sid , None )
327- projection = self ._projections .get (sid )
348+ if task is not None and self ._push_tasks .get (identity ) is task :
349+ self ._push_tasks .pop (identity , None )
350+ projection = self ._projections .get (identity )
328351 if projection is not None :
329352 dirty_version = projection .dirty_version
330353 pushed_version = projection .pushed_version
@@ -338,21 +361,21 @@ async def _flush_push(self, sid: str) -> None:
338361
339362 if runtime .is_development ():
340363 PrintStyle .debug (
341- f"[StateMonitor] follow_up_push sid={ sid } dirty={ dirty_version } pushed={ pushed_version } "
364+ f"[StateMonitor] follow_up_push namespace= { namespace } sid={ sid } dirty={ dirty_version } pushed={ pushed_version } "
342365 )
343366 try :
344367 loop = self ._dispatcher_loop or asyncio .get_running_loop ()
345368 except RuntimeError :
346369 return
347370 if loop .is_closed ():
348371 return
349- loop .call_soon_threadsafe (self ._schedule_debounce_on_loop , sid )
372+ loop .call_soon_threadsafe (self ._schedule_debounce_on_loop , identity )
350373
351374 # Testing hook: keep argument surface stable for future extensions
352375 def _debug_state (self ) -> dict [str , Any ]: # pragma: no cover - helper
353376 with self ._lock :
354377 return {
355- "sids " : list (self ._projections .keys ()),
378+ "identities " : list (self ._projections .keys ()),
356379 "handles" : list (self ._debounce_handles .keys ()),
357380 }
358381
0 commit comments