From a31ddb524746612b4fdff0597375862440690773 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 30 Jun 2025 16:38:58 +0200 Subject: [PATCH 1/4] New keepalive behavior. --- .../src/client/sync/stream/AbstractRemote.ts | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index adb4cef8f..2513eccd2 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period const KEEP_ALIVE_MS = 20_000; -// The ACK must be received in this period -const KEEP_ALIVE_LIFETIME_MS = 30_000; + +// One message of any type must be received in this period. +const SOCKET_TIMEOUT_MS = 30_000; + +// One keepalive message must be received in this period. +// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed +// significantly. Therefore this is longer than the socket timeout. +const KEEP_ALIVE_LIFETIME_MS = 90_000; export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote'); @@ -304,12 +310,26 @@ export abstract class AbstractRemote { // automatically as a header. const userAgent = this.getUserAgent(); + let keepAliveTimeout: any; + const resetTimeout = () => { + clearTimeout(keepAliveTimeout); + keepAliveTimeout = setTimeout(() => { + this.logger.error(`No data received on WebSocket in ${SOCKET_TIMEOUT_MS}ms, closing connection.`); + stream.close(); + }, SOCKET_TIMEOUT_MS); + }; + resetTimeout(); + const url = this.options.socketUrlTransformer(request.url); const connector = new RSocketConnector({ transport: new WebsocketClientTransport({ url, wsCreator: (url) => { - return this.createSocket(url); + const socket = this.createSocket(url); + socket.addEventListener('message', (event) => { + resetTimeout(); + }); + return socket; } }), setup: { @@ -332,9 +352,12 @@ export abstract class AbstractRemote { rsocket = await connector.connect(); } catch (ex) { this.logger.error(`Failed to connect WebSocket`, ex); + clearTimeout(keepAliveTimeout); throw ex; } + resetTimeout(); + const stream = new DataStream({ logger: this.logger, pressure: { @@ -344,6 +367,7 @@ export abstract class AbstractRemote { let socketIsClosed = false; const closeSocket = () => { + clearTimeout(keepAliveTimeout); if (socketIsClosed) { return; } From 199985ebde5aa00474a1c368dea5fc02867a1b37 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 1 Jul 2025 12:14:19 +0200 Subject: [PATCH 2/4] Defer sync stream parsing until the data is processed. --- .../sync/bucket/BucketStorageAdapter.ts | 2 +- .../client/sync/bucket/SqliteBucketStorage.ts | 2 +- .../src/client/sync/stream/AbstractRemote.ts | 22 +++++----- .../AbstractStreamingSyncImplementation.ts | 42 ++++++++++++------ packages/common/src/utils/DataStream.ts | 44 +++++++------------ .../bucket/ReactNativeBucketStorageAdapter.ts | 21 ++++++--- 6 files changed, 75 insertions(+), 58 deletions(-) diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index 3d6080769..ecd6c7ef4 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserver; + control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise; } diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index c206e9fa2..258b32acc 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver imp // No-op for now } - async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise { + async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise { return await this.writeTransaction(async (tx) => { const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]); return raw; diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 2513eccd2..c746ea4c4 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -273,7 +273,7 @@ export abstract class AbstractRemote { */ async socketStream(options: SocketSyncStreamOptions): Promise> { const bson = await this.getBSON(); - return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson); + return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson); } /** @@ -285,9 +285,9 @@ export abstract class AbstractRemote { */ async socketStreamRaw( options: SocketSyncStreamOptions, - map: (buffer: Buffer) => T, + map: (buffer: Uint8Array) => T, bson?: typeof BSON - ): Promise { + ): Promise> { const { path, fetchStrategy = FetchStrategy.Buffered } = options; const mimeType = bson == null ? 'application/json' : 'application/bson'; @@ -358,11 +358,12 @@ export abstract class AbstractRemote { resetTimeout(); - const stream = new DataStream({ + const stream = new DataStream({ logger: this.logger, pressure: { lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER - } + }, + mapLine: map }); let socketIsClosed = false; @@ -435,7 +436,7 @@ export abstract class AbstractRemote { return; } - stream.enqueueData(map(data)); + stream.enqueueData(data); }, onComplete: () => { stream.close(); @@ -561,8 +562,9 @@ export abstract class AbstractRemote { const decoder = new TextDecoder(); let buffer = ''; - const stream = new DataStream({ - logger: this.logger + const stream = new DataStream({ + logger: this.logger, + mapLine: mapLine }); const l = stream.registerListener({ @@ -574,7 +576,7 @@ export abstract class AbstractRemote { if (done) { const remaining = buffer.trim(); if (remaining.length != 0) { - stream.enqueueData(mapLine(remaining)); + stream.enqueueData(remaining); } stream.close(); @@ -589,7 +591,7 @@ export abstract class AbstractRemote { for (var i = 0; i < lines.length - 1; i++) { var l = lines[i].trim(); if (l.length > 0) { - stream.enqueueData(mapLine(l)); + stream.enqueueData(l); didCompleteLine = true; } } diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index f24e72e7a..46088023e 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -847,10 +847,7 @@ The next upload iteration will be delayed.`); // Pending sync lines received from the service, as well as local events that trigger a powersync_control // invocation (local events include refreshed tokens and completed uploads). // This is a single data stream so that we can handle all control calls from a single place. - let controlInvocations: DataStream<{ - command: PowerSyncControlCommand; - payload?: ArrayBuffer | string; - }> | null = null; + let controlInvocations: DataStream | null = null; async function connect(instr: EstablishSyncStream) { const syncOptions: SyncStreamOptions = { @@ -860,20 +857,34 @@ The next upload iteration will be delayed.`); }; if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) { - controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({ - command: PowerSyncControlCommand.PROCESS_TEXT_LINE, - payload: line - })); + controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => { + if (typeof line == 'string') { + return { + command: PowerSyncControlCommand.PROCESS_TEXT_LINE, + payload: line + }; + } else { + // Directly enqueued by us + return line; + } + }); } else { controlInvocations = await remote.socketStreamRaw( { ...syncOptions, fetchStrategy: resolvedOptions.fetchStrategy }, - (buffer) => ({ - command: PowerSyncControlCommand.PROCESS_BSON_LINE, - payload: buffer - }) + (payload: Uint8Array | EnqueuedCommand) => { + if (payload instanceof Uint8Array) { + return { + command: PowerSyncControlCommand.PROCESS_BSON_LINE, + payload: payload + }; + } else { + // Directly enqueued by us + return payload; + } + } ); } @@ -900,7 +911,7 @@ The next upload iteration will be delayed.`); await control(PowerSyncControlCommand.STOP); } - async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) { + async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) { const rawResponse = await adapter.control(op, payload ?? null); await handleInstructions(JSON.parse(rawResponse)); } @@ -1139,3 +1150,8 @@ The next upload iteration will be delayed.`); }); } } + +interface EnqueuedCommand { + command: PowerSyncControlCommand; + payload?: Uint8Array | string; +} diff --git a/packages/common/src/utils/DataStream.ts b/packages/common/src/utils/DataStream.ts index 9dcf1564a..887bc8af6 100644 --- a/packages/common/src/utils/DataStream.ts +++ b/packages/common/src/utils/DataStream.ts @@ -1,7 +1,9 @@ import Logger, { ILogger } from 'js-logger'; import { BaseListener, BaseObserver } from './BaseObserver.js'; -export type DataStreamOptions = { +export type DataStreamOptions = { + mapLine?: (line: SourceData) => ParsedData; + /** * Close the stream if any consumer throws an error */ @@ -33,8 +35,8 @@ export const DEFAULT_PRESSURE_LIMITS = { * native JS streams or async iterators. * This is handy for environments such as React Native which need polyfills for the above. */ -export class DataStream extends BaseObserver> { - dataQueue: Data[]; +export class DataStream extends BaseObserver> { + dataQueue: SourceData[]; protected isClosed: boolean; @@ -42,11 +44,14 @@ export class DataStream extends BaseObserver ParsedData; + + constructor(protected options?: DataStreamOptions) { super(); this.processingPromise = null; this.isClosed = false; this.dataQueue = []; + this.mapLine = options?.mapLine ?? ((line) => line as any); this.logger = options?.logger ?? Logger.get('DataStream'); @@ -84,7 +89,7 @@ export class DataStream extends BaseObserver extends BaseObserver { + async read(): Promise { if (this.closed) { return null; } @@ -127,7 +132,7 @@ export class DataStream extends BaseObserver) { + forEach(callback: DataStreamCallback) { if (this.dataQueue.length <= this.lowWatermark) { this.iterateAsyncErrored(async (l) => l.lowWater?.()); } @@ -154,24 +159,6 @@ export class DataStream extends BaseObserver(callback: (data: Data) => ReturnData): DataStream { - const stream = new DataStream(this.options); - const l = this.registerListener({ - data: async (data) => { - stream.enqueueData(callback(data)); - }, - closed: () => { - stream.close(); - l?.(); - } - }); - - return stream; - } - protected hasDataReader() { return Array.from(this.listeners.values()).some((l) => !!l.data); } @@ -184,7 +171,8 @@ export class DataStream extends BaseObserver l.data?.(data)); + const mapped = this.mapLine(data); + await this.iterateAsyncErrored(async (l) => l.data?.(mapped)); } if (this.dataQueue.length <= this.lowWatermark) { @@ -199,8 +187,8 @@ export class DataStream extends BaseObserver Promise) { - for (let i of Array.from(this.listeners.values())) { + protected async iterateAsyncErrored(cb: (l: Partial>) => Promise) { + for (let i of this.listeners.values()) { try { await cb(i); } catch (ex) { diff --git a/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts b/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts index b27bbae52..e3d470269 100644 --- a/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts +++ b/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts @@ -1,13 +1,24 @@ import { PowerSyncControlCommand, SqliteBucketStorage } from '@powersync/common'; export class ReactNativeBucketStorageAdapter extends SqliteBucketStorage { - control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise { - if (payload != null && typeof payload != 'string') { - // For some reason, we need to copy array buffers for RNQS to recognize them. We're doing that here because we - // don't want to pay the cost of a copy on platforms where it's not necessary. - payload = new Uint8Array(payload).buffer; + control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise { + if (payload instanceof Uint8Array) { + // RNQS doesn't accept Uint8Array arguments - convert to ArrayBuffer first. + payload = uint8ArrayToArrayBuffer(payload); } return super.control(op, payload); } } + +function uint8ArrayToArrayBuffer(array: Uint8Array): ArrayBuffer { + // SharedArrayBuffer isn't defined on ReactNative, so don't need to cater for that. + const arrayBuffer = array.buffer as ArrayBuffer; + if (array.byteOffset == 0 && array.byteLength == arrayBuffer.byteLength) { + // No copying needed - can use ArrayBuffer as-is + return arrayBuffer; + } else { + // Need to make a copy + return arrayBuffer.slice(array.byteOffset, array.byteOffset + array.byteLength); + } +} From ccb56e39cc217091396fd1ae2b2c8928230b231a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 1 Jul 2025 15:34:45 +0200 Subject: [PATCH 3/4] Add changeset. --- .changeset/dull-mugs-carry.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .changeset/dull-mugs-carry.md diff --git a/.changeset/dull-mugs-carry.md b/.changeset/dull-mugs-carry.md new file mode 100644 index 000000000..eb61d9a9e --- /dev/null +++ b/.changeset/dull-mugs-carry.md @@ -0,0 +1,10 @@ +--- +'@powersync/react-native': patch +'@powersync/diagnostics-app': patch +'@powersync/common': patch +'@powersync/node': patch +'@powersync/op-sqlite': patch +'@powersync/web': patch +--- + +Improve websocket keepalive logic to reduce keepalive errors. From 52509bc89753ba83f22907ec7b13b83d485034a6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 2 Jul 2025 12:06:33 +0200 Subject: [PATCH 4/4] Fix firing listeners in DataStream. --- packages/common/src/utils/DataStream.ts | 32 ++++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/packages/common/src/utils/DataStream.ts b/packages/common/src/utils/DataStream.ts index 887bc8af6..2d8dcde8f 100644 --- a/packages/common/src/utils/DataStream.ts +++ b/packages/common/src/utils/DataStream.ts @@ -142,11 +142,23 @@ export class DataStream extends BaseObserver { + this.processingPromise = null; + }); + this.processingPromise = promise; + return promise; + } + + protected hasDataReader() { + return Array.from(this.listeners.values()).some((l) => !!l.data); + } + + protected async _processQueue() { /** * Allow listeners to mutate the queue before processing. * This allows for operations such as dropping or compressing data @@ -156,16 +168,7 @@ export class DataStream extends BaseObserver l.highWater?.()); } - return (this.processingPromise = this._processQueue()); - } - - protected hasDataReader() { - return Array.from(this.listeners.values()).some((l) => !!l.data); - } - - protected async _processQueue() { if (this.isClosed || !this.hasDataReader()) { - Promise.resolve().then(() => (this.processingPromise = null)); return; } @@ -179,16 +182,17 @@ export class DataStream extends BaseObserver l.lowWater?.()); } - this.processingPromise = null; - - if (this.dataQueue.length) { + if (this.dataQueue.length > 0) { // Next tick setTimeout(() => this.processQueue()); } } protected async iterateAsyncErrored(cb: (l: Partial>) => Promise) { - for (let i of this.listeners.values()) { + // Important: We need to copy the listeners, as calling a listener could result in adding another + // listener, resulting in infinite loops. + const listeners = Array.from(this.listeners.values()); + for (let i of listeners) { try { await cb(i); } catch (ex) {