Skip to content

Commit 673f3c7

Browse files
committed
Merge branch 'masenf/emit-update-by-token' into masenf/reconnect-dropped-websocket
2 parents 5ad988e + c09db7f commit 673f3c7

File tree

3 files changed

+19
-19
lines changed

3 files changed

+19
-19
lines changed

reflex/app.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
State,
9898
StateManager,
9999
StateUpdate,
100+
_split_substate_key,
100101
_substate_key,
101102
all_base_state_classes,
102103
code_uses_state_contexts,
@@ -1559,7 +1560,7 @@ async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
15591560
state._clean()
15601561
await self.event_namespace.emit_update(
15611562
update=StateUpdate(delta=delta),
1562-
sid=state.router.session.session_id,
1563+
token=token,
15631564
)
15641565

15651566
def _process_background(
@@ -1599,7 +1600,7 @@ async def _coro():
15991600
# Send the update to the client.
16001601
await self.event_namespace.emit_update(
16011602
update=update,
1602-
sid=state.router.session.session_id,
1603+
token=event.token,
16031604
)
16041605

16051606
task = asyncio.create_task(
@@ -2061,20 +2062,19 @@ def on_disconnect(self, sid: str):
20612062
and console.error(f"Token cleanup error: {t.exception()}")
20622063
)
20632064

2064-
async def emit_update(self, update: StateUpdate, sid: str) -> None:
2065+
async def emit_update(self, update: StateUpdate, token: str) -> None:
20652066
"""Emit an update to the client.
20662067
20672068
Args:
20682069
update: The state update to send.
2069-
sid: The Socket.IO session id.
2070+
token: The client token (tab) associated with the event.
20702071
"""
2071-
if not sid:
2072+
client_token, _ = _split_substate_key(token)
2073+
sid = self.token_to_sid.get(client_token)
2074+
if sid is None:
20722075
# If the sid is None, we are not connected to a client. Prevent sending
20732076
# updates to all clients.
2074-
return
2075-
token = self.sid_to_token.get(sid)
2076-
if token is None:
2077-
console.warn(f"Attempting to send delta to disconnected websocket {sid}")
2077+
console.warn(f"Attempting to send delta to disconnected client {token!r}")
20782078
return
20792079
# Creating a task prevents the update from being blocked behind other coroutines.
20802080
await asyncio.create_task(
@@ -2165,7 +2165,7 @@ async def on_event(self, sid: str, data: Any):
21652165
# Process the events.
21662166
async for update in updates_gen:
21672167
# Emit the update from processing the event.
2168-
await self.emit_update(update=update, sid=sid)
2168+
await self.emit_update(update=update, token=event.token)
21692169

21702170
async def on_ping(self, sid: str):
21712171
"""Event for testing the API endpoint.

reflex/istate/proxy.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ def __init__(
7171
state_instance: The state instance to proxy.
7272
parent_state_proxy: The parent state proxy, for linked mutability and context tracking.
7373
"""
74+
from reflex.state import _substate_key
75+
7476
super().__init__(state_instance)
75-
# compile is not relevant to backend logic
7677
self._self_app = prerequisites.get_and_validate_app().app
7778
self._self_substate_path = tuple(state_instance.get_full_name().split("."))
79+
self._self_substate_token = _substate_key(
80+
state_instance.router.session.client_token,
81+
self._self_substate_path,
82+
)
7883
self._self_actx = None
7984
self._self_mutable = False
8085
self._self_actx_lock = asyncio.Lock()
@@ -127,16 +132,9 @@ async def __aenter__(self) -> StateProxy:
127132
msg = "The state is already mutable. Do not nest `async with self` blocks."
128133
raise ImmutableStateError(msg)
129134

130-
from reflex.state import _substate_key
131-
132135
await self._self_actx_lock.acquire()
133136
self._self_actx_lock_holder = current_task
134-
self._self_actx = self._self_app.modify_state(
135-
token=_substate_key(
136-
self.__wrapped__.router.session.client_token,
137-
self._self_substate_path,
138-
)
139-
)
137+
self._self_actx = self._self_app.modify_state(token=self._self_substate_token)
140138
mutable_state = await self._self_actx.__aenter__()
141139
super().__setattr__(
142140
"__wrapped__", mutable_state.get_substate(self._self_substate_path)

tests/units/test_state.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2005,6 +2005,7 @@ async def test_state_proxy(
20052005
namespace = mock_app.event_namespace
20062006
assert namespace is not None
20072007
namespace.sid_to_token[router_data.session.session_id] = token
2008+
namespace.token_to_sid[token] = router_data.session.session_id
20082009
if isinstance(mock_app.state_manager, (StateManagerMemory, StateManagerDisk)):
20092010
mock_app.state_manager.states[parent_state.router.session.client_token] = (
20102011
parent_state
@@ -2214,6 +2215,7 @@ async def test_background_task_no_block(mock_app: rx.App, token: str):
22142215
namespace = mock_app.event_namespace
22152216
assert namespace is not None
22162217
namespace.sid_to_token[sid] = token
2218+
namespace.token_to_sid[token] = sid
22172219
mock_app.state_manager.state = mock_app._state = BackgroundTaskState
22182220
async for update in rx.app.process(
22192221
mock_app,

0 commit comments

Comments
 (0)