ENG-7948: StateManagerDisk deferred write queue #5883
* New env var: `REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS` (default 2.0)
* If the debounce is non-zero, then the state manager will queue the disk write
* Queued writes will be processed in order of set time after they exceed the debounce timeout
* New `StateManager.close` method standardized in base class
* Close `app.state_manager` when the server is going down
* Flush all queued writes when the `StateManagerDisk` closes
* Update test cases to always call `state_manager.close()`
Greptile Overview
Summary
Implements a deferred write queue for StateManagerDisk to reduce disk I/O overhead by batching state writes. The debounce period is configurable via REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0 seconds). A background task processes queued writes after the debounce timeout and handles state expiration cleanup.
Major changes:
- New `QueueItem` dataclass to track pending writes with timestamps
- Background `_process_write_queue()` task that processes writes older than debounce period
- Memory cache expiration tracking via `_token_last_touched` dictionary
- Standardized `close()` method in base `StateManager` class to flush pending writes on shutdown
- Integration with app lifespan to ensure clean shutdown
- Updated tests to always call `state_manager.close()`
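The debounce selection described in the list above can be sketched as follows. This is a minimal illustration, not the PR's code: the `QueueItem` field names and the `pop_ready` helper are assumptions made for the example, and the queue is modelled as a plain dict keyed by token.

```python
import dataclasses


@dataclasses.dataclass
class QueueItem:
    """A pending disk write (field names are assumed for illustration)."""

    token: str
    state: dict
    timestamp: float  # time at which the write was first queued


def pop_ready(queue: dict[str, QueueItem], debounce: float, now: float) -> list[QueueItem]:
    """Remove and return items that have waited at least `debounce` seconds.

    Items are returned oldest first, matching "processed in order of set time".
    """
    ready = sorted(
        (item for item in queue.values() if now - item.timestamp >= debounce),
        key=lambda item: item.timestamp,
    )
    for item in ready:
        del queue[item.token]
    return ready
```

Passing `now` explicitly keeps the helper deterministic; a caller would typically supply `time.time()`.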
Issues found:
- Critical: Line 295 only adds to queue if token not present, causing subsequent updates to be silently dropped
- Critical: In-memory cache `self.states` never updated in `set_state()`, causing stale data to be served while waiting for debounced writes
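One way the two issues above could be addressed is sketched below. The class and attribute names (`SketchDiskManager`, `PendingWrite`, `write_queue`) are hypothetical stand-ins, not the PR's actual code. The sketch also assumes that a repeated update should replace the queued state but keep the original timestamp, so that a frequently updated token is not postponed indefinitely; the PR may choose differently.

```python
import dataclasses


@dataclasses.dataclass
class PendingWrite:
    """Queued write for one token (hypothetical structure)."""

    state: dict
    timestamp: float


class SketchDiskManager:
    """Illustrative stand-in for the set_state path only."""

    def __init__(self):
        self.states = {}       # in-memory cache served by reads
        self.write_queue = {}  # token -> PendingWrite awaiting disk write

    def set_state(self, token, state, now):
        # Always refresh the cache so reads never see stale data.
        self.states[token] = state
        pending = self.write_queue.get(token)
        if pending is None:
            # First update for this token: start the debounce clock.
            self.write_queue[token] = PendingWrite(state=state, timestamp=now)
        else:
            # Later update: replace the queued state instead of dropping it.
            pending.state = state
```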
Confidence Score: 1/5
- This PR contains critical bugs that will cause data loss and stale state issues in production
- Two critical logical errors in the core state management flow: (1) subsequent state updates are silently dropped when debouncing is enabled due to conditional queue insertion, and (2) the in-memory cache is never updated during set_state, causing stale data to be served to clients. These bugs will manifest as lost state updates and inconsistent application state.
- reflex/istate/manager/disk.py requires immediate attention - the set_state() method has critical bugs at lines 285-305
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| reflex/istate/manager/disk.py | 1/5 | Adds deferred write queue with debouncing and expiration handling; critical bug where subsequent updates to same token are dropped, and memory cache not updated causing stale data |
| reflex/istate/manager/__init__.py | 5/5 | Adds close() method to base StateManager class with empty default implementation |
| reflex/environment.py | 5/5 | Adds interpret_float_env() helper and REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS environment variable with 2.0 default |
| reflex/app_mixins/lifespan.py | 5/5 | Calls state_manager.close() during app shutdown to flush pending writes |
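For readers unfamiliar with float-valued environment settings, the following is a rough sketch of how such a variable might be read. `read_float_env` is a hypothetical helper written for this example; it is not the `interpret_float_env()` function from the PR, whose signature is not shown here.

```python
import os


def read_float_env(name: str, default: float) -> float:
    """Read a float from the environment, falling back to `default` when unset."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return float(raw)
    except ValueError as err:
        # Fail loudly on malformed values instead of silently using the default.
        raise ValueError(f"{name} must be a float, got {raw!r}") from err


debounce_seconds = read_float_env("REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS", 2.0)
```

Per the PR description, a value of `0` would disable queueing, since writes are only deferred when the debounce is non-zero.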
Sequence Diagram
sequenceDiagram
participant Client
participant App
participant StateManagerDisk
participant WriteQueue
participant ProcessTask
participant Disk
Note over StateManagerDisk: Initialization
StateManagerDisk->>StateManagerDisk: __post_init__()
StateManagerDisk->>Disk: _purge_expired_states()
Note over Client,Disk: State Update Flow (with debounce)
Client->>App: User action triggers state change
App->>StateManagerDisk: modify_state(token)
StateManagerDisk->>StateManagerDisk: get_state(token)
StateManagerDisk-->>App: yield state
App->>App: Modify state
App->>StateManagerDisk: set_state(token, state)
StateManagerDisk->>WriteQueue: Add/Update QueueItem(token, state, timestamp)
StateManagerDisk->>StateManagerDisk: _schedule_process_write_queue()
StateManagerDisk->>ProcessTask: create_task() if not running
StateManagerDisk-->>App: Return (state not yet on disk)
Note over ProcessTask,Disk: Background Write Processing
loop Every debounce period
ProcessTask->>ProcessTask: Check queue for items older than debounce
ProcessTask->>WriteQueue: Pop items ready to write
ProcessTask->>Disk: set_state_for_substate() via run_in_thread
ProcessTask->>ProcessTask: Check for expired tokens
ProcessTask->>Disk: _purge_expired_states() via run_in_thread
ProcessTask->>ProcessTask: _process_write_queue_delay()
end
Note over App,Disk: App Shutdown
App->>StateManagerDisk: close()
StateManagerDisk->>ProcessTask: cancel()
ProcessTask->>WriteQueue: Flush all remaining items
ProcessTask->>Disk: Write all items to disk
StateManagerDisk-->>App: Shutdown complete
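The background-processing and shutdown flow in the diagram can be approximated with plain `asyncio`, as sketched below. Everything here is an assumption made for illustration: `SketchManager` is not the PR's class, a dict stands in for the disk, and `asyncio.to_thread` is used in place of the `run_in_thread` helper named in the diagram.

```python
import asyncio


class SketchManager:
    """Illustrative debounced writer with flush-on-close."""

    def __init__(self, debounce):
        self.debounce = debounce
        self.queue = {}  # token -> state awaiting write
        self.disk = {}   # stand-in for files on disk
        self._task = None

    async def _write(self, token, state):
        # Run the (normally blocking) write off the event loop.
        await asyncio.to_thread(self.disk.__setitem__, token, state)

    async def _flush(self):
        while self.queue:
            token, state = next(iter(self.queue.items()))
            await self._write(token, state)
            # Only drop the entry if no newer state arrived during the write.
            if self.queue.get(token) is state:
                del self.queue[token]

    async def _process(self):
        while True:
            await asyncio.sleep(self.debounce)
            await self._flush()

    async def set_state(self, token, state):
        self.queue[token] = state
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._process())

    async def close(self):
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        # Write whatever is still queued before shutdown completes.
        await self._flush()
```

Because an entry is removed from the queue only after its write finishes, cancelling the task mid-write leaves the entry in place for `close()` to flush.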
Additional Comments (1)
- reflex/istate/manager/disk.py, lines 285-305: logic: The in-memory cache is never updated during set_state, only during get_state. This causes stale data to be served from the memory cache while waiting for debounced writes. Add a memory cache update before queueing the write.
6 files reviewed, 3 comments
CodSpeed Performance Report
Merging #5883 will not alter performance
… state would expire conserve resources by pausing the _process_write_queue for the amount of time of the oldest known token to expire.
Avoid interference with _schedule_process_write_queue
`run_in_thread` for the actual disk write and purging of expired states to avoid blocking the event loop while writing to disk
`state_manager.close()`