Skip to content

Commit c55dc01

Browse files
Merge remote-tracking branch 'origin/main' into watches
2 parents be9cc12 + ffe3095 commit c55dc01

File tree

18 files changed

+311
-287
lines changed

18 files changed

+311
-287
lines changed

.changeset/dull-mugs-carry.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@powersync/react-native': patch
3+
'@powersync/diagnostics-app': patch
4+
'@powersync/common': patch
5+
'@powersync/node': patch
6+
'@powersync/op-sqlite': patch
7+
'@powersync/web': patch
8+
---
9+
10+
Improve websocket keepalive logic to reduce keepalive errors.

.changeset/lazy-onions-dance.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/node': minor
3+
---
4+
5+
Upgrade undici and use the default undici errors for WebSockets.

.changeset/light-clocks-hang.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/node': patch
4+
'@powersync/web': patch
5+
'@powersync/react-native': patch
6+
---
7+
8+
Rust client: Properly upload CRUD entries made while offline.

demos/example-node/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"dependencies": {
1313
"@powersync/node": "workspace:*",
1414
"dotenv": "^16.4.7",
15-
"undici": "^7.10.0"
15+
"undici": "^7.11.0"
1616
},
1717
"devDependencies": {
1818
"ts-node": "^10.9.2",

packages/common/src/client/ConnectionManager.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
194194

195195
this.logger.debug('Attempting to connect to PowerSync instance');
196196
await this.syncStreamImplementation?.connect(appliedOptions!);
197-
this.syncStreamImplementation?.triggerCrudUpload();
198197
}
199198

200199
/**

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserverInterface<BucketStorag
106106
/**
107107
* Invokes the `powersync_control` function for the sync client.
108108
*/
109-
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
109+
control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise<string>;
110110
}

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
364364
// No-op for now
365365
}
366366

