Skip to content

Commit b6b7f32

Browse files
adhami3310masenf
andauthored
add context for lock expiry (#5918)
* add context for lock expiry * unpack --------- Co-authored-by: Masen Furer <[email protected]>
1 parent 1d71117 commit b6b7f32

File tree

6 files changed

+82
-19
lines changed

6 files changed

+82
-19
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ Now you can modify the source code in `my_app_name/my_app_name.py`. Reflex has f
9898

9999
If you installed Reflex without a virtual environment and the `reflex` command is not found, you can run commands using: `python3 -m reflex init` and `python3 -m reflex run`
100100

101-
102101
## 🫧 Example App
103102

104103
Let's go over an example: creating an image generation UI around [DALL·E](https://platform.openai.com/docs/guides/images/image-generation?context=node). For simplicity, we just call the [OpenAI API](https://platform.openai.com/docs/api-reference/authentication), but you could replace this with an ML model run locally.

reflex/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1758,7 +1758,9 @@ async def process(
17581758
constants.RouteVar.CLIENT_IP: client_ip,
17591759
})
17601760
# Get the state for the session exclusively.
1761-
async with app.state_manager.modify_state(event.substate_token) as state:
1761+
async with app.state_manager.modify_state(
1762+
event.substate_token, event=event
1763+
) as state:
17621764
# When this is a brand new instance of the state, signal the
17631765
# frontend to reload before processing it.
17641766
if (

reflex/istate/manager/__init__.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,27 @@
44
import dataclasses
55
from abc import ABC, abstractmethod
66
from collections.abc import AsyncIterator
7+
from typing import TypedDict
8+
9+
from typing_extensions import ReadOnly, Unpack
710

811
from reflex import constants
912
from reflex.config import get_config
13+
from reflex.event import Event
1014
from reflex.state import BaseState
1115
from reflex.utils import console, prerequisites
1216
from reflex.utils.exceptions import InvalidStateManagerModeError
1317

1418

19+
class StateModificationContext(TypedDict, total=False):
20+
"""The context for modifying state."""
21+
22+
event: ReadOnly[Event]
23+
24+
25+
EmptyContext = StateModificationContext()
26+
27+
1528
@dataclasses.dataclass
1629
class StateManager(ABC):
1730
"""A class to manage many client states."""
@@ -71,21 +84,30 @@ async def get_state(self, token: str) -> BaseState:
7184
"""
7285

7386
@abstractmethod
74-
async def set_state(self, token: str, state: BaseState):
87+
async def set_state(
88+
self,
89+
token: str,
90+
state: BaseState,
91+
**context: Unpack[StateModificationContext],
92+
):
7593
"""Set the state for a token.
7694
7795
Args:
7896
token: The token to set the state for.
7997
state: The state to set.
98+
context: The state modification context.
8099
"""
81100

82101
@abstractmethod
83102
@contextlib.asynccontextmanager
84-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
103+
async def modify_state(
104+
self, token: str, **context: Unpack[StateModificationContext]
105+
) -> AsyncIterator[BaseState]:
85106
"""Modify the state for a token while holding exclusive lock.
86107
87108
Args:
88109
token: The token to modify the state for.
110+
context: The state modification context.
89111
90112
Yields:
91113
The state for the token.

reflex/istate/manager/disk.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
from hashlib import md5
1010
from pathlib import Path
1111

12-
from typing_extensions import override
12+
from typing_extensions import Unpack, override
1313

1414
from reflex.environment import environment
15-
from reflex.istate.manager import StateManager, _default_token_expiration
15+
from reflex.istate.manager import (
16+
StateManager,
17+
StateModificationContext,
18+
_default_token_expiration,
19+
)
1620
from reflex.state import BaseState, _split_substate_key, _substate_key
1721
from reflex.utils import console, path_ops, prerequisites
1822
from reflex.utils.misc import run_in_thread
@@ -301,12 +305,15 @@ async def _schedule_process_write_queue(self):
301305
await asyncio.sleep(0) # Yield to allow the task to start.
302306

303307
@override
304-
async def set_state(self, token: str, state: BaseState):
308+
async def set_state(
309+
self, token: str, state: BaseState, **context: Unpack[StateModificationContext]
310+
):
305311
"""Set the state for a token.
306312
307313
Args:
308314
token: The token to set the state for.
309315
state: The state to set.
316+
context: The state modification context.
310317
"""
311318
client_token, _ = _split_substate_key(token)
312319
if self._write_debounce_seconds > 0:
@@ -325,11 +332,14 @@ async def set_state(self, token: str, state: BaseState):
325332

326333
@override
327334
@contextlib.asynccontextmanager
328-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
335+
async def modify_state(
336+
self, token: str, **context: Unpack[StateModificationContext]
337+
) -> AsyncIterator[BaseState]:
329338
"""Modify the state for a token while holding exclusive lock.
330339
331340
Args:
332341
token: The token to modify the state for.
342+
context: The state modification context.
333343
334344
Yields:
335345
The state for the token.
@@ -344,7 +354,7 @@ async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
344354
async with self._states_locks[client_token]:
345355
state = await self.get_state(token)
346356
yield state
347-
await self.set_state(token, state)
357+
await self.set_state(token, state, **context)
348358

349359
async def close(self):
350360
"""Close the state manager, flushing any pending writes to disk."""

reflex/istate/manager/memory.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import dataclasses
66
from collections.abc import AsyncIterator
77

8-
from typing_extensions import override
8+
from typing_extensions import Unpack, override
99

10-
from reflex.istate.manager import StateManager
10+
from reflex.istate.manager import StateManager, StateModificationContext
1111
from reflex.state import BaseState, _split_substate_key
1212

1313

@@ -43,23 +43,32 @@ async def get_state(self, token: str) -> BaseState:
4343
return self.states[token]
4444

4545
@override
46-
async def set_state(self, token: str, state: BaseState):
46+
async def set_state(
47+
self,
48+
token: str,
49+
state: BaseState,
50+
**context: Unpack[StateModificationContext],
51+
):
4752
"""Set the state for a token.
4853
4954
Args:
5055
token: The token to set the state for.
5156
state: The state to set.
57+
context: The state modification context.
5258
"""
5359
token = _split_substate_key(token)[0]
5460
self.states[token] = state
5561

5662
@override
5763
@contextlib.asynccontextmanager
58-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
64+
async def modify_state(
65+
self, token: str, **context: Unpack[StateModificationContext]
66+
) -> AsyncIterator[BaseState]:
5967
"""Modify the state for a token while holding exclusive lock.
6068
6169
Args:
6270
token: The token to modify the state for.
71+
context: The state modification context.
6372
6473
Yields:
6574
The state for the token.

reflex/istate/manager/redis.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@
1010
from redis import ResponseError
1111
from redis.asyncio import Redis
1212
from redis.asyncio.client import PubSub
13-
from typing_extensions import override
13+
from typing_extensions import Unpack, override
1414

1515
from reflex.config import get_config
1616
from reflex.environment import environment
17-
from reflex.istate.manager import StateManager, _default_token_expiration
17+
from reflex.istate.manager import (
18+
StateManager,
19+
StateModificationContext,
20+
_default_token_expiration,
21+
)
1822
from reflex.state import BaseState, _split_substate_key, _substate_key
1923
from reflex.utils import console
2024
from reflex.utils.exceptions import (
@@ -261,14 +265,17 @@ async def set_state(
261265
self,
262266
token: str,
263267
state: BaseState,
268+
*,
264269
lock_id: bytes | None = None,
270+
**context: Unpack[StateModificationContext],
265271
):
266272
"""Set the state for a token.
267273
268274
Args:
269275
token: The token to set the state for.
270276
state: The state to set.
271277
lock_id: If provided, the lock_key must be set to this value to set the state.
278+
context: The event context.
272279
273280
Raises:
274281
LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID.
@@ -283,6 +290,11 @@ async def set_state(
283290
f"Lock expired for token {token} while processing. Consider increasing "
284291
f"`app.state_manager.lock_expiration` (currently {self.lock_expiration}) "
285292
"or use `@rx.event(background=True)` decorator for long-running tasks."
293+
+ (
294+
f" Happened in event: {event.name}"
295+
if (event := context.get("event")) is not None
296+
else ""
297+
)
286298
)
287299
raise LockExpiredError(msg)
288300
if lock_id is not None:
@@ -292,7 +304,12 @@ async def set_state(
292304
if time_taken > self.lock_warning_threshold / 1000:
293305
console.warn(
294306
f"Lock for token {token} was held too long {time_taken=}s, "
295-
f"use `@rx.event(background=True)` decorator for long-running tasks.",
307+
f"use `@rx.event(background=True)` decorator for long-running tasks."
308+
+ (
309+
f" Happened in event: {event.name}"
310+
if (event := context.get("event")) is not None
311+
else ""
312+
),
296313
dedupe=True,
297314
)
298315

@@ -308,7 +325,8 @@ async def set_state(
308325
self.set_state(
309326
_substate_key(client_token, substate),
310327
substate,
311-
lock_id,
328+
lock_id=lock_id,
329+
**context,
312330
),
313331
name=f"reflex_set_state|{client_token}|{substate.get_full_name()}",
314332
)
@@ -330,19 +348,22 @@ async def set_state(
330348

331349
@override
332350
@contextlib.asynccontextmanager
333-
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
351+
async def modify_state(
352+
self, token: str, **context: Unpack[StateModificationContext]
353+
) -> AsyncIterator[BaseState]:
334354
"""Modify the state for a token while holding exclusive lock.
335355
336356
Args:
337357
token: The token to modify the state for.
358+
context: The state modification context.
338359
339360
Yields:
340361
The state for the token.
341362
"""
342363
async with self._lock(token) as lock_id:
343364
state = await self.get_state(token)
344365
yield state
345-
await self.set_state(token, state, lock_id)
366+
await self.set_state(token, state, lock_id=lock_id, **context)
346367

347368
@staticmethod
348369
def _lock_key(token: str) -> bytes:

0 commit comments

Comments
 (0)