diff --git a/.changeset/angry-pumpkins-double.md b/.changeset/angry-pumpkins-double.md new file mode 100644 index 000000000..cb0b6a95b --- /dev/null +++ b/.changeset/angry-pumpkins-double.md @@ -0,0 +1,6 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +--- + +Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance. diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index abfba091e..228e8e99d 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -21,7 +21,6 @@ export type RemoteConnector = { // Refresh at least 30 sec before it expires const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000; -const SYNC_QUEUE_REQUEST_N = 10; const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period @@ -39,6 +38,24 @@ export type SyncStreamOptions = { fetchOptions?: Request; }; +export enum FetchStrategy { + /** + * Queues multiple sync events before processing, reducing round-trips. + * This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets. + */ + Buffered = 'buffered', + + /** + * Processes each sync event immediately before requesting the next. + * This reduces processing overhead and improves real-time responsiveness. + */ + Sequential = 'sequential' +} + +export type SocketSyncStreamOptions = SyncStreamOptions & { + fetchStrategy: FetchStrategy; +}; + export type FetchImplementation = typeof fetch; /** @@ -216,8 +233,10 @@ export abstract class AbstractRemote { /** * Connects to the sync/stream websocket endpoint */ - async socketStream(options: SyncStreamOptions): Promise> { - const { path } = options; + async socketStream(options: SocketSyncStreamOptions): Promise> { + const { path, fetchStrategy = FetchStrategy.Buffered } = options; + + const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1; const request = await this.buildRequest(path); const bson = await this.getBSON(); @@ -277,7 +296,7 @@ export abstract class AbstractRemote { // Helps to prevent double close scenarios rsocket.onClose(() => (socketIsClosed = true)); // We initially request this amount and expect these to arrive eventually - let pendingEventsCount = SYNC_QUEUE_REQUEST_N; + let pendingEventsCount = syncQueueRequestSize; const disposeClosedListener = stream.registerListener({ closed: () => { @@ -298,7 +317,7 @@ export abstract class AbstractRemote { }) ) }, - SYNC_QUEUE_REQUEST_N, // The initial N amount + syncQueueRequestSize, // The initial N amount { onError: (e) => { // Don't log closed as an error @@ -340,10 +359,10 @@ export abstract class AbstractRemote { const l = stream.registerListener({ lowWater: async () => { // Request to fill up the queue - const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount; + const required = syncQueueRequestSize - pendingEventsCount; if (required > 0) { - socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount); - pendingEventsCount = SYNC_QUEUE_REQUEST_N; + socket.request(syncQueueRequestSize - pendingEventsCount); + pendingEventsCount = syncQueueRequestSize; } }, closed: () => { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 2d0c8f624..5d8440557 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -7,9 +7,10 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js'; import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; -import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js'; +import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js'; import { BucketRequest, + StreamingSyncLine, StreamingSyncRequestParameterType, isStreamingKeepalive, isStreamingSyncCheckpoint, @@ -17,6 +18,7 @@ import { isStreamingSyncCheckpointDiff, isStreamingSyncData } from './streaming-sync-types.js'; +import { DataStream } from 'src/utils/DataStream.js'; export enum LockType { CRUD = 'crud', @@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener { */ export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {} - /** @internal */ +/** @internal */ export interface BaseConnectionOptions { /** * The connection method to use when streaming updates from @@ -76,13 +78,18 @@ export interface BaseConnectionOptions { */ connectionMethod?: SyncStreamConnectionMethod; + /** + * The fetch strategy to use when streaming updates from the PowerSync backend instance. + */ + fetchStrategy?: FetchStrategy; + /** * These parameters are passed to the sync rules, and will be available under the`user_parameters` object. */ params?: Record; } - /** @internal */ +/** @internal */ export interface AdditionalConnectionOptions { /** * Delay for retrying sync streaming operations @@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions { crudUploadThrottleMs?: number; } - /** @internal */ -export type RequiredAdditionalConnectionOptions = Required +export type RequiredAdditionalConnectionOptions = Required; export interface StreamingSyncImplementation extends BaseObserver, Disposable { /** @@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = { connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, + fetchStrategy: FetchStrategy.Buffered, params: {} }; @@ -496,10 +503,15 @@ The next upload iteration will be delayed.`); } }; - const stream = - resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP - ? await this.options.remote.postStream(syncOptions) - : await this.options.remote.socketStream(syncOptions); + let stream: DataStream; + if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) { + stream = await this.options.remote.postStream(syncOptions); + } else { + stream = await this.options.remote.socketStream({ + ...syncOptions, + ...{ fetchStrategy: resolvedOptions.fetchStrategy } + }); + } this.logger.debug('Stream established. Processing events'); diff --git a/packages/react-native/src/sync/stream/ReactNativeRemote.ts b/packages/react-native/src/sync/stream/ReactNativeRemote.ts index 2f6f8e52a..3ed14fb6e 100644 --- a/packages/react-native/src/sync/stream/ReactNativeRemote.ts +++ b/packages/react-native/src/sync/stream/ReactNativeRemote.ts @@ -9,6 +9,7 @@ import { FetchImplementation, FetchImplementationProvider, RemoteConnector, + SocketSyncStreamOptions, StreamingSyncLine, SyncStreamOptions } from '@powersync/common'; @@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote { return BSON; } - async socketStream(options: SyncStreamOptions): Promise> { + async socketStream(options: SocketSyncStreamOptions): Promise> { return super.socketStream(options); }