Skip to content

Commit ba4070d

Browse files
authored
Reassociate sid and token when connecting websocket (#5794)
* Reassociate sid and token when connecting websocket If there are active background tasks for a given client_token, reassociate the token and sid in the state when reconnecting so that updates from the background task go to to the new websocket. * Do not queue websocket events until the websocket is connected * fully drain the queue on socket connect event
1 parent f226bb8 commit ba4070d

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

reflex/.templates/web/utils/state.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,8 @@ export const queueEvents = async (
478478
* @param params The params object from React Router
479479
*/
480480
export const processEvent = async (socket, navigate, params) => {
481-
// Only proceed if the socket is up and no event in the queue uses state, otherwise we throw the event into the void
482-
if (!socket && isStateful()) {
481+
// Only proceed if the socket is up or no event in the queue uses state, otherwise we throw the event into the void
482+
if (isStateful() && !(socket && socket.connected)) {
483483
return;
484484
}
485485

@@ -576,11 +576,15 @@ export const connect = async (
576576
};
577577

578578
// Once the socket is open, hydrate the page.
579-
socket.current.on("connect", () => {
579+
socket.current.on("connect", async () => {
580580
setConnectErrors([]);
581581
window.addEventListener("pagehide", pagehideHandler);
582582
window.addEventListener("beforeunload", disconnectTrigger);
583583
window.addEventListener("unload", disconnectTrigger);
584+
// Drain any initial events from the queue.
585+
while (event_queue.length > 0 && !event_processing) {
586+
await processEvent(socket.current, navigate, () => params.current);
587+
}
584588
});
585589

586590
socket.current.on("connect_error", (error) => {
@@ -893,7 +897,7 @@ export const useEventLoop = (
893897
// Main event loop.
894898
useEffect(() => {
895899
// Skip if the backend is disabled
896-
if (isBackendDisabled()) {
900+
if (isBackendDisabled() || !socket.current || !socket.current.connected) {
897901
return;
898902
}
899903
(async () => {

reflex/app.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2189,3 +2189,10 @@ async def link_token_to_sid(self, sid: str, token: str):
21892189
if new_token:
21902190
# Duplicate detected, emit new token to client
21912191
await self.emit("new_token", new_token, to=sid)
2192+
2193+
# Update client state to apply new sid/token for running background tasks.
2194+
async with self.app.modify_state(
2195+
_substate_key(new_token or token, self.app.state_manager.state)
2196+
) as state:
2197+
state.router_data[constants.RouteVar.SESSION_ID] = sid
2198+
state.router = RouterData.from_router_data(state.router_data)

0 commit comments

Comments
 (0)