Skip to content

Commit f3d016c

Browse files
committed
Introduced fetchStrategy option to connect, allowing you to choose either buffered or sequential for the Websocket connect option.
1 parent 0a8e7b9 commit f3d016c

File tree

5 files changed

+59
-25
lines changed

5 files changed

+59
-25
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
---
5+
6+
Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance.

.changeset/shiny-boxes-prove.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ export type RemoteConnector = {
2121

2222
// Refresh at least 30 sec before it expires
2323
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
24-
const SYNC_QUEUE_REQUEST_N = 1;
24+
// const SYNC_QUEUE_REQUEST_N = 1; // TODO CL resolve
2525
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2626

2727
// Keep alive message is sent every period
28-
const KEEP_ALIVE_MS = 60_000;
28+
const KEEP_ALIVE_MS = 20_000;
2929
// The ACK must be received in this period
30-
const KEEP_ALIVE_LIFETIME_MS = 90_000;
30+
const KEEP_ALIVE_LIFETIME_MS = 30_000;
3131

3232
export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote');
3333

@@ -39,6 +39,24 @@ export type SyncStreamOptions = {
3939
fetchOptions?: Request;
4040
};
4141

42+
export enum FetchStrategy {
43+
/**
44+
* Queues multiple sync events before processing, reducing round-trips.
45+
* This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets.
46+
*/
47+
Buffered = 'buffered',
48+
49+
/**
50+
* Processes each sync event immediately before requesting the next.
51+
* This reduces processing overhead and improves real-time responsiveness.
52+
*/
53+
Sequential = 'sequential'
54+
}
55+
56+
export type SocketSyncStreamOptions = SyncStreamOptions & {
57+
fetchStrategy: FetchStrategy;
58+
};
59+
4260
export type FetchImplementation = typeof fetch;
4361

4462
/**
@@ -216,8 +234,10 @@ export abstract class AbstractRemote {
216234
/**
217235
* Connects to the sync/stream websocket endpoint
218236
*/
219-
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
220-
const { path } = options;
237+
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
238+
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
239+
240+
const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
221241
const request = await this.buildRequest(path);
222242

223243
const bson = await this.getBSON();
@@ -277,7 +297,7 @@ export abstract class AbstractRemote {
277297
// Helps to prevent double close scenarios
278298
rsocket.onClose(() => (socketIsClosed = true));
279299
// We initially request this amount and expect these to arrive eventually
280-
let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
300+
let pendingEventsCount = syncQueueRequestSize;
281301

282302
const disposeClosedListener = stream.registerListener({
283303
closed: () => {
@@ -298,7 +318,7 @@ export abstract class AbstractRemote {
298318
})
299319
)
300320
},
301-
SYNC_QUEUE_REQUEST_N, // The initial N amount
321+
syncQueueRequestSize, // The initial N amount
302322
{
303323
onError: (e) => {
304324
// Don't log closed as an error
@@ -340,10 +360,10 @@ export abstract class AbstractRemote {
340360
const l = stream.registerListener({
341361
lowWater: async () => {
342362
// Request to fill up the queue
343-
const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
363+
const required = syncQueueRequestSize - pendingEventsCount;
344364
if (required > 0) {
345-
socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount);
346-
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
365+
socket.request(syncQueueRequestSize - pendingEventsCount);
366+
pendingEventsCount = syncQueueRequestSize;
347367
}
348368
},
349369
closed: () => {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js';
77
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
10-
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js';
10+
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
1111
import {
1212
BucketRequest,
13+
StreamingSyncLine,
1314
StreamingSyncRequestParameterType,
1415
isStreamingKeepalive,
1516
isStreamingSyncCheckpoint,
1617
isStreamingSyncCheckpointComplete,
1718
isStreamingSyncCheckpointDiff,
1819
isStreamingSyncData
1920
} from './streaming-sync-types.js';
21+
import { DataStream } from 'src/utils/DataStream.js';
2022

2123
export enum LockType {
2224
CRUD = 'crud',
@@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener {
6769
*/
6870
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}
6971

70-
/** @internal */
72+
/** @internal */
7173
export interface BaseConnectionOptions {
7274
/**
7375
* The connection method to use when streaming updates from
@@ -76,13 +78,18 @@ export interface BaseConnectionOptions {
7678
*/
7779
connectionMethod?: SyncStreamConnectionMethod;
7880

81+
/**
82+
* The fetch strategy to use when streaming updates from the PowerSync backend instance.
83+
*/
84+
fetchStrategy?: FetchStrategy;
85+
7986
/**
8087
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
8188
*/
8289
params?: Record<string, StreamingSyncRequestParameterType>;
8390
}
8491

85-
/** @internal */
92+
/** @internal */
8693
export interface AdditionalConnectionOptions {
8794
/**
8895
* Delay for retrying sync streaming operations
@@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions {
97104
crudUploadThrottleMs?: number;
98105
}
99106

100-
101107
/** @internal */
102-
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>
108+
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;
103109

104110
export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
105111
/**
@@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>
134140

135141
export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
136142
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
143+
fetchStrategy: FetchStrategy.Buffered,
137144
params: {}
138145
};
139146

@@ -496,10 +503,15 @@ The next upload iteration will be delayed.`);
496503
}
497504
};
498505

499-
const stream =
500-
resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP
501-
? await this.options.remote.postStream(syncOptions)
502-
: await this.options.remote.socketStream(syncOptions);
506+
let stream: DataStream<StreamingSyncLine>;
507+
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
508+
stream = await this.options.remote.postStream(syncOptions);
509+
} else {
510+
stream = await this.options.remote.socketStream({
511+
...syncOptions,
512+
...{ fetchStrategy: resolvedOptions.fetchStrategy }
513+
});
514+
}
503515

504516
this.logger.debug('Stream established. Processing events');
505517

packages/react-native/src/sync/stream/ReactNativeRemote.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
FetchImplementation,
1010
FetchImplementationProvider,
1111
RemoteConnector,
12+
SocketSyncStreamOptions,
1213
StreamingSyncLine,
1314
SyncStreamOptions
1415
} from '@powersync/common';
@@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote {
5657
return BSON;
5758
}
5859

59-
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
60+
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
6061
return super.socketStream(options);
6162
}
6263

0 commit comments

Comments
 (0)