diff --git a/.changeset/orange-baboons-work.md b/.changeset/orange-baboons-work.md new file mode 100644 index 000000000..bcd861c86 --- /dev/null +++ b/.changeset/orange-baboons-work.md @@ -0,0 +1,8 @@ +--- +'@powersync/react-native': patch +'@powersync/common': patch +'@powersync/web': patch +'@powersync/node': patch +--- + +Fix sync stream delays during CRUD upload. diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index c746ea4c4..f0de66d00 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -7,7 +7,11 @@ import PACKAGE from '../../../../package.json' with { type: 'json' }; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { DataStream } from '../../../utils/DataStream.js'; import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js'; -import { StreamingSyncLine, StreamingSyncRequest } from './streaming-sync-types.js'; +import { + StreamingSyncLine, + StreamingSyncLineOrCrudUploadComplete, + StreamingSyncRequest +} from './streaming-sync-types.js'; import { WebsocketClientTransport } from './WebsocketClientTransport.js'; export type BSONImplementation = typeof BSON; @@ -267,15 +271,6 @@ export abstract class AbstractRemote { return new WebSocket(url); } - /** - * Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events - * sent by the server. - */ - async socketStream(options: SocketSyncStreamOptions): Promise> { - const bson = await this.getBSON(); - return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson); - } - /** * Returns a data stream of sync line data. * @@ -475,15 +470,6 @@ export abstract class AbstractRemote { return stream; } - /** - * Connects to the sync/stream http endpoint, parsing lines as JSON. - */ - async postStream(options: SyncStreamOptions): Promise> { - return await this.postStreamRaw(options, (line) => { - return JSON.parse(line) as StreamingSyncLine; - }); - } - /** * Connects to the sync/stream http endpoint, mapping and emitting each received string line. */ diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 4f6cb1070..58d4c6492 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru import * as sync_status from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; -import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js'; +import { throttleLeadingTrailing } from '../../../utils/async.js'; import { BucketChecksum, BucketDescription, @@ -19,7 +19,9 @@ import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; import { BucketRequest, + CrudUploadNotification, StreamingSyncLine, + StreamingSyncLineOrCrudUploadComplete, StreamingSyncRequestParameterType, isStreamingKeepalive, isStreamingSyncCheckpoint, @@ -225,7 +227,7 @@ export abstract class AbstractStreamingSyncImplementation protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; - private pendingCrudUpload?: Promise; + private isUploadingCrud: boolean = false; private notifyCompletedUploads?: () => void; syncStatus: SyncStatus; @@ -247,16 +249,14 @@ export abstract class AbstractStreamingSyncImplementation this.abortController = null; this.triggerCrudUpload = throttleLeadingTrailing(() => { - if (!this.syncStatus.connected || this.pendingCrudUpload != null) { + if (!this.syncStatus.connected || this.isUploadingCrud) { return; } - this.pendingCrudUpload = new Promise((resolve) => { - this._uploadAllCrud().finally(() => { - this.notifyCompletedUploads?.(); - this.pendingCrudUpload = undefined; - resolve(); - }); + this.isUploadingCrud = true; + this._uploadAllCrud().finally(() => { + this.notifyCompletedUploads?.(); + this.isUploadingCrud = false; }); }, this.options.crudUploadThrottleMs!); } @@ -532,6 +532,8 @@ The next upload iteration will be delayed.`); } }); } finally { + this.notifyCompletedUploads = undefined; + if (!signal.aborted) { nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.')); nestedAbortController = new AbortController(); @@ -617,10 +619,9 @@ The next upload iteration will be delayed.`); this.options.adapter.startSession(); let [req, bucketMap] = await this.collectLocalBucketState(); - // These are compared by reference let targetCheckpoint: Checkpoint | null = null; - let validatedCheckpoint: Checkpoint | null = null; - let appliedCheckpoint: Checkpoint | null = null; + // A checkpoint that has been validated but not applied (e.g. due to pending local writes) + let pendingValidatedCheckpoint: Checkpoint | null = null; const clientId = await this.options.adapter.getClientId(); const usingFixedKeyFormat = await this.requireKeyFormat(false); @@ -639,18 +640,43 @@ The next upload iteration will be delayed.`); } }; - let stream: DataStream; + let stream: DataStream; if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) { - stream = await this.options.remote.postStream(syncOptions); - } else { - stream = await this.options.remote.socketStream({ - ...syncOptions, - ...{ fetchStrategy: resolvedOptions.fetchStrategy } + stream = await this.options.remote.postStreamRaw(syncOptions, (line: string | CrudUploadNotification) => { + if (typeof line == 'string') { + return JSON.parse(line) as StreamingSyncLine; + } else { + // Directly enqueued by us + return line; + } }); + } else { + const bson = await this.options.remote.getBSON(); + stream = await this.options.remote.socketStreamRaw( + { + ...syncOptions, + ...{ fetchStrategy: resolvedOptions.fetchStrategy } + }, + (payload: Uint8Array | CrudUploadNotification) => { + if (payload instanceof Uint8Array) { + return bson.deserialize(payload) as StreamingSyncLine; + } else { + // Directly enqueued by us + return payload; + } + }, + bson + ); } this.logger.debug('Stream established. Processing events'); + this.notifyCompletedUploads = () => { + if (!stream.closed) { + stream.enqueueData({ crud_upload_completed: null }); + } + }; + while (!stream.closed) { const line = await stream.read(); if (!line) { @@ -658,6 +684,19 @@ The next upload iteration will be delayed.`); return; } + if ('crud_upload_completed' in line) { + if (pendingValidatedCheckpoint != null) { + const { applied, endIteration } = await this.applyCheckpoint(pendingValidatedCheckpoint); + if (applied) { + pendingValidatedCheckpoint = null; + } else if (endIteration) { + break; + } + } + + continue; + } + // A connection is active and messages are being received if (!this.syncStatus.connected) { // There is a connection now @@ -686,13 +725,12 @@ The next upload iteration will be delayed.`); await this.options.adapter.setTargetCheckpoint(targetCheckpoint); await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); } else if (isStreamingSyncCheckpointComplete(line)) { - const result = await this.applyCheckpoint(targetCheckpoint!, signal); + const result = await this.applyCheckpoint(targetCheckpoint!); if (result.endIteration) { return; - } else if (result.applied) { - appliedCheckpoint = targetCheckpoint; + } else if (!result.applied) { + pendingValidatedCheckpoint = targetCheckpoint; } - validatedCheckpoint = targetCheckpoint; } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { const priority = line.partial_checkpoint_complete.priority; this.logger.debug('Partial checkpoint complete', priority); @@ -802,25 +840,7 @@ The next upload iteration will be delayed.`); } this.triggerCrudUpload(); } else { - this.logger.debug('Sync complete'); - - if (targetCheckpoint === appliedCheckpoint) { - this.updateSyncStatus({ - connected: true, - lastSyncedAt: new Date(), - priorityStatusEntries: [], - dataFlow: { - downloadError: undefined - } - }); - } else if (validatedCheckpoint === targetCheckpoint) { - const result = await this.applyCheckpoint(targetCheckpoint!, signal); - if (result.endIteration) { - return; - } else if (result.applied) { - appliedCheckpoint = targetCheckpoint; - } - } + this.logger.debug('Received unknown sync line', line); } } this.logger.debug('Stream input empty'); @@ -1059,9 +1079,8 @@ The next upload iteration will be delayed.`); }); } - private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) { + private async applyCheckpoint(checkpoint: Checkpoint) { let result = await this.options.adapter.syncLocalDatabase(checkpoint); - const pending = this.pendingCrudUpload; if (!result.checkpointValid) { this.logger.debug('Checksum mismatch in checkpoint, will reconnect'); @@ -1069,40 +1088,26 @@ The next upload iteration will be delayed.`); // TODO: better back-off await new Promise((resolve) => setTimeout(resolve, 50)); return { applied: false, endIteration: true }; - } else if (!result.ready && pending != null) { - // We have pending entries in the local upload queue or are waiting to confirm a write - // checkpoint, which prevented this checkpoint from applying. Wait for that to complete and - // try again. + } else if (!result.ready) { this.logger.debug( - 'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.' + 'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.' ); - await Promise.race([pending, onAbortPromise(abort)]); - - if (abort.aborted) { - return { applied: false, endIteration: true }; - } - // Try again now that uploads have completed. - result = await this.options.adapter.syncLocalDatabase(checkpoint); + return { applied: false, endIteration: false }; } - if (result.checkpointValid && result.ready) { - this.logger.debug('validated checkpoint', checkpoint); - this.updateSyncStatus({ - connected: true, - lastSyncedAt: new Date(), - dataFlow: { - downloading: false, - downloadProgress: null, - downloadError: undefined - } - }); + this.logger.debug('validated checkpoint', checkpoint); + this.updateSyncStatus({ + connected: true, + lastSyncedAt: new Date(), + dataFlow: { + downloading: false, + downloadProgress: null, + downloadError: undefined + } + }); - return { applied: true, endIteration: false }; - } else { - this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.'); - return { applied: false, endIteration: false }; - } + return { applied: true, endIteration: false }; } protected updateSyncStatus(options: SyncStatusOptions) { diff --git a/packages/common/src/client/sync/stream/streaming-sync-types.ts b/packages/common/src/client/sync/stream/streaming-sync-types.ts index bbc4b2f75..fdb73f360 100644 --- a/packages/common/src/client/sync/stream/streaming-sync-types.ts +++ b/packages/common/src/client/sync/stream/streaming-sync-types.ts @@ -136,6 +136,10 @@ export type StreamingSyncLine = | StreamingSyncCheckpointPartiallyComplete | StreamingSyncKeepalive; +export type CrudUploadNotification = { crud_upload_completed: null }; + +export type StreamingSyncLineOrCrudUploadComplete = StreamingSyncLine | CrudUploadNotification; + export interface BucketRequest { name: string; diff --git a/packages/common/src/utils/async.ts b/packages/common/src/utils/async.ts index c6fe822d8..d869f8c36 100644 --- a/packages/common/src/utils/async.ts +++ b/packages/common/src/utils/async.ts @@ -48,13 +48,3 @@ export function throttleLeadingTrailing(func: () => void, wait: number) { } }; } - -export function onAbortPromise(signal: AbortSignal): Promise { - return new Promise((resolve) => { - if (signal.aborted) { - resolve(); - } else { - signal.onabort = () => resolve(); - } - }); -} diff --git a/packages/react-native/src/sync/stream/ReactNativeRemote.ts b/packages/react-native/src/sync/stream/ReactNativeRemote.ts index 3ed14fb6e..9b79d017f 100644 --- a/packages/react-native/src/sync/stream/ReactNativeRemote.ts +++ b/packages/react-native/src/sync/stream/ReactNativeRemote.ts @@ -9,8 +9,6 @@ import { FetchImplementation, FetchImplementationProvider, RemoteConnector, - SocketSyncStreamOptions, - StreamingSyncLine, SyncStreamOptions } from '@powersync/common'; import { Platform } from 'react-native'; @@ -57,11 +55,7 @@ export class ReactNativeRemote extends AbstractRemote { return BSON; } - async socketStream(options: SocketSyncStreamOptions): Promise> { - return super.socketStream(options); - } - - async postStream(options: SyncStreamOptions): Promise> { + async postStreamRaw(options: SyncStreamOptions, mapLine: (line: string) => T): Promise> { const timeout = Platform.OS == 'android' ? setTimeout(() => { @@ -74,19 +68,22 @@ export class ReactNativeRemote extends AbstractRemote { : null; try { - return await super.postStream({ - ...options, - fetchOptions: { - ...options.fetchOptions, - /** - * The `react-native-fetch-api` polyfill provides streaming support via - * this non-standard flag - * https://github.com/react-native-community/fetch#enable-text-streaming - */ - // @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming - reactNative: { textStreaming: true } - } - }); + return await super.postStreamRaw( + { + ...options, + fetchOptions: { + ...options.fetchOptions, + /** + * The `react-native-fetch-api` polyfill provides streaming support via + * this non-standard flag + * https://github.com/react-native-community/fetch#enable-text-streaming + */ + // @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming + reactNative: { textStreaming: true } + } + }, + mapLine + ); } finally { if (timeout) { clearTimeout(timeout); diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index 08e1e86de..d45ebe0aa 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -3,6 +3,7 @@ import { createBaseLogger, DataStream, PowerSyncConnectionOptions, + SyncStreamConnectionMethod, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; @@ -179,6 +180,7 @@ function describeStreamingTests(createConnectedDatabase: () => Promise { // This initially performs a connect call const { powersync, remote } = await createConnectedDatabase(); + const connectionOptions: PowerSyncConnectionOptions = { connectionMethod: SyncStreamConnectionMethod.HTTP }; expect(powersync.connected).toBe(true); const spy = vi.spyOn(powersync as any, 'generateSyncStreamImplementation'); @@ -187,11 +189,11 @@ function describeStreamingTests(createConnectedDatabase: () => Promise[] = []; // This method is used for all mocked connections - const basePostStream = remote.postStream; - const postSpy = vi.spyOn(remote, 'postStream').mockImplementation(async (...options) => { + const basePostStream = remote.postStreamRaw; + const postSpy = vi.spyOn(remote, 'postStreamRaw').mockImplementation(async (...args) => { // Simulate a connection delay await new Promise((r) => setTimeout(r, 100)); - const stream = await basePostStream.call(remote, ...options); + const stream = await basePostStream.call(remote, ...args); generatedStreams.push(stream); return stream; }); @@ -199,7 +201,7 @@ function describeStreamingTests(createConnectedDatabase: () => Promise Promise= 0; i--) { await new Promise((r) => setTimeout(r, Math.random() * 10)); - powersync.connect(new TestConnector(), { params: { count: i } }); + powersync.connect(new TestConnector(), { params: { count: i }, ...connectionOptions }); } await vi.waitFor( diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index 84dfad789..f12bd186c 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -8,6 +8,7 @@ import { PowerSyncCredentials, PowerSyncDatabaseOptions, RemoteConnector, + SocketSyncStreamOptions, StreamingSyncLine, SyncStreamOptions } from '@powersync/common'; @@ -18,6 +19,7 @@ import { WebPowerSyncOpenFactoryOptions, WebStreamingSyncImplementation } from '@powersync/web'; +import { BSON } from 'bson'; import { MockedFunction, vi } from 'vitest'; export class TestConnector implements PowerSyncBackendConnector { @@ -34,7 +36,7 @@ export class TestConnector implements PowerSyncBackendConnector { } export class MockRemote extends AbstractRemote { - streamController: ReadableStreamDefaultController | null; + streamController: ReadableStreamDefaultController | null; errorOnStreamStart = false; generateCheckpoint: MockedFunction<() => any>; @@ -100,16 +102,16 @@ export class MockRemote extends AbstractRemote { return new Response(stream).body!; } - socketStream(options: SyncStreamOptions): Promise> { - // For this test mock these are essentially the same - return this.postStream(options); + async socketStreamRaw(): Promise> { + throw 'Unsupported: Socket streams are not currently supported in tests'; } - async postStream(options: SyncStreamOptions): Promise> { + async postStreamRaw(options: SyncStreamOptions, mapLine: (line: string) => T): Promise> { const mockResponse = await this.postStreaming(options.path, options.data, options.headers, options.abortSignal); const mockReader = mockResponse.getReader(); - const stream = new DataStream({ - logger: this.logger + const stream = new DataStream({ + logger: this.logger, + mapLine: mapLine }); if (options.abortSignal?.aborted) { @@ -143,7 +145,7 @@ export class MockRemote extends AbstractRemote { } enqueueLine(line: StreamingSyncLine) { - this.streamController?.enqueue(line); + this.streamController?.enqueue(JSON.stringify(line)); } } diff --git a/packages/web/tests/utils/generateConnectedDatabase.ts b/packages/web/tests/utils/generateConnectedDatabase.ts index 260fb5e7e..16fcc317e 100644 --- a/packages/web/tests/utils/generateConnectedDatabase.ts +++ b/packages/web/tests/utils/generateConnectedDatabase.ts @@ -1,4 +1,4 @@ -import { Schema, Table, column } from '@powersync/common'; +import { Schema, SyncStreamConnectionMethod, Table, column } from '@powersync/common'; import { WebPowerSyncOpenFactoryOptions } from '@powersync/web'; import { v4 as uuid, v4 } from 'uuid'; import { onTestFinished, vi } from 'vitest'; @@ -71,7 +71,7 @@ export async function generateConnectedDatabase(options: GenerateConnectedDataba const connect = async () => { const streamOpened = waitForStream(); - const connectedPromise = powersync.connect(connector); + const connectedPromise = powersync.connect(connector, { connectionMethod: SyncStreamConnectionMethod.HTTP }); await streamOpened;