Skip to content

Commit 2907722

Browse files
authored
ENG-8258: API for linking and sharing states (#6024)
* ENG-8258: API for linking and sharing states It works by defining a substate of SharedState and then calling self._link_to(target_token) from some event handler. from that point on, whenever that user's state is loaded, the StateManager will patch in the linked shared states. whenever a linked state is modified, we explicitly load all of the other linked tokens, patch in the modified states, and send a delta to those clients You can call ._unlink to remove the link association, which causes the substate to be subsequently loaded from the client_token's tree as a private state It is intended to work transparently with computed vars, background events, and frontend rendering. * WiP: move _links to root state to avoid extra redis load * Fix test_mixin_state * only import ShareStateBaseInternal if there are links present * Fix delta calculation in certain scenarios * Add integration test for linked states * fix test_computed_var assertion * remove background task deadlock updating linked state perform the subsequent updates in an asyncio.Task to allow the original caller to drop the lock for the other shared states. * do not reuse _modify_linked_states in `_link_to` the state might have multiple links, and we may have already entered the context and are holding the lock already, so we don't want to take the lock again which will hang. instead, check if the state is already linked to the target token and avoid doing extra work. * abstract out _patch_state and use it when linking or unlinking * _link_to returns the newly linked state _modify_linked_states context can now release the locks of newly linked states and send updates for changes in newly linked states. rehydrating after linking is no longer necessary. * fix test cases * mark router and router_data as dirty when patching state definitely the token will be different. although private-dependent data should use private states, it's common for llm generated code to define router_data dependent vars in the linked state itself, so we make that special case work * AI CR feedback
1 parent 224f920 commit 2907722

File tree

9 files changed

+869
-8
lines changed

9 files changed

+869
-8
lines changed

pyi_hashes.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"reflex/__init__.pyi": "b304ed6f7a2fa028a194cad81bd83112",
2+
"reflex/__init__.pyi": "0a3ae880e256b9fd3b960e12a2cb51a7",
33
"reflex/components/__init__.pyi": "ac05995852baa81062ba3d18fbc489fb",
44
"reflex/components/base/__init__.pyi": "16e47bf19e0d62835a605baa3d039c5a",
55
"reflex/components/base/app_wrap.pyi": "22e94feaa9fe675bcae51c412f5b67f1",

reflex/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@
336336
"State",
337337
"dynamic",
338338
],
339+
"istate.shared": ["SharedState"],
339340
"istate.wrappers": ["get_state"],
340341
"style": ["Style", "toggle_color_mode"],
341342
"utils.imports": ["ImportDict", "ImportVar"],

reflex/app.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,13 +1562,17 @@ def all_routes(_request: Request) -> Response:
15621562

15631563
@contextlib.asynccontextmanager
15641564
async def modify_state(
1565-
self, token: str, background: bool = False
1565+
self,
1566+
token: str,
1567+
background: bool = False,
1568+
previous_dirty_vars: dict[str, set[str]] | None = None,
15661569
) -> AsyncIterator[BaseState]:
15671570
"""Modify the state out of band.
15681571
15691572
Args:
15701573
token: The token to modify the state for.
15711574
background: Whether the modification is happening in a background task.
1575+
previous_dirty_vars: Vars that are considered dirty from a previous operation.
15721576
15731577
Yields:
15741578
The state to modify.
@@ -1581,7 +1585,9 @@ async def modify_state(
15811585
raise RuntimeError(msg)
15821586

15831587
# Get exclusive access to the state.
1584-
async with self.state_manager.modify_state(token) as state:
1588+
async with self.state_manager.modify_state_with_links(
1589+
token, previous_dirty_vars=previous_dirty_vars
1590+
) as state:
15851591
# No other event handler can modify the state while in this context.
15861592
yield state
15871593
delta = await state._get_resolved_delta()
@@ -1769,7 +1775,7 @@ async def process(
17691775
constants.RouteVar.CLIENT_IP: client_ip,
17701776
})
17711777
# Get the state for the session exclusively.
1772-
async with app.state_manager.modify_state(
1778+
async with app.state_manager.modify_state_with_links(
17731779
event.substate_token, event=event
17741780
) as state:
17751781
# When this is a brand new instance of the state, signal the
@@ -2003,7 +2009,9 @@ async def _ndjson_updates():
20032009
Each state update as JSON followed by a new line.
20042010
"""
20052011
# Process the event.
2006-
async with app.state_manager.modify_state(event.substate_token) as state:
2012+
async with app.state_manager.modify_state_with_links(
2013+
event.substate_token
2014+
) as state:
20072015
async for update in state._process(event):
20082016
# Postprocess the event.
20092017
update = await app._postprocess(state, event, update)

reflex/istate/manager/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,35 @@ async def modify_state(
114114
"""
115115
yield self.state()
116116

117+
@contextlib.asynccontextmanager
118+
async def modify_state_with_links(
119+
self,
120+
token: str,
121+
previous_dirty_vars: dict[str, set[str]] | None = None,
122+
**context: Unpack[StateModificationContext],
123+
) -> AsyncIterator[BaseState]:
124+
"""Modify the state for a token, including linked substates, while holding exclusive lock.
125+
126+
Args:
127+
token: The token to modify the state for.
128+
previous_dirty_vars: The previously dirty vars for linked states.
129+
context: The state modification context.
130+
131+
Yields:
132+
The state for the token with linked states patched in.
133+
"""
134+
async with self.modify_state(token, **context) as root_state:
135+
if getattr(root_state, "_reflex_internal_links", None) is not None:
136+
from reflex.istate.shared import SharedStateBaseInternal
137+
138+
shared_state = await root_state.get_state(SharedStateBaseInternal)
139+
async with shared_state._modify_linked_states(
140+
previous_dirty_vars=previous_dirty_vars
141+
) as _:
142+
yield root_state
143+
else:
144+
yield root_state
145+
117146
async def close(self): # noqa: B027
118147
"""Close the state manager."""
119148

0 commit comments

Comments
 (0)