Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/angry-pumpkins-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
---

Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance.
35 changes: 27 additions & 8 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export type RemoteConnector = {

// Refresh at least 30 sec before it expires
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
const SYNC_QUEUE_REQUEST_N = 10;
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;

// Keep alive message is sent every period
Expand All @@ -39,6 +38,24 @@ export type SyncStreamOptions = {
fetchOptions?: Request;
};

export enum FetchStrategy {
/**
* Queues multiple sync events before processing, reducing round-trips.
* This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets.
*/
Buffered = 'buffered',

/**
* Processes each sync event immediately before requesting the next.
* This reduces processing overhead and improves real-time responsiveness.
*/
Sequential = 'sequential'
}

export type SocketSyncStreamOptions = SyncStreamOptions & {
fetchStrategy: FetchStrategy;
};

export type FetchImplementation = typeof fetch;

/**
Expand Down Expand Up @@ -216,8 +233,10 @@ export abstract class AbstractRemote {
/**
* Connects to the sync/stream websocket endpoint
*/
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const { path } = options;
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const { path, fetchStrategy = FetchStrategy.Buffered } = options;

const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
const request = await this.buildRequest(path);

const bson = await this.getBSON();
Expand Down Expand Up @@ -277,7 +296,7 @@ export abstract class AbstractRemote {
// Helps to prevent double close scenarios
rsocket.onClose(() => (socketIsClosed = true));
// We initially request this amount and expect these to arrive eventually
let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
let pendingEventsCount = syncQueueRequestSize;

const disposeClosedListener = stream.registerListener({
closed: () => {
Expand All @@ -298,7 +317,7 @@ export abstract class AbstractRemote {
})
)
},
SYNC_QUEUE_REQUEST_N, // The initial N amount
syncQueueRequestSize, // The initial N amount
{
onError: (e) => {
// Don't log closed as an error
Expand Down Expand Up @@ -340,10 +359,10 @@ export abstract class AbstractRemote {
const l = stream.registerListener({
lowWater: async () => {
// Request to fill up the queue
const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
const required = syncQueueRequestSize - pendingEventsCount;
if (required > 0) {
socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount);
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
socket.request(syncQueueRequestSize - pendingEventsCount);
pendingEventsCount = syncQueueRequestSize;
}
},
closed: () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js';
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
import {
BucketRequest,
StreamingSyncLine,
StreamingSyncRequestParameterType,
isStreamingKeepalive,
isStreamingSyncCheckpoint,
isStreamingSyncCheckpointComplete,
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types.js';
import { DataStream } from 'src/utils/DataStream.js';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener {
*/
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}

/** @internal */
/** @internal */
export interface BaseConnectionOptions {
/**
* The connection method to use when streaming updates from
Expand All @@ -76,13 +78,18 @@ export interface BaseConnectionOptions {
*/
connectionMethod?: SyncStreamConnectionMethod;

/**
* The fetch strategy to use when streaming updates from the PowerSync backend instance.
*/
fetchStrategy?: FetchStrategy;

/**
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
*/
params?: Record<string, StreamingSyncRequestParameterType>;
}

/** @internal */
/** @internal */
export interface AdditionalConnectionOptions {
/**
* Delay for retrying sync streaming operations
Expand All @@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions {
crudUploadThrottleMs?: number;
}


/** @internal */
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;

export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
/**
Expand Down Expand Up @@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>

export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
fetchStrategy: FetchStrategy.Buffered,
params: {}
};

Expand Down Expand Up @@ -496,10 +503,15 @@ The next upload iteration will be delayed.`);
}
};

const stream =
resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP
? await this.options.remote.postStream(syncOptions)
: await this.options.remote.socketStream(syncOptions);
let stream: DataStream<StreamingSyncLine>;
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
stream = await this.options.remote.postStream(syncOptions);
} else {
stream = await this.options.remote.socketStream({
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
});
}

this.logger.debug('Stream established. Processing events');

Expand Down
3 changes: 2 additions & 1 deletion packages/react-native/src/sync/stream/ReactNativeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
FetchImplementation,
FetchImplementationProvider,
RemoteConnector,
SocketSyncStreamOptions,
StreamingSyncLine,
SyncStreamOptions
} from '@powersync/common';
Expand Down Expand Up @@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote {
return BSON;
}

async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return super.socketStream(options);
}

Expand Down