Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/orange-baboons-work.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/react-native': patch
'@powersync/common': patch
'@powersync/web': patch
'@powersync/node': patch
---

Fix sync stream delays during CRUD upload.
24 changes: 5 additions & 19 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { DataStream } from '../../../utils/DataStream.js';
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
import { StreamingSyncLine, StreamingSyncRequest } from './streaming-sync-types.js';
import {
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequest
} from './streaming-sync-types.js';
import { WebsocketClientTransport } from './WebsocketClientTransport.js';

export type BSONImplementation = typeof BSON;
Expand Down Expand Up @@ -267,15 +271,6 @@ export abstract class AbstractRemote {
return new WebSocket(url);
}

/**
* Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events
* sent by the server.
*/
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const bson = await this.getBSON();
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
}

/**
* Returns a data stream of sync line data.
*
Expand Down Expand Up @@ -475,15 +470,6 @@ export abstract class AbstractRemote {
return stream;
}

/**
* Connects to the sync/stream http endpoint, parsing lines as JSON.
*/
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return await this.postStreamRaw(options, (line) => {
return JSON.parse(line) as StreamingSyncLine;
});
}

/**
* Connects to the sync/stream http endpoint, mapping and emitting each received string line.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
import * as sync_status from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { throttleLeadingTrailing } from '../../../utils/async.js';
import {
BucketChecksum,
BucketDescription,
Expand All @@ -19,7 +19,9 @@ import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
import {
BucketRequest,
CrudUploadNotification,
StreamingSyncLine,
StreamingSyncLineOrCrudUploadComplete,
StreamingSyncRequestParameterType,
isStreamingKeepalive,
isStreamingSyncCheckpoint,
Expand Down Expand Up @@ -225,7 +227,7 @@ export abstract class AbstractStreamingSyncImplementation
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;

private pendingCrudUpload?: Promise<void>;
private isUploadingCrud: boolean = false;
private notifyCompletedUploads?: () => void;

syncStatus: SyncStatus;
Expand All @@ -247,16 +249,14 @@ export abstract class AbstractStreamingSyncImplementation
this.abortController = null;

this.triggerCrudUpload = throttleLeadingTrailing(() => {
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
if (!this.syncStatus.connected || this.isUploadingCrud) {
return;
}

this.pendingCrudUpload = new Promise((resolve) => {
this._uploadAllCrud().finally(() => {
this.notifyCompletedUploads?.();
this.pendingCrudUpload = undefined;
resolve();
});
this.isUploadingCrud = true;
this._uploadAllCrud().finally(() => {
this.notifyCompletedUploads?.();
this.isUploadingCrud = false;
});
}, this.options.crudUploadThrottleMs!);
}
Expand Down Expand Up @@ -532,6 +532,8 @@ The next upload iteration will be delayed.`);
}
});
} finally {
this.notifyCompletedUploads = undefined;

if (!signal.aborted) {
nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.'));
nestedAbortController = new AbortController();
Expand Down Expand Up @@ -617,10 +619,9 @@ The next upload iteration will be delayed.`);
this.options.adapter.startSession();
let [req, bucketMap] = await this.collectLocalBucketState();

// These are compared by reference
let targetCheckpoint: Checkpoint | null = null;
let validatedCheckpoint: Checkpoint | null = null;
let appliedCheckpoint: Checkpoint | null = null;
// A checkpoint that has been validated but not applied (e.g. due to pending local writes)
let pendingValidatedCheckpoint: Checkpoint | null = null;

const clientId = await this.options.adapter.getClientId();
const usingFixedKeyFormat = await this.requireKeyFormat(false);
Expand All @@ -639,25 +640,63 @@ The next upload iteration will be delayed.`);
}
};

let stream: DataStream<StreamingSyncLine>;
let stream: DataStream<StreamingSyncLineOrCrudUploadComplete>;
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
stream = await this.options.remote.postStream(syncOptions);
} else {
stream = await this.options.remote.socketStream({
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
stream = await this.options.remote.postStreamRaw(syncOptions, (line: string | CrudUploadNotification) => {
if (typeof line == 'string') {
return JSON.parse(line) as StreamingSyncLine;
} else {
// Directly enqueued by us
return line;
}
});
} else {
const bson = await this.options.remote.getBSON();
stream = await this.options.remote.socketStreamRaw(
{
...syncOptions,
...{ fetchStrategy: resolvedOptions.fetchStrategy }
},
(payload: Uint8Array | CrudUploadNotification) => {
if (payload instanceof Uint8Array) {
return bson.deserialize(payload) as StreamingSyncLine;
} else {
// Directly enqueued by us
return payload;
}
},
bson
);
}

this.logger.debug('Stream established. Processing events');

this.notifyCompletedUploads = () => {
if (!stream.closed) {
stream.enqueueData({ crud_upload_completed: null });
}
};

while (!stream.closed) {
const line = await stream.read();
if (!line) {
// The stream has closed while waiting
return;
}

if ('crud_upload_completed' in line) {
if (pendingValidatedCheckpoint != null) {
const { applied, endIteration } = await this.applyCheckpoint(pendingValidatedCheckpoint);
if (applied) {
pendingValidatedCheckpoint = null;
} else if (endIteration) {
break;
}
}

continue;
}

// A connection is active and messages are being received
if (!this.syncStatus.connected) {
// There is a connection now
Expand Down Expand Up @@ -686,13 +725,12 @@ The next upload iteration will be delayed.`);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
} else if (isStreamingSyncCheckpointComplete(line)) {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
const result = await this.applyCheckpoint(targetCheckpoint!);
if (result.endIteration) {
return;
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
} else if (!result.applied) {
pendingValidatedCheckpoint = targetCheckpoint;
}
validatedCheckpoint = targetCheckpoint;
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
const priority = line.partial_checkpoint_complete.priority;
this.logger.debug('Partial checkpoint complete', priority);
Expand Down Expand Up @@ -802,25 +840,7 @@ The next upload iteration will be delayed.`);
}
this.triggerCrudUpload();
} else {
this.logger.debug('Sync complete');

if (targetCheckpoint === appliedCheckpoint) {
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
priorityStatusEntries: [],
dataFlow: {
downloadError: undefined
}
});
} else if (validatedCheckpoint === targetCheckpoint) {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
return;
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
}
}
this.logger.debug('Received unknown sync line', line);
}
}
this.logger.debug('Stream input empty');
Expand Down Expand Up @@ -1059,50 +1079,35 @@ The next upload iteration will be delayed.`);
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
private async applyCheckpoint(checkpoint: Checkpoint) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

if (!result.checkpointValid) {
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { applied: false, endIteration: true };
} else if (!result.ready && pending != null) {
// We have pending entries in the local upload queue or are waiting to confirm a write
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
// try again.
} else if (!result.ready) {
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.'
);
await Promise.race([pending, onAbortPromise(abort)]);

if (abort.aborted) {
return { applied: false, endIteration: true };
}

// Try again now that uploads have completed.
result = await this.options.adapter.syncLocalDatabase(checkpoint);
return { applied: false, endIteration: false };
}

if (result.checkpointValid && result.ready) {
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadProgress: null,
downloadError: undefined
}
});
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadProgress: null,
downloadError: undefined
}
});

return { applied: true, endIteration: false };
} else {
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
return { applied: false, endIteration: false };
}
return { applied: true, endIteration: false };
}

protected updateSyncStatus(options: SyncStatusOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ export type StreamingSyncLine =
| StreamingSyncCheckpointPartiallyComplete
| StreamingSyncKeepalive;

export type CrudUploadNotification = { crud_upload_completed: null };

export type StreamingSyncLineOrCrudUploadComplete = StreamingSyncLine | CrudUploadNotification;

export interface BucketRequest {
name: string;

Expand Down
10 changes: 0 additions & 10 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,3 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
}
};
}

export function onAbortPromise(signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
if (signal.aborted) {
resolve();
} else {
signal.onabort = () => resolve();
}
});
}
37 changes: 17 additions & 20 deletions packages/react-native/src/sync/stream/ReactNativeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
FetchImplementation,
FetchImplementationProvider,
RemoteConnector,
SocketSyncStreamOptions,
StreamingSyncLine,
SyncStreamOptions
} from '@powersync/common';
import { Platform } from 'react-native';
Expand Down Expand Up @@ -57,11 +55,7 @@ export class ReactNativeRemote extends AbstractRemote {
return BSON;
}

async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
return super.socketStream(options);
}

async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
async postStreamRaw<T>(options: SyncStreamOptions, mapLine: (line: string) => T): Promise<DataStream<T>> {
const timeout =
Platform.OS == 'android'
? setTimeout(() => {
Expand All @@ -74,19 +68,22 @@ export class ReactNativeRemote extends AbstractRemote {
: null;

try {
return await super.postStream({
...options,
fetchOptions: {
...options.fetchOptions,
/**
* The `react-native-fetch-api` polyfill provides streaming support via
* this non-standard flag
* https://github.com/react-native-community/fetch#enable-text-streaming
*/
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
reactNative: { textStreaming: true }
}
});
return await super.postStreamRaw(
{
...options,
fetchOptions: {
...options.fetchOptions,
/**
* The `react-native-fetch-api` polyfill provides streaming support via
* this non-standard flag
* https://github.com/react-native-community/fetch#enable-text-streaming
*/
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
reactNative: { textStreaming: true }
}
},
mapLine
);
} finally {
if (timeout) {
clearTimeout(timeout);
Expand Down
Loading