Skip to content

Commit 924b843

Browse files
committed
emit_update: take token instead of sid
This allows the app to be more resilient in the face of websocket reconnects. The event is processed against a token, so there's no reason to maintain websocket affinity for event processing. Whenever the update is ready to send, it will be sent to the current websocket/sid associated.
1 parent 4468e14 commit 924b843

File tree

2 files changed

+17
-19
lines changed

2 files changed

+17
-19
lines changed

reflex/app.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
State,
9898
StateManager,
9999
StateUpdate,
100+
_split_substate_key,
100101
_substate_key,
101102
all_base_state_classes,
102103
code_uses_state_contexts,
@@ -1559,7 +1560,7 @@ async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
15591560
state._clean()
15601561
await self.event_namespace.emit_update(
15611562
update=StateUpdate(delta=delta),
1562-
sid=state.router.session.session_id,
1563+
token=token,
15631564
)
15641565

15651566
def _process_background(
@@ -1599,7 +1600,7 @@ async def _coro():
15991600
# Send the update to the client.
16001601
await self.event_namespace.emit_update(
16011602
update=update,
1602-
sid=state.router.session.session_id,
1603+
token=event.token,
16031604
)
16041605

16051606
task = asyncio.create_task(
@@ -2061,20 +2062,19 @@ def on_disconnect(self, sid: str):
20612062
and console.error(f"Token cleanup error: {t.exception()}")
20622063
)
20632064

2064-
async def emit_update(self, update: StateUpdate, sid: str) -> None:
2065+
async def emit_update(self, update: StateUpdate, token: str) -> None:
20652066
"""Emit an update to the client.
20662067
20672068
Args:
20682069
update: The state update to send.
2069-
sid: The Socket.IO session id.
2070+
token: The client token (tab) associated with the event.
20702071
"""
2071-
if not sid:
2072+
client_token, _ = _split_substate_key(token)
2073+
sid = self.token_to_sid.get(client_token)
2074+
if sid is None:
20722075
# If the sid is None, we are not connected to a client. Prevent sending
20732076
# updates to all clients.
2074-
return
2075-
token = self.sid_to_token.get(sid)
2076-
if token is None:
2077-
console.warn(f"Attempting to send delta to disconnected websocket {sid}")
2077+
console.warn(f"Attempting to send delta to disconnected client {token!r}")
20782078
return
20792079
# Creating a task prevents the update from being blocked behind other coroutines.
20802080
await asyncio.create_task(
@@ -2165,7 +2165,7 @@ async def on_event(self, sid: str, data: Any):
21652165
# Process the events.
21662166
async for update in updates_gen:
21672167
# Emit the update from processing the event.
2168-
await self.emit_update(update=update, sid=sid)
2168+
await self.emit_update(update=update, token=event.token)
21692169

21702170
async def on_ping(self, sid: str):
21712171
"""Event for testing the API endpoint.

reflex/istate/proxy.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ def __init__(
7171
state_instance: The state instance to proxy.
7272
parent_state_proxy: The parent state proxy, for linked mutability and context tracking.
7373
"""
74+
from reflex.state import _substate_key
75+
7476
super().__init__(state_instance)
75-
# compile is not relevant to backend logic
7677
self._self_app = prerequisites.get_and_validate_app().app
7778
self._self_substate_path = tuple(state_instance.get_full_name().split("."))
79+
self._self_substate_token = _substate_key(
80+
state_instance.router.session.client_token,
81+
self._self_substate_path,
82+
)
7883
self._self_actx = None
7984
self._self_mutable = False
8085
self._self_actx_lock = asyncio.Lock()
@@ -127,16 +132,9 @@ async def __aenter__(self) -> StateProxy:
127132
msg = "The state is already mutable. Do not nest `async with self` blocks."
128133
raise ImmutableStateError(msg)
129134

130-
from reflex.state import _substate_key
131-
132135
await self._self_actx_lock.acquire()
133136
self._self_actx_lock_holder = current_task
134-
self._self_actx = self._self_app.modify_state(
135-
token=_substate_key(
136-
self.__wrapped__.router.session.client_token,
137-
self._self_substate_path,
138-
)
139-
)
137+
self._self_actx = self._self_app.modify_state(token=self._self_substate_token)
140138
mutable_state = await self._self_actx.__aenter__()
141139
super().__setattr__(
142140
"__wrapped__", mutable_state.get_substate(self._self_substate_path)

0 commit comments

Comments
 (0)