@@ -50,6 +50,17 @@ def _default_lock_warning_threshold() -> int:
5050 return get_config ().redis_lock_warning_threshold
5151
5252
53+ def _default_oplock_hold_time_ms () -> int :
54+ """Get the default opportunistic lock hold time.
55+
56+ Returns:
57+ The default opportunistic lock hold time.
58+ """
59+ return environment .REFLEX_OPLOCK_HOLD_TIME_MS .get () or (
60+ _default_lock_expiration () // 2
61+ )
62+
63+
5364SMR = f"[SMR:{ os .getpid ()} ]"
5465start = time .monotonic ()
5566
@@ -85,6 +96,11 @@ class StateManagerRedis(StateManager):
8596 default_factory = _default_lock_warning_threshold
8697 )
8798
99+ # How long to opportunistically hold the redis lock in milliseconds (must be less than the token expiration).
100+ oplock_hold_time_ms : int = dataclasses .field (
101+ default_factory = _default_oplock_hold_time_ms
102+ )
103+
88104 # The keyspace subscription string when redis is waiting for lock to be released.
89105 _redis_notify_keyspace_events : str = dataclasses .field (
90106 default = "K" # Enable keyspace notifications (target a particular key)
@@ -154,6 +170,9 @@ def __post_init__(self):
154170 if self .lock_warning_threshold >= (lock_expiration := self .lock_expiration ):
155171 msg = f"The lock warning threshold({ self .lock_warning_threshold } ) must be less than the lock expiration time({ lock_expiration } )."
156172 raise InvalidLockWarningThresholdError (msg )
173+ if self ._oplock_enabled and self .oplock_hold_time_ms >= lock_expiration :
174+ msg = f"The opportunistic lock hold time({ self .oplock_hold_time_ms } ) must be less than the lock expiration time({ lock_expiration } )."
175+ raise InvalidLockWarningThresholdError (msg )
157176 with contextlib .suppress (RuntimeError ):
158177 asyncio .get_running_loop () # Check if we're in an event loop.
159178 self ._ensure_lock_task ()
@@ -620,7 +639,7 @@ async def do_flush() -> None:
620639 async def lease_breaker ():
621640 cancelled_error : asyncio .CancelledError | None = None
622641 async with cleanup_ctx :
623- lease_break_time = ( self .lock_expiration * 0.8 ) / 1000
642+ lease_break_time = self .oplock_hold_time_ms / 1000
624643 if self ._debug_enabled :
625644 console .debug (
626645 f"{ SMR } [{ time .monotonic () - start :.3f} ] { client_token } lease breaker { lock_id .decode ()} started, sleeping for { lease_break_time } s"
0 commit comments