diff --git a/.changeset/dull-mugs-carry.md b/.changeset/dull-mugs-carry.md new file mode 100644 index 000000000..eb61d9a9e --- /dev/null +++ b/.changeset/dull-mugs-carry.md @@ -0,0 +1,10 @@ +--- +'@powersync/react-native': patch +'@powersync/diagnostics-app': patch +'@powersync/common': patch +'@powersync/node': patch +'@powersync/op-sqlite': patch +'@powersync/web': patch +--- + +Improve websocket keepalive logic to reduce keepalive errors. diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index 3d6080769..ecd6c7ef4 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserver; + control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise; } diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index c206e9fa2..258b32acc 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver imp // No-op for now } - async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise { + async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise { return await this.writeTransaction(async (tx) => { const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]); return raw; diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index adb4cef8f..c746ea4c4 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period const KEEP_ALIVE_MS = 20_000; -// The ACK must be received in this period -const KEEP_ALIVE_LIFETIME_MS = 30_000; + +// One message of any type must be received in this period. +const SOCKET_TIMEOUT_MS = 30_000; + +// One keepalive message must be received in this period. +// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed +// significantly. Therefore this is longer than the socket timeout. +const KEEP_ALIVE_LIFETIME_MS = 90_000; export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote'); @@ -267,7 +273,7 @@ export abstract class AbstractRemote { */ async socketStream(options: SocketSyncStreamOptions): Promise> { const bson = await this.getBSON(); - return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson); + return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson); } /** @@ -279,9 +285,9 @@ export abstract class AbstractRemote { */ async socketStreamRaw( options: SocketSyncStreamOptions, - map: (buffer: Buffer) => T, + map: (buffer: Uint8Array) => T, bson?: typeof BSON - ): Promise { + ): Promise> { const { path, fetchStrategy = FetchStrategy.Buffered } = options; const mimeType = bson == null ? 'application/json' : 'application/bson'; @@ -304,12 +310,26 @@ export abstract class AbstractRemote { // automatically as a header. const userAgent = this.getUserAgent(); + let keepAliveTimeout: any; + const resetTimeout = () => { + clearTimeout(keepAliveTimeout); + keepAliveTimeout = setTimeout(() => { + this.logger.error(`No data received on WebSocket in ${SOCKET_TIMEOUT_MS}ms, closing connection.`); + stream.close(); + }, SOCKET_TIMEOUT_MS); + }; + resetTimeout(); + const url = this.options.socketUrlTransformer(request.url); const connector = new RSocketConnector({ transport: new WebsocketClientTransport({ url, wsCreator: (url) => { - return this.createSocket(url); + const socket = this.createSocket(url); + socket.addEventListener('message', (event) => { + resetTimeout(); + }); + return socket; } }), setup: { @@ -332,18 +352,23 @@ export abstract class AbstractRemote { rsocket = await connector.connect(); } catch (ex) { this.logger.error(`Failed to connect WebSocket`, ex); + clearTimeout(keepAliveTimeout); throw ex; } - const stream = new DataStream({ + resetTimeout(); + + const stream = new DataStream({ logger: this.logger, pressure: { lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER - } + }, + mapLine: map }); let socketIsClosed = false; const closeSocket = () => { + clearTimeout(keepAliveTimeout); if (socketIsClosed) { return; } @@ -411,7 +436,7 @@ export abstract class AbstractRemote { return; } - stream.enqueueData(map(data)); + stream.enqueueData(data); }, onComplete: () => { stream.close(); @@ -537,8 +562,9 @@ export abstract class AbstractRemote { const decoder = new TextDecoder(); let buffer = ''; - const stream = new DataStream({ - logger: this.logger + const stream = new DataStream({ + logger: this.logger, + mapLine: mapLine }); const l = stream.registerListener({ @@ -550,7 +576,7 @@ export abstract class AbstractRemote { if (done) { const remaining = buffer.trim(); if (remaining.length != 0) { - stream.enqueueData(mapLine(remaining)); + stream.enqueueData(remaining); } stream.close(); @@ -565,7 +591,7 @@ export abstract class AbstractRemote { for (var i = 0; i < lines.length - 1; i++) { var l = lines[i].trim(); if (l.length > 0) { - stream.enqueueData(mapLine(l)); + stream.enqueueData(l); didCompleteLine = true; } } diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 9ea24685f..85d493ef5 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -848,10 +848,7 @@ The next upload iteration will be delayed.`); // Pending sync lines received from the service, as well as local events that trigger a powersync_control // invocation (local events include refreshed tokens and completed uploads). // This is a single data stream so that we can handle all control calls from a single place. - let controlInvocations: DataStream<{ - command: PowerSyncControlCommand; - payload?: ArrayBuffer | string; - }> | null = null; + let controlInvocations: DataStream | null = null; async function connect(instr: EstablishSyncStream) { const syncOptions: SyncStreamOptions = { @@ -861,20 +858,34 @@ The next upload iteration will be delayed.`); }; if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) { - controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({ - command: PowerSyncControlCommand.PROCESS_TEXT_LINE, - payload: line - })); + controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => { + if (typeof line == 'string') { + return { + command: PowerSyncControlCommand.PROCESS_TEXT_LINE, + payload: line + }; + } else { + // Directly enqueued by us + return line; + } + }); } else { controlInvocations = await remote.socketStreamRaw( { ...syncOptions, fetchStrategy: resolvedOptions.fetchStrategy }, - (buffer) => ({ - command: PowerSyncControlCommand.PROCESS_BSON_LINE, - payload: buffer - }) + (payload: Uint8Array | EnqueuedCommand) => { + if (payload instanceof Uint8Array) { + return { + command: PowerSyncControlCommand.PROCESS_BSON_LINE, + payload: payload + }; + } else { + // Directly enqueued by us + return payload; + } + } ); } @@ -906,7 +917,7 @@ The next upload iteration will be delayed.`); await control(PowerSyncControlCommand.STOP); } - async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) { + async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) { const rawResponse = await adapter.control(op, payload ?? null); await handleInstructions(JSON.parse(rawResponse)); } @@ -1145,3 +1156,8 @@ The next upload iteration will be delayed.`); }); } } + +interface EnqueuedCommand { + command: PowerSyncControlCommand; + payload?: Uint8Array | string; +} diff --git a/packages/common/src/utils/DataStream.ts b/packages/common/src/utils/DataStream.ts index 35b93c7e1..410b1066b 100644 --- a/packages/common/src/utils/DataStream.ts +++ b/packages/common/src/utils/DataStream.ts @@ -1,7 +1,9 @@ import Logger, { ILogger } from 'js-logger'; import { BaseListener, BaseObserver } from './BaseObserver.js'; -export type DataStreamOptions = { +export type DataStreamOptions = { + mapLine?: (line: SourceData) => ParsedData; + /** * Close the stream if any consumer throws an error */ @@ -33,8 +35,8 @@ export const DEFAULT_PRESSURE_LIMITS = { * native JS streams or async iterators. * This is handy for environments such as React Native which need polyfills for the above. */ -export class DataStream extends BaseObserver> { - dataQueue: Data[]; +export class DataStream extends BaseObserver> { + dataQueue: SourceData[]; protected isClosed: boolean; @@ -43,11 +45,14 @@ export class DataStream extends BaseObserver ParsedData; + + constructor(protected options?: DataStreamOptions) { super(); this.processingPromise = null; this.isClosed = false; this.dataQueue = []; + this.mapLine = options?.mapLine ?? ((line) => line as any); this.logger = options?.logger ?? Logger.get('DataStream'); @@ -85,7 +90,7 @@ export class DataStream extends BaseObserver extends BaseObserver { + async read(): Promise { if (this.closed) { return null; } @@ -129,7 +134,7 @@ export class DataStream extends BaseObserver) { + forEach(callback: DataStreamCallback) { if (this.dataQueue.length <= this.lowWatermark) { this.iterateAsyncErrored(async (l) => l.lowWater?.()); } @@ -139,20 +144,11 @@ export class DataStream extends BaseObserver= this.highWatermark) { - await this.iterateAsyncErrored(async (l) => l.highWater?.()); - } - const promise = (this.processingPromise = this._processQueue()); promise.finally(() => { return (this.processingPromise = null); @@ -160,37 +156,28 @@ export class DataStream extends BaseObserver(callback: (data: Data) => ReturnData): DataStream { - const stream = new DataStream(this.options); - const l = this.registerListener({ - data: async (data) => { - stream.enqueueData(callback(data)); - }, - closed: () => { - stream.close(); - l?.(); - } - }); - - return stream; - } - protected hasDataReader() { return Array.from(this.listeners.values()).some((l) => !!l.data); } protected async _processQueue() { + /** + * Allow listeners to mutate the queue before processing. + * This allows for operations such as dropping or compressing data + * on high water or requesting more data on low water. + */ + if (this.dataQueue.length >= this.highWatermark) { + await this.iterateAsyncErrored(async (l) => l.highWater?.()); + } + if (this.isClosed || !this.hasDataReader()) { - await Promise.resolve(); return; } if (this.dataQueue.length) { const data = this.dataQueue.shift()!; - await this.iterateAsyncErrored(async (l) => l.data?.(data)); + const mapped = this.mapLine(data); + await this.iterateAsyncErrored(async (l) => l.data?.(mapped)); } if (this.dataQueue.length <= this.lowWatermark) { @@ -202,14 +189,17 @@ export class DataStream extends BaseObserver 0) { // Next tick setTimeout(() => this.processQueue()); } } - protected async iterateAsyncErrored(cb: (l: BaseListener) => Promise) { - for (let i of Array.from(this.listeners.values())) { + protected async iterateAsyncErrored(cb: (l: Partial>) => Promise) { + // Important: We need to copy the listeners, as calling a listener could result in adding another + // listener, resulting in infinite loops. + const listeners = Array.from(this.listeners.values()); + for (let i of listeners) { try { await cb(i); } catch (ex) { diff --git a/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts b/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts index b27bbae52..e3d470269 100644 --- a/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts +++ b/packages/react-native/src/sync/bucket/ReactNativeBucketStorageAdapter.ts @@ -1,13 +1,24 @@ import { PowerSyncControlCommand, SqliteBucketStorage } from '@powersync/common'; export class ReactNativeBucketStorageAdapter extends SqliteBucketStorage { - control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise { - if (payload != null && typeof payload != 'string') { - // For some reason, we need to copy array buffers for RNQS to recognize them. We're doing that here because we - // don't want to pay the cost of a copy on platforms where it's not necessary. - payload = new Uint8Array(payload).buffer; + control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise { + if (payload instanceof Uint8Array) { + // RNQS doesn't accept Uint8Array arguments - convert to ArrayBuffer first. + payload = uint8ArrayToArrayBuffer(payload); } return super.control(op, payload); } } + +function uint8ArrayToArrayBuffer(array: Uint8Array): ArrayBuffer { + // SharedArrayBuffer isn't defined on ReactNative, so don't need to cater for that. + const arrayBuffer = array.buffer as ArrayBuffer; + if (array.byteOffset == 0 && array.byteLength == arrayBuffer.byteLength) { + // No copying needed - can use ArrayBuffer as-is + return arrayBuffer; + } else { + // Need to make a copy + return arrayBuffer.slice(array.byteOffset, array.byteOffset + array.byteLength); + } +}