Skip to content

Commit 71b777c

Browse files
Merge branch 'main' into fix-tests
2 parents f2ad5a7 + e190454 commit 71b777c

File tree

23 files changed

+127
-57
lines changed

23 files changed

+127
-57
lines changed

.changeset/calm-pans-worry.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

.changeset/lovely-impalas-do.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

.changeset/slow-spiders-smash.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

.changeset/spotty-students-serve.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

.changeset/swift-seahorses-help.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

packages/common/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# @powersync/common
22

3+
## 1.24.0
4+
5+
### Minor Changes
6+
7+
- 893d42b: Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance.
8+
- 0606ac2: add 'connecting' flag to SyncStatus
9+
310
## 1.23.0
411

512
### Minor Changes

packages/common/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@powersync/common",
3-
"version": "1.23.0",
3+
"version": "1.24.0",
44
"publishConfig": {
55
"registry": "https://registry.npmjs.org/",
66
"access": "public"

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ const POWERSYNC_JS_VERSION = PACKAGE.version;
2222

2323
// Refresh at least 30 sec before it expires
2424
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
25-
const SYNC_QUEUE_REQUEST_N = 10;
2625
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2726

2827
// Keep alive message is sent every period
@@ -40,6 +39,24 @@ export type SyncStreamOptions = {
4039
fetchOptions?: Request;
4140
};
4241

42+
export enum FetchStrategy {
43+
/**
44+
* Queues multiple sync events before processing, reducing round-trips.
45+
* This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets.
46+
*/
47+
Buffered = 'buffered',
48+
49+
/**
50+
* Processes each sync event immediately before requesting the next.
51+
* This reduces processing overhead and improves real-time responsiveness.
52+
*/
53+
Sequential = 'sequential'
54+
}
55+
56+
export type SocketSyncStreamOptions = SyncStreamOptions & {
57+
fetchStrategy: FetchStrategy;
58+
};
59+
4360
export type FetchImplementation = typeof fetch;
4461

4562
/**
@@ -217,8 +234,10 @@ export abstract class AbstractRemote {
217234
/**
218235
* Connects to the sync/stream websocket endpoint
219236
*/
220-
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
221-
const { path } = options;
237+
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
238+
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
239+
240+
const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
222241
const request = await this.buildRequest(path);
223242

224243
const bson = await this.getBSON();
@@ -278,7 +297,7 @@ export abstract class AbstractRemote {
278297
// Helps to prevent double close scenarios
279298
rsocket.onClose(() => (socketIsClosed = true));
280299
// We initially request this amount and expect these to arrive eventually
281-
let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
300+
let pendingEventsCount = syncQueueRequestSize;
282301

283302
const disposeClosedListener = stream.registerListener({
284303
closed: () => {
@@ -299,7 +318,7 @@ export abstract class AbstractRemote {
299318
})
300319
)
301320
},
302-
SYNC_QUEUE_REQUEST_N, // The initial N amount
321+
syncQueueRequestSize, // The initial N amount
303322
{
304323
onError: (e) => {
305324
// Don't log closed as an error
@@ -341,10 +360,10 @@ export abstract class AbstractRemote {
341360
const l = stream.registerListener({
342361
lowWater: async () => {
343362
// Request to fill up the queue
344-
const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
363+
const required = syncQueueRequestSize - pendingEventsCount;
345364
if (required > 0) {
346-
socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount);
347-
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
365+
socket.request(syncQueueRequestSize - pendingEventsCount);
366+
pendingEventsCount = syncQueueRequestSize;
348367
}
349368
},
350369
closed: () => {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js';
77
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
10-
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js';
10+
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
1111
import {
1212
BucketRequest,
13+
StreamingSyncLine,
1314
StreamingSyncRequestParameterType,
1415
isStreamingKeepalive,
1516
isStreamingSyncCheckpoint,
1617
isStreamingSyncCheckpointComplete,
1718
isStreamingSyncCheckpointDiff,
1819
isStreamingSyncData
1920
} from './streaming-sync-types.js';
21+
import { DataStream } from 'src/utils/DataStream.js';
2022

2123
export enum LockType {
2224
CRUD = 'crud',
@@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener {
6769
*/
6870
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}
6971

70-
/** @internal */
72+
/** @internal */
7173
export interface BaseConnectionOptions {
7274
/**
7375
* The connection method to use when streaming updates from
@@ -76,13 +78,18 @@ export interface BaseConnectionOptions {
7678
*/
7779
connectionMethod?: SyncStreamConnectionMethod;
7880

81+
/**
82+
* The fetch strategy to use when streaming updates from the PowerSync backend instance.
83+
*/
84+
fetchStrategy?: FetchStrategy;
85+
7986
/**
8087
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
8188
*/
8289
params?: Record<string, StreamingSyncRequestParameterType>;
8390
}
8491

85-
/** @internal */
92+
/** @internal */
8693
export interface AdditionalConnectionOptions {
8794
/**
8895
* Delay for retrying sync streaming operations
@@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions {
97104
crudUploadThrottleMs?: number;
98105
}
99106

100-
101107
/** @internal */
102-
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>
108+
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;
103109

104110
export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
105111
/**
@@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>
134140

135141
export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
136142
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
143+
fetchStrategy: FetchStrategy.Buffered,
137144
params: {}
138145
};
139146

@@ -496,10 +503,15 @@ The next upload iteration will be delayed.`);
496503
}
497504
};
498505

499-
const stream =
500-
resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP
501-
? await this.options.remote.postStream(syncOptions)
502-
: await this.options.remote.socketStream(syncOptions);
506+
let stream: DataStream<StreamingSyncLine>;
507+
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
508+
stream = await this.options.remote.postStream(syncOptions);
509+
} else {
510+
stream = await this.options.remote.socketStream({
511+
...syncOptions,
512+
...{ fetchStrategy: resolvedOptions.fetchStrategy }
513+
});
514+
}
503515

504516
this.logger.debug('Stream established. Processing events');
505517

packages/drizzle-driver/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
"test": "vitest"
2727
},
2828
"peerDependencies": {
29-
"@powersync/common": "workspace:^1.23.0",
29+
"@powersync/common": "workspace:^1.24.0",
3030
"drizzle-orm": "<1.0.0"
3131
},
3232
"devDependencies": {

0 commit comments

Comments
 (0)