367-
async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
367+
async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
368368
return await this.writeTransaction(async (tx) => {
369369
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
370370
return raw;

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2424

2525
// Keep alive message is sent every period
2626
const KEEP_ALIVE_MS = 20_000;
27-
// The ACK must be received in this period
28-
const KEEP_ALIVE_LIFETIME_MS = 30_000;
27+
28+
// One message of any type must be received in this period.
29+
const SOCKET_TIMEOUT_MS = 30_000;
30+
31+
// One keepalive message must be received in this period.
32+
// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed
33+
// significantly. Therefore this is longer than the socket timeout.
34+
const KEEP_ALIVE_LIFETIME_MS = 90_000;
2935

3036
export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote');
3137

@@ -267,7 +273,7 @@ export abstract class AbstractRemote {
267273
*/
268274
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
269275
const bson = await this.getBSON();
270-
return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson);
276+
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
271277
}
272278

273279
/**
@@ -279,9 +285,9 @@ export abstract class AbstractRemote {
279285
*/
280286
async socketStreamRaw<T>(
281287
options: SocketSyncStreamOptions,
282-
map: (buffer: Buffer) => T,
288+
map: (buffer: Uint8Array) => T,
283289
bson?: typeof BSON
284-
): Promise<DataStream> {
290+
): Promise<DataStream<T>> {
285291
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
286292
const mimeType = bson == null ? 'application/json' : 'application/bson';
287293

@@ -304,12 +310,26 @@ export abstract class AbstractRemote {
304310
// automatically as a header.
305311
const userAgent = this.getUserAgent();
306312

313+
let keepAliveTimeout: any;
314+
const resetTimeout = () => {
315+
clearTimeout(keepAliveTimeout);
316+
keepAliveTimeout = setTimeout(() => {
317+
this.logger.error(`No data received on WebSocket in ${SOCKET_TIMEOUT_MS}ms, closing connection.`);
318+
stream.close();
319+
}, SOCKET_TIMEOUT_MS);
320+
};
321+
resetTimeout();
322+
307323
const url = this.options.socketUrlTransformer(request.url);
308324
const connector = new RSocketConnector({
309325
transport: new WebsocketClientTransport({
310326
url,
311327
wsCreator: (url) => {
312-
return this.createSocket(url);
328+
const socket = this.createSocket(url);
329+
socket.addEventListener('message', (event) => {
330+
resetTimeout();
331+
});
332+
return socket;
313333
}
314334
}),
315335
setup: {
@@ -332,18 +352,23 @@ export abstract class AbstractRemote {
332352
rsocket = await connector.connect();
333353
} catch (ex) {
334354
this.logger.error(`Failed to connect WebSocket`, ex);
355+
clearTimeout(keepAliveTimeout);
335356
throw ex;
336357
}
337358

338-
const stream = new DataStream({
359+
resetTimeout();
360+
361+
const stream = new DataStream<T, Uint8Array>({
339362
logger: this.logger,
340363
pressure: {
341364
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
342-
}
365+
},
366+
mapLine: map
343367
});
344368

345369
let socketIsClosed = false;
346370
const closeSocket = () => {
371+
clearTimeout(keepAliveTimeout);
347372
if (socketIsClosed) {
348373
return;
349374
}
@@ -411,7 +436,7 @@ export abstract class AbstractRemote {
411436
return;
412437
}
413438

414-
stream.enqueueData(map(data));
439+
stream.enqueueData(data);
415440
},
416441
onComplete: () => {
417442
stream.close();
@@ -537,8 +562,9 @@ export abstract class AbstractRemote {
537562
const decoder = new TextDecoder();
538563
let buffer = '';
539564

540-
const stream = new DataStream<T>({
541-
logger: this.logger
565+
const stream = new DataStream<T, string>({
566+
logger: this.logger,
567+
mapLine: mapLine
542568
});
543569

544570
const l = stream.registerListener({
@@ -550,7 +576,7 @@ export abstract class AbstractRemote {
550576
if (done) {
551577
const remaining = buffer.trim();
552578
if (remaining.length != 0) {
553-
stream.enqueueData(mapLine(remaining));
579+
stream.enqueueData(remaining);
554580
}
555581

556582
stream.close();
@@ -565,7 +591,7 @@ export abstract class AbstractRemote {
565591
for (var i = 0; i < lines.length - 1; i++) {
566592
var l = lines[i].trim();
567593
if (l.length > 0) {
568-
stream.enqueueData(mapLine(l));
594+
stream.enqueueData(l);
569595
didCompleteLine = true;
570596
}
571597
}

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -858,17 +858,15 @@ The next upload iteration will be delayed.`);
858858
const adapter = this.options.adapter;
859859
const remote = this.options.remote;
860860
let receivingLines: Promise<void> | null = null;
861+
let hadSyncLine = false;
861862

862863
const abortController = new AbortController();
863864
signal.addEventListener('abort', () => abortController.abort());
864865

865866
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
866867
// invocation (local events include refreshed tokens and completed uploads).
867868
// This is a single data stream so that we can handle all control calls from a single place.
868-
let controlInvocations: DataStream<{
869-
command: PowerSyncControlCommand;
870-
payload?: ArrayBuffer | string;
871-
}> | null = null;
869+
let controlInvocations: DataStream<EnqueuedCommand, Uint8Array | EnqueuedCommand> | null = null;
872870

873871
async function connect(instr: EstablishSyncStream) {
874872
const syncOptions: SyncStreamOptions = {
@@ -878,20 +876,34 @@ The next upload iteration will be delayed.`);
878876
};
879877

880878
if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) {
881-
controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({
882-
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
883-
payload: line
884-
}));
879+
controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => {
880+
if (typeof line == 'string') {
881+
return {
882+
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
883+
payload: line
884+
};
885+
} else {
886+
// Directly enqueued by us
887+
return line;
888+
}
889+
});
885890
} else {
886891
controlInvocations = await remote.socketStreamRaw(
887892
{
888893
...syncOptions,
889894
fetchStrategy: resolvedOptions.fetchStrategy
890895
},
891-
(buffer) => ({
892-
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
893-
payload: buffer
894-
})
896+
(payload: Uint8Array | EnqueuedCommand) => {
897+
if (payload instanceof Uint8Array) {
898+
return {
899+
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
900+
payload: payload
901+
};
902+
} else {
903+
// Directly enqueued by us
904+
return payload;
905+
}
906+
}
895907
);
896908
}
897909

@@ -903,6 +915,11 @@ The next upload iteration will be delayed.`);
903915
}
904916

905917
await control(line.command, line.payload);
918+
919+
if (!hadSyncLine) {
920+
syncImplementation.triggerCrudUpload();
921+
hadSyncLine = true;
922+
}
906923
}
907924
} finally {
908925
const activeInstructions = controlInvocations;
@@ -918,7 +935,7 @@ The next upload iteration will be delayed.`);
918935
await control(PowerSyncControlCommand.STOP);
919936
}
920937

921-
async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) {
938+
async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) {
922939
const rawResponse = await adapter.control(op, payload ?? null);
923940
await handleInstructions(JSON.parse(rawResponse));
924941
}
@@ -1157,3 +1174,8 @@ The next upload iteration will be delayed.`);
11571174
});
11581175
}
11591176
}
1177+
1178+
interface EnqueuedCommand {
1179+
command: PowerSyncControlCommand;
1180+
payload?: Uint8Array | string;
1181+
}

0 commit comments

Comments
 (0)