Skip to content

Commit 360b135

Browse files
committed
New keepalive behavior.
1 parent 8a9a5dc commit 360b135

File tree

1 file changed

+26
-3
lines changed

1 file changed

+26
-3
lines changed

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2424

2525
// Keep alive message is sent every period
2626
const KEEP_ALIVE_MS = 20_000;
27-
// The ACK must be received in this period
28-
const KEEP_ALIVE_LIFETIME_MS = 30_000;
27+
28+
// One message of any type must be received in this period.
29+
const SOCKET_TIMEOUT_MS = 30_000;
30+
31+
// One keepalive message must be received in this period.
32+
// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed
33+
// significantly. Therefore this is longer than the socket timeout.
34+
const KEEP_ALIVE_LIFETIME_MS = 90_000;
2935

3036
export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote');
3137

@@ -304,12 +310,26 @@ export abstract class AbstractRemote {
304310
// automatically as a header.
305311
const userAgent = this.getUserAgent();
306312

313+
let keepAliveTimeout: any;
314+
const resetTimeout = () => {
315+
clearTimeout(keepAliveTimeout);
316+
keepAliveTimeout = setTimeout(() => {
317+
this.logger.error(`No data received on WebSocket in ${KEEP_ALIVE_LIFETIME_MS}ms, closing connection.`);
318+
stream.close();
319+
}, SOCKET_TIMEOUT_MS);
320+
};
321+
resetTimeout();
322+
307323
const url = this.options.socketUrlTransformer(request.url);
308324
const connector = new RSocketConnector({
309325
transport: new WebsocketClientTransport({
310326
url,
311327
wsCreator: (url) => {
312-
return this.createSocket(url);
328+
const socket = this.createSocket(url);
329+
socket.addEventListener('message', (event) => {
330+
resetTimeout();
331+
});
332+
return socket;
313333
}
314334
}),
315335
setup: {
@@ -335,6 +355,8 @@ export abstract class AbstractRemote {
335355
throw ex;
336356
}
337357

358+
resetTimeout();
359+
338360
const stream = new DataStream({
339361
logger: this.logger,
340362
pressure: {
@@ -349,6 +371,7 @@ export abstract class AbstractRemote {
349371
}
350372
socketIsClosed = true;
351373
rsocket.close();
374+
clearTimeout(keepAliveTimeout);
352375
};
353376
// Helps to prevent double close scenarios
354377
rsocket.onClose(() => (socketIsClosed = true));

0 commit comments

Comments
 (0)