Skip to content

Commit 5ad988e

Browse files
committed
Automatic websocket reconnect and reload handling
* ensureSocketConnected is called when adding events or pumping the queue to trigger an automatic reconnection to the backend * when "reload" event is encountered, trigger a re-hydrate and wait until ALL on_load have finished processing and `is_hydrated` is True before requeue the event that caused the "reload"
1 parent 4468e14 commit 5ad988e

File tree

1 file changed

+48
-17
lines changed

1 file changed

+48
-17
lines changed

reflex/.templates/web/utils/state.js

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ export const connect = async (
531531
) => {
532532
// Get backend URL object from the endpoint.
533533
const endpoint = getBackendURL(EVENTURL);
534+
const on_hydrated_queue = [];
534535

535536
// Create the socket.
536537
socket.current = io(endpoint.href, {
@@ -552,7 +553,17 @@ export const connect = async (
552553

553554
function checkVisibility() {
554555
if (document.visibilityState === "visible") {
555-
if (!socket.current.connected) {
556+
if (!socket.current) {
557+
connect(
558+
socket,
559+
dispatch,
560+
transports,
561+
setConnectErrors,
562+
client_storage,
563+
navigate,
564+
params,
565+
);
566+
} else if (!socket.current.connected) {
556567
console.log("Socket is disconnected, attempting to reconnect ");
557568
socket.current.connect();
558569
} else {
@@ -593,6 +604,7 @@ export const connect = async (
593604

594605
// When the socket disconnects reset the event_processing flag
595606
socket.current.on("disconnect", () => {
607+
socket.current = null; // allow reconnect to occur automatically
596608
event_processing = false;
597609
window.removeEventListener("unload", disconnectTrigger);
598610
window.removeEventListener("beforeunload", disconnectTrigger);
@@ -603,6 +615,14 @@ export const connect = async (
603615
socket.current.on("event", async (update) => {
604616
for (const substate in update.delta) {
605617
dispatch[substate](update.delta[substate]);
618+
// handle events waiting for `is_hydrated`
619+
if (
620+
substate === state_name &&
621+
update.delta[substate]?.is_hydrated_rx_state_
622+
) {
623+
queueEvents(on_hydrated_queue, socket, false, navigate, params);
624+
on_hydrated_queue.length = 0;
625+
}
606626
}
607627
applyClientStorageDelta(client_storage, update.delta);
608628
event_processing = !update.final;
@@ -612,7 +632,8 @@ export const connect = async (
612632
});
613633
socket.current.on("reload", async (event) => {
614634
event_processing = false;
615-
queueEvents([...initialEvents(), event], socket, true, navigate, params);
635+
on_hydrated_queue.push(event);
636+
queueEvents(initialEvents(), socket, true, navigate, params);
616637
});
617638
socket.current.on("new_token", async (new_token) => {
618639
token = new_token;
@@ -774,10 +795,32 @@ export const useEventLoop = (
774795
}
775796
}, [paramsR]);
776797

798+
const ensureSocketConnected = useCallback(async () => {
799+
// only use websockets if state is present and backend is not disabled (reflex cloud).
800+
if (
801+
Object.keys(initialState).length > 1 &&
802+
!isBackendDisabled() &&
803+
!socket.current
804+
) {
805+
// Initialize the websocket connection.
806+
await connect(
807+
socket,
808+
dispatch,
809+
["websocket"],
810+
setConnectErrors,
811+
client_storage,
812+
navigate,
813+
() => params.current,
814+
);
815+
}
816+
}, [socket, dispatch, setConnectErrors, client_storage, navigate, params]);
817+
777818
// Function to add new events to the event queue.
778819
const addEvents = useCallback((events, args, event_actions) => {
779820
const _events = events.filter((e) => e !== undefined && e !== null);
780821

822+
ensureSocketConnected();
823+
781824
if (!(args instanceof Array)) {
782825
args = [args];
783826
}
@@ -870,21 +913,8 @@ export const useEventLoop = (
870913

871914
// Handle socket connect/disconnect.
872915
useEffect(() => {
873-
// only use websockets if state is present and backend is not disabled (reflex cloud).
874-
if (Object.keys(initialState).length > 1 && !isBackendDisabled()) {
875-
// Initialize the websocket connection.
876-
if (!socket.current) {
877-
connect(
878-
socket,
879-
dispatch,
880-
["websocket"],
881-
setConnectErrors,
882-
client_storage,
883-
navigate,
884-
() => params.current,
885-
);
886-
}
887-
}
916+
// Initialize the websocket connection.
917+
ensureSocketConnected();
888918

889919
// Cleanup function.
890920
return () => {
@@ -903,6 +933,7 @@ export const useEventLoop = (
903933
(async () => {
904934
// Process all outstanding events.
905935
while (event_queue.length > 0 && !event_processing) {
936+
await ensureSocketConnected();
906937
await processEvent(socket.current, navigate, () => params.current);
907938
}
908939
})();

0 commit comments

Comments
 (0)