Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/angry-pumpkins-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
---

Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance.
35 changes: 27 additions & 8 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export type RemoteConnector = {

// Refresh at least 30 sec before it expires
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
const SYNC_QUEUE_REQUEST_N = 10;
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;

// Keep alive message is sent every period
Expand All @@ -39,6 +38,24 @@ export type SyncStreamOptions = {
fetchOptions?: Request;
};

export enum FetchStrategy {
/**
* Queues multiple sync events before processing, reducing round-trips.
* This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets.
*/
Buffered = 'buffered',

/**
* Processes each sync event immediately before requesting the next.
* This reduces processing overhead and improves real-time responsiveness.
*/
Sequential = 'sequential'
}

export type SocketSyncStreamOptions = SyncStreamOptions & {
fetchStrategy: FetchStrategy;
};

export type FetchImplementation = typeof fetch;

/**
Expand Down Expand Up @@ -216,8 +233,10 @@ export abstract class AbstractRemote {
/**
* Connects to the sync/stream websocket endpoint
*/
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const { path } = options;
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const { path, fetchStrategy = FetchStrategy.Buffered } = options;

const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
const request = await this.buildRequest(path);

const bson = await this.getBSON();
Expand Down Expand Up @@ -277,7 +296,7 @@ export abstract class AbstractRemote {
// Helps to prevent double close scenarios
rsocket.onClose(() => (socketIsClosed = true));
// We initially request this amount and expect these to arrive eventually
let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
let pendingEventsCount = syncQueueRequestSize;

const disposeClosedListener = stream.registerListener({
closed: () => {
Expand All @@ -298,7 +317,7 @@ export abstract class AbstractRemote {
})
)
},
SYNC_QUEUE_REQUEST_N, // The initial N amount
syncQueueRequestSize, // The initial N amount
{
onError: (e) => {
// Don't log closed as an error
Expand Down Expand Up @@ -340,10 +359,10 @@ export abstract class AbstractRemote {
const l = stream.registerListener({
lowWater: async () => {
// Request to fill up the queue
const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
const required = syncQueueRequestSize - pendingEventsCount;
if (required > 0) {
socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount);
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
socket.request(syncQueueRequestSize - pendingEventsCount);
pendingEventsCount = syncQueueRequestSize;
}
},
closed: () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js';
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
import {
BucketRequest,
StreamingSyncLine,
StreamingSyncRequestParameterType,
isStreamingKeepalive,
isStreamingSyncCheckpoint,
isStreamingSyncCheckpointComplete,
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types.js';
import { DataStream } from 'src/utils/DataStream.js';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener {
*/
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}

/** @internal */
/** @internal */
export interface BaseConnectionOptions {
/**
* The connection method to use when streaming updates from
Expand All @@ -76,13 +78,18 @@ export interface BaseConnectionOptions {
*/
connectionMethod?: SyncStreamConnectionMethod;

/**
* The fetch strategy to use when streaming updates from the PowerSync backend instance.
*/
fetchStrategy?: FetchStrategy;

/**
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
*/
params?: Record<string, StreamingSyncRequestParameterType>;
}

/** @internal */
/** @internal */
export interface AdditionalConnectionOptions {
/**
* Delay for retrying sync streaming operations
Expand All @@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions {
crudUploadThrottleMs?: number;
}


/** @internal */
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;

export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
/**
Expand Down Expand Up @@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>

export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
fetchStrategy: FetchStrategy.Buffered,
params: {}
};

Expand Down Expand Up @@ -496,10 +503,15 @@ The next upload iteration will be delayed.`);
}
};

const stream =
resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP
? await this.options.remote.postStream(syncOptions)
: await this.options.remote.socketStream(syncOptions);
let stream: DataStream<StreamingSyncLine>;
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
stream = await this.options.remote.postStream(syncOptions);
} else {
stream = await this.options.remote.socketStream({
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
});
}

this.logger.debug('Stream established. Processing events');

Expand Down
3 changes: 2 additions & 1 deletion packages/react-native/src/sync/stream/ReactNativeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
FetchImplementation,
FetchImplementationProvider,
RemoteConnector,
SocketSyncStreamOptions,
StreamingSyncLine,
SyncStreamOptions
} from '@powersync/common';
Expand Down Expand Up @@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote {
return BSON;
}

async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return super.socketStream(options);
}

Expand Down
Loading