Skip to content

Commit 9f080b5

Browse files
committed
add context for lock expiry
1 parent fb19fb4 commit 9f080b5

File tree

5 files changed

+49
-12
lines changed

5 files changed

+49
-12
lines changed

reflex/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1758,7 +1758,9 @@ async def process(
17581758
constants.RouteVar.CLIENT_IP: client_ip,
17591759
})
17601760
# Get the state for the session exclusively.
1761-
async with app.state_manager.modify_state(event.substate_token) as state:
1761+
async with app.state_manager.modify_state(
1762+
event.substate_token, context=event
1763+
) as state:
17621764
# When this is a brand new instance of the state, signal the
17631765
# frontend to reload before processing it.
17641766
if (

reflex/istate/manager/__init__.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from reflex import constants
99
from reflex.config import get_config
10+
from reflex.event import Event
1011
from reflex.state import BaseState
1112
from reflex.utils import console, prerequisites
1213
from reflex.utils.exceptions import InvalidStateManagerModeError
@@ -71,21 +72,27 @@ async def get_state(self, token: str) -> BaseState:
7172
"""
7273

7374
@abstractmethod
74-
async def set_state(self, token: str, state: BaseState):
75+
async def set_state(
76+
self, token: str, state: BaseState, *, context: Event | None = None
77+
):
7578
"""Set the state for a token.
7679
7780
Args:
7881
token: The token to set the state for.
7982
state: The state to set.
83+
context: The event context.
8084
"""
8185

8286
@abstractmethod
8387
@contextlib.asynccontextmanager
84-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
88+
async def modify_state(
89+
self, token: str, *, context: Event | None = None
90+
) -> AsyncIterator[BaseState]:
8591
"""Modify the state for a token while holding exclusive lock.
8692
8793
Args:
8894
token: The token to modify the state for.
95+
context: The event context.
8996
9097
Yields:
9198
The state for the token.

reflex/istate/manager/disk.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing_extensions import override
1313

1414
from reflex.environment import environment
15+
from reflex.event import Event
1516
from reflex.istate.manager import StateManager, _default_token_expiration
1617
from reflex.state import BaseState, _split_substate_key, _substate_key
1718
from reflex.utils import console, path_ops, prerequisites
@@ -301,12 +302,15 @@ async def _schedule_process_write_queue(self):
301302
await asyncio.sleep(0) # Yield to allow the task to start.
302303

303304
@override
304-
async def set_state(self, token: str, state: BaseState):
305+
async def set_state(
306+
self, token: str, state: BaseState, *, context: Event | None = None
307+
):
305308
"""Set the state for a token.
306309
307310
Args:
308311
token: The token to set the state for.
309312
state: The state to set.
313+
context: The event context.
310314
"""
311315
client_token, _ = _split_substate_key(token)
312316
if self._write_debounce_seconds > 0:
@@ -325,11 +329,14 @@ async def set_state(self, token: str, state: BaseState):
325329

326330
@override
327331
@contextlib.asynccontextmanager
328-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
332+
async def modify_state(
333+
self, token: str, *, context: Event | None = None
334+
) -> AsyncIterator[BaseState]:
329335
"""Modify the state for a token while holding exclusive lock.
330336
331337
Args:
332338
token: The token to modify the state for.
339+
context: The event context.
333340
334341
Yields:
335342
The state for the token.
@@ -344,7 +351,7 @@ async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
344351
async with self._states_locks[client_token]:
345352
state = await self.get_state(token)
346353
yield state
347-
await self.set_state(token, state)
354+
await self.set_state(token, state, context=context)
348355

349356
async def close(self):
350357
"""Close the state manager, flushing any pending writes to disk."""

reflex/istate/manager/memory.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from typing_extensions import override
99

10+
from reflex.event import Event
1011
from reflex.istate.manager import StateManager
1112
from reflex.state import BaseState, _split_substate_key
1213

@@ -43,23 +44,29 @@ async def get_state(self, token: str) -> BaseState:
4344
return self.states[token]
4445

4546
@override
46-
async def set_state(self, token: str, state: BaseState):
47+
async def set_state(
48+
self, token: str, state: BaseState, *, context: Event | None = None
49+
):
4750
"""Set the state for a token.
4851
4952
Args:
5053
token: The token to set the state for.
5154
state: The state to set.
55+
context: The event context.
5256
"""
5357
token = _split_substate_key(token)[0]
5458
self.states[token] = state
5559

5660
@override
5761
@contextlib.asynccontextmanager
58-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
62+
async def modify_state(
63+
self, token: str, *, context: Event | None = None
64+
) -> AsyncIterator[BaseState]:
5965
"""Modify the state for a token while holding exclusive lock.
6066
6167
Args:
6268
token: The token to modify the state for.
69+
context: The event context.
6370
6471
Yields:
6572
The state for the token.

reflex/istate/manager/redis.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from reflex.config import get_config
1616
from reflex.environment import environment
17+
from reflex.event import Event
1718
from reflex.istate.manager import StateManager, _default_token_expiration
1819
from reflex.state import BaseState, _split_substate_key, _substate_key
1920
from reflex.utils import console
@@ -261,14 +262,17 @@ async def set_state(
261262
self,
262263
token: str,
263264
state: BaseState,
265+
*,
264266
lock_id: bytes | None = None,
267+
context: Event | None = None,
265268
):
266269
"""Set the state for a token.
267270
268271
Args:
269272
token: The token to set the state for.
270273
state: The state to set.
271274
lock_id: If provided, the lock_key must be set to this value to set the state.
275+
context: The event context.
272276
273277
Raises:
274278
LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID.
@@ -283,6 +287,7 @@ async def set_state(
283287
f"Lock expired for token {token} while processing. Consider increasing "
284288
f"`app.state_manager.lock_expiration` (currently {self.lock_expiration}) "
285289
"or use `@rx.event(background=True)` decorator for long-running tasks."
290+
+ (f" Happened in event: {context.name}" if context is not None else "")
286291
)
287292
raise LockExpiredError(msg)
288293
if lock_id is not None:
@@ -292,7 +297,12 @@ async def set_state(
292297
if time_taken > self.lock_warning_threshold / 1000:
293298
console.warn(
294299
f"Lock for token {token} was held too long {time_taken=}s, "
295-
f"use `@rx.event(background=True)` decorator for long-running tasks.",
300+
f"use `@rx.event(background=True)` decorator for long-running tasks."
301+
+ (
302+
f" Happened in event: {context.name}"
303+
if context is not None
304+
else ""
305+
),
296306
dedupe=True,
297307
)
298308

@@ -308,7 +318,8 @@ async def set_state(
308318
self.set_state(
309319
_substate_key(client_token, substate),
310320
substate,
311-
lock_id,
321+
lock_id=lock_id,
322+
context=context,
312323
),
313324
name=f"reflex_set_state|{client_token}|{substate.get_full_name()}",
314325
)
@@ -330,19 +341,22 @@ async def set_state(
330341

331342
@override
332343
@contextlib.asynccontextmanager
333-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
344+
async def modify_state(
345+
self, token: str, *, context: Event | None = None
346+
) -> AsyncIterator[BaseState]:
334347
"""Modify the state for a token while holding exclusive lock.
335348
336349
Args:
337350
token: The token to modify the state for.
351+
context: The event context.
338352
339353
Yields:
340354
The state for the token.
341355
"""
342356
async with self._lock(token) as lock_id:
343357
state = await self.get_state(token)
344358
yield state
345-
await self.set_state(token, state, lock_id)
359+
await self.set_state(token, state, lock_id=lock_id, context=context)
346360

347361
@staticmethod
348362
def _lock_key(token: str) -> bytes:

0 commit comments

Comments
 (0)