From b2ac1a73334e830adecb5d9f504e192d4e273f74 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Fri, 7 Feb 2025 09:36:09 +0200 Subject: [PATCH 1/3] test: make queue size less --- packages/common/src/client/sync/stream/AbstractRemote.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index abfba091e..88e12a45f 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -21,13 +21,13 @@ export type RemoteConnector = { // Refresh at least 30 sec before it expires const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000; -const SYNC_QUEUE_REQUEST_N = 10; +const SYNC_QUEUE_REQUEST_N = 1; const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period -const KEEP_ALIVE_MS = 20_000; +const KEEP_ALIVE_MS = 60_000; // The ACK must be received in this period -const KEEP_ALIVE_LIFETIME_MS = 30_000; +const KEEP_ALIVE_LIFETIME_MS = 90_000; export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote'); From 0a8e7b9b1124736006411f5c550dac6b51be5570 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 7 Feb 2025 10:05:54 +0200 Subject: [PATCH 2/3] changeset entry. --- .changeset/shiny-boxes-prove.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/shiny-boxes-prove.md diff --git a/.changeset/shiny-boxes-prove.md b/.changeset/shiny-boxes-prove.md new file mode 100644 index 000000000..6b2aaab8c --- /dev/null +++ b/.changeset/shiny-boxes-prove.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Changed WebSocket sync queue by reducing pending events from 10 to 1, improving known keepalive issues with minimal impact on sync performance. From 03ec45e698b82b6c3bdc2198d4eeb8e10e249dc5 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 7 Feb 2025 15:38:32 +0200 Subject: [PATCH 3/3] Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. --- .changeset/angry-pumpkins-double.md | 6 +++ .changeset/shiny-boxes-prove.md | 5 --- .../src/client/sync/stream/AbstractRemote.ts | 39 ++++++++++++++----- .../AbstractStreamingSyncImplementation.ts | 30 +++++++++----- .../src/sync/stream/ReactNativeRemote.ts | 3 +- 5 files changed, 58 insertions(+), 25 deletions(-) create mode 100644 .changeset/angry-pumpkins-double.md delete mode 100644 .changeset/shiny-boxes-prove.md diff --git a/.changeset/angry-pumpkins-double.md b/.changeset/angry-pumpkins-double.md new file mode 100644 index 000000000..cb0b6a95b --- /dev/null +++ b/.changeset/angry-pumpkins-double.md @@ -0,0 +1,6 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +--- + +Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance. diff --git a/.changeset/shiny-boxes-prove.md b/.changeset/shiny-boxes-prove.md deleted file mode 100644 index 6b2aaab8c..000000000 --- a/.changeset/shiny-boxes-prove.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@powersync/common': patch ---- - -Changed WebSocket sync queue by reducing pending events from 10 to 1, improving known keepalive issues with minimal impact on sync performance. diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 88e12a45f..228e8e99d 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -21,13 +21,12 @@ export type RemoteConnector = { // Refresh at least 30 sec before it expires const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000; -const SYNC_QUEUE_REQUEST_N = 1; const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period -const KEEP_ALIVE_MS = 60_000; +const KEEP_ALIVE_MS = 20_000; // The ACK must be received in this period -const KEEP_ALIVE_LIFETIME_MS = 90_000; +const KEEP_ALIVE_LIFETIME_MS = 30_000; export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote'); @@ -39,6 +38,24 @@ export type SyncStreamOptions = { fetchOptions?: Request; }; +export enum FetchStrategy { + /** + * Queues multiple sync events before processing, reducing round-trips. + * This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets. + */ + Buffered = 'buffered', + + /** + * Processes each sync event immediately before requesting the next. + * This reduces processing overhead and improves real-time responsiveness. + */ + Sequential = 'sequential' +} + +export type SocketSyncStreamOptions = SyncStreamOptions & { + fetchStrategy: FetchStrategy; +}; + export type FetchImplementation = typeof fetch; /** @@ -216,8 +233,10 @@ export abstract class AbstractRemote { /** * Connects to the sync/stream websocket endpoint */ - async socketStream(options: SyncStreamOptions): Promise> { - const { path } = options; + async socketStream(options: SocketSyncStreamOptions): Promise> { + const { path, fetchStrategy = FetchStrategy.Buffered } = options; + + const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1; const request = await this.buildRequest(path); const bson = await this.getBSON(); @@ -277,7 +296,7 @@ export abstract class AbstractRemote { // Helps to prevent double close scenarios rsocket.onClose(() => (socketIsClosed = true)); // We initially request this amount and expect these to arrive eventually - let pendingEventsCount = SYNC_QUEUE_REQUEST_N; + let pendingEventsCount = syncQueueRequestSize; const disposeClosedListener = stream.registerListener({ closed: () => { @@ -298,7 +317,7 @@ export abstract class AbstractRemote { }) ) }, - SYNC_QUEUE_REQUEST_N, // The initial N amount + syncQueueRequestSize, // The initial N amount { onError: (e) => { // Don't log closed as an error @@ -340,10 +359,10 @@ export abstract class AbstractRemote { const l = stream.registerListener({ lowWater: async () => { // Request to fill up the queue - const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount; + const required = syncQueueRequestSize - pendingEventsCount; if (required > 0) { - socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount); - pendingEventsCount = SYNC_QUEUE_REQUEST_N; + socket.request(syncQueueRequestSize - pendingEventsCount); + pendingEventsCount = syncQueueRequestSize; } }, closed: () => { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 2d0c8f624..5d8440557 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -7,9 +7,10 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js'; import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; -import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js'; +import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js'; import { BucketRequest, + StreamingSyncLine, StreamingSyncRequestParameterType, isStreamingKeepalive, isStreamingSyncCheckpoint, @@ -17,6 +18,7 @@ import { isStreamingSyncCheckpointDiff, isStreamingSyncData } from './streaming-sync-types.js'; +import { DataStream } from 'src/utils/DataStream.js'; export enum LockType { CRUD = 'crud', @@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener { */ export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {} - /** @internal */ +/** @internal */ export interface BaseConnectionOptions { /** * The connection method to use when streaming updates from @@ -76,13 +78,18 @@ export interface BaseConnectionOptions { */ connectionMethod?: SyncStreamConnectionMethod; + /** + * The fetch strategy to use when streaming updates from the PowerSync backend instance. + */ + fetchStrategy?: FetchStrategy; + /** * These parameters are passed to the sync rules, and will be available under the`user_parameters` object. */ params?: Record; } - /** @internal */ +/** @internal */ export interface AdditionalConnectionOptions { /** * Delay for retrying sync streaming operations @@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions { crudUploadThrottleMs?: number; } - /** @internal */ -export type RequiredAdditionalConnectionOptions = Required +export type RequiredAdditionalConnectionOptions = Required; export interface StreamingSyncImplementation extends BaseObserver, Disposable { /** @@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = { connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, + fetchStrategy: FetchStrategy.Buffered, params: {} }; @@ -496,10 +503,15 @@ The next upload iteration will be delayed.`); } }; - const stream = - resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP - ? await this.options.remote.postStream(syncOptions) - : await this.options.remote.socketStream(syncOptions); + let stream: DataStream; + if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) { + stream = await this.options.remote.postStream(syncOptions); + } else { + stream = await this.options.remote.socketStream({ + ...syncOptions, + ...{ fetchStrategy: resolvedOptions.fetchStrategy } + }); + } this.logger.debug('Stream established. Processing events'); diff --git a/packages/react-native/src/sync/stream/ReactNativeRemote.ts b/packages/react-native/src/sync/stream/ReactNativeRemote.ts index 2f6f8e52a..3ed14fb6e 100644 --- a/packages/react-native/src/sync/stream/ReactNativeRemote.ts +++ b/packages/react-native/src/sync/stream/ReactNativeRemote.ts @@ -9,6 +9,7 @@ import { FetchImplementation, FetchImplementationProvider, RemoteConnector, + SocketSyncStreamOptions, StreamingSyncLine, SyncStreamOptions } from '@powersync/common'; @@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote { return BSON; } - async socketStream(options: SyncStreamOptions): Promise> { + async socketStream(options: SocketSyncStreamOptions): Promise> { return super.socketStream(options); }