Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/orange-baboons-work.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/react-native': patch
'@powersync/common': patch
'@powersync/web': patch
'@powersync/node': patch
---

Fix sync stream delays during CRUD upload.
24 changes: 5 additions & 19 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { DataStream } from '../../../utils/DataStream.js';
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
import { StreamingSyncLine, StreamingSyncRequest } from './streaming-sync-types.js';
import {
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequest
} from './streaming-sync-types.js';
import { WebsocketClientTransport } from './WebsocketClientTransport.js';

export type BSONImplementation = typeof BSON;
Expand Down Expand Up @@ -267,15 +271,6 @@ export abstract class AbstractRemote {
return new WebSocket(url);
}

/**
* Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events
* sent by the server.
*/
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const bson = await this.getBSON();
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
}

/**
* Returns a data stream of sync line data.
*
Expand Down Expand Up @@ -475,15 +470,6 @@ export abstract class AbstractRemote {
return stream;
}

/**
* Connects to the sync/stream http endpoint, parsing lines as JSON.
*/
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return await this.postStreamRaw(options, (line) => {
return JSON.parse(line) as StreamingSyncLine;
});
}

/**
* Connects to the sync/stream http endpoint, mapping and emitting each received string line.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
import * as sync_status from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { throttleLeadingTrailing } from '../../../utils/async.js';
import {
BucketChecksum,
BucketDescription,
Expand All @@ -19,7 +19,9 @@ import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
import {
BucketRequest,
CrudUploadNotification,
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequestParameterType,
isStreamingKeepalive,
isStreamingSyncCheckpoint,
Expand Down Expand Up @@ -225,7 +227,7 @@ export abstract class AbstractStreamingSyncImplementation
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;

private pendingCrudUpload?: Promise<void>;
private isUploadingCrud: boolean = false;
private notifyCompletedUploads?: () => void;

syncStatus: SyncStatus;
Expand All @@ -247,16 +249,14 @@ export abstract class AbstractStreamingSyncImplementation
this.abortController = null;

this.triggerCrudUpload = throttleLeadingTrailing(() => {
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
if (!this.syncStatus.connected || this.isUploadingCrud) {
return;
}

this.pendingCrudUpload = new Promise((resolve) => {
this._uploadAllCrud().finally(() => {
this.notifyCompletedUploads?.();
this.pendingCrudUpload = undefined;
resolve();
});
this.isUploadingCrud = true;
this._uploadAllCrud().finally(() => {
this.notifyCompletedUploads?.();
this.isUploadingCrud = false;
});
}, this.options.crudUploadThrottleMs!);
}
Expand Down Expand Up @@ -532,6 +532,8 @@ The next upload iteration will be delayed.`);
}
});
} finally {
this.notifyCompletedUploads = undefined;

if (!signal.aborted) {
nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.'));
nestedAbortController = new AbortController();
Expand Down Expand Up @@ -639,25 +641,63 @@ The next upload iteration will be delayed.`);
}
};

let stream: DataStream<StreamingSyncLine>;
let stream: DataStream<StreamingSyncLineOrCrudUploadComplete>;
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
stream = await this.options.remote.postStream(syncOptions);
} else {
stream = await this.options.remote.socketStream({
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
stream = await this.options.remote.postStreamRaw(syncOptions, (line: string | CrudUploadNotification) => {
if (typeof line == 'string') {
return JSON.parse(line) as StreamingSyncLine;
} else {
// Directly enqueued by us
return line;
}
});
} else {
const bson = await this.options.remote.getBSON();
stream = await this.options.remote.socketStreamRaw(
{
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
},
(payload: Uint8Array | CrudUploadNotification) => {
if (payload instanceof Uint8Array) {
return bson.deserialize(payload) as StreamingSyncLine;
} else {
// Directly enqueued by us
return payload;
}
},
bson
);
}

this.logger.debug('Stream established. Processing events');

this.notifyCompletedUploads = () => {
if (!stream.closed) {
stream.enqueueData({ crud_upload_completed: null });
}
};

while (!stream.closed) {
const line = await stream.read();
if (!line) {
// The stream has closed while waiting
return;
}

if ('crud_upload_completed' in line) {
if (validatedCheckpoint != null) {
const { applied, endIteration } = await this.applyCheckpoint(validatedCheckpoint);
if (applied) {
appliedCheckpoint = validatedCheckpoint;
} else if (endIteration) {
break;
}
}

continue;
}

// A connection is active and messages are being received
if (!this.syncStatus.connected) {
// There is a connection now
Expand Down Expand Up @@ -686,7 +726,7 @@ The next upload iteration will be delayed.`);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
} else if (isStreamingSyncCheckpointComplete(line)) {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
const result = await this.applyCheckpoint(targetCheckpoint!);
if (result.endIteration) {
return;
} else if (result.applied) {
Expand Down Expand Up @@ -802,25 +842,7 @@ The next upload iteration will be delayed.`);
}
this.triggerCrudUpload();
} else {
this.logger.debug('Sync complete');

if (targetCheckpoint === appliedCheckpoint) {
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
priorityStatusEntries: [],
dataFlow: {
downloadError: undefined
}
});
} else if (validatedCheckpoint === targetCheckpoint) {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
return;
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
}
}
this.logger.debug('Received unknown sync line', line);
}
}
this.logger.debug('Stream input empty');
Expand Down Expand Up @@ -1059,50 +1081,35 @@ The next upload iteration will be delayed.`);
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
private async applyCheckpoint(checkpoint: Checkpoint) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

if (!result.checkpointValid) {
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { applied: false, endIteration: true };
} else if (!result.ready && pending != null) {
// We have pending entries in the local upload queue or are waiting to confirm a write
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
// try again.
} else if (!result.ready) {
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.'
);
await Promise.race([pending, onAbortPromise(abort)]);

if (abort.aborted) {
return { applied: false, endIteration: true };
}

// Try again now that uploads have completed.
result = await this.options.adapter.syncLocalDatabase(checkpoint);
return { applied: false, endIteration: false };
}

if (result.checkpointValid && result.ready) {
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadProgress: null,
downloadError: undefined
}
});
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadProgress: null,
downloadError: undefined
}
});

return { applied: true, endIteration: false };
} else {
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
return { applied: false, endIteration: false };
}
return { applied: true, endIteration: false };
}

protected updateSyncStatus(options: SyncStatusOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ export type StreamingSyncLine =
| StreamingSyncCheckpointPartiallyComplete
| StreamingSyncKeepalive;

export type CrudUploadNotification = { crud_upload_completed: null };

export type StreamingSyncLineOrCrudUploadComplete = StreamingSyncLine | CrudUploadNotification;

export interface BucketRequest {
name: string;

Expand Down
10 changes: 0 additions & 10 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,3 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
}
};
}

export function onAbortPromise(signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
if (signal.aborted) {
resolve();
} else {
signal.onabort = () => resolve();
}
});
}
37 changes: 17 additions & 20 deletions packages/react-native/src/sync/stream/ReactNativeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
FetchImplementation,
FetchImplementationProvider,
RemoteConnector,
SocketSyncStreamOptions,
StreamingSyncLine,
SyncStreamOptions
} from '@powersync/common';
import { Platform } from 'react-native';
Expand Down Expand Up @@ -57,11 +55,7 @@ export class ReactNativeRemote extends AbstractRemote {
return BSON;
}

async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return super.socketStream(options);
}

async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
async postStreamRaw<T>(options: SyncStreamOptions, mapLine: (line: string) => T): Promise<DataStream<T>> {
const timeout =
Platform.OS == 'android'
? setTimeout(() => {
Expand All @@ -74,19 +68,22 @@ export class ReactNativeRemote extends AbstractRemote {
: null;

try {
return await super.postStream({
...options,
fetchOptions: {
...options.fetchOptions,
/**
* The `react-native-fetch-api` polyfill provides streaming support via
* this non-standard flag
* https://github.com/react-native-community/fetch#enable-text-streaming
*/
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
reactNative: { textStreaming: true }
}
});
return await super.postStreamRaw(
{
...options,
fetchOptions: {
...options.fetchOptions,
/**
* The `react-native-fetch-api` polyfill provides streaming support via
* this non-standard flag
* https://github.com/react-native-community/fetch#enable-text-streaming
*/
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
reactNative: { textStreaming: true }
}
},
mapLine
);
} finally {
if (timeout) {
clearTimeout(timeout);
Expand Down
Loading