diff --git a/.changeset/violet-falcons-build.md b/.changeset/violet-falcons-build.md new file mode 100644 index 000000000..db34fcdb6 --- /dev/null +++ b/.changeset/violet-falcons-build.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fix a race condition causing sync changes during uploads not to be applied. diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index f9ecc6a87..2e71083e7 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -15,7 +15,7 @@ import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; import { mutexRunExclusive } from '../utils/mutex.js'; -import { throttleTrailing } from '../utils/throttle.js'; +import { throttleTrailing } from '../utils/async.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; import { runOnSchemaChange } from './runOnSchemaChange.js'; diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 808ebc2c3..705f96426 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1,9 +1,9 @@ import Logger, { ILogger } from 'js-logger'; -import { SyncPriorityStatus, SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; +import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; -import { throttleLeadingTrailing } from '../../../utils/throttle.js'; +import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js'; import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; @@ -161,6 +161,7 @@ export abstract class AbstractStreamingSyncImplementation protected abortController: AbortController | null; protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; + private pendingCrudUpload?: Promise; syncStatus: SyncStatus; triggerCrudUpload: () => void; @@ -181,10 +182,16 @@ export abstract class AbstractStreamingSyncImplementation this.abortController = null; this.triggerCrudUpload = throttleLeadingTrailing(() => { - if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) { + if (!this.syncStatus.connected || this.pendingCrudUpload != null) { return; } - this._uploadAllCrud(); + + this.pendingCrudUpload = new Promise((resolve) => { + this._uploadAllCrud().finally(() => { + this.pendingCrudUpload = undefined; + resolve(); + }); + }); }, this.options.crudUploadThrottleMs!); } @@ -434,16 +441,8 @@ The next upload iteration will be delayed.`); if (signal?.aborted) { break; } - const { retry } = await this.streamingSyncIteration(nestedAbortController.signal, options); - if (!retry) { - /** - * A sync error ocurred that we cannot recover from here. - * This loop must terminate. - * The nestedAbortController will close any open network requests and streams below. - */ - break; - } - // Continue immediately + await this.streamingSyncIteration(nestedAbortController.signal, options); + // Continue immediately, streamingSyncIteration will wait before completing if necessary. } catch (ex) { /** * Either: @@ -501,8 +500,8 @@ The next upload iteration will be delayed.`); protected async streamingSyncIteration( signal: AbortSignal, options?: PowerSyncConnectionOptions - ): Promise<{ retry?: boolean }> { - return await this.obtainLock({ + ): Promise { + await this.obtainLock({ type: LockType.SYNC, signal, callback: async () => { @@ -552,7 +551,7 @@ The next upload iteration will be delayed.`); const line = await stream.read(); if (!line) { // The stream has closed while waiting - return { retry: true }; + return; } // A connection is active and messages are being received @@ -582,30 +581,12 @@ The next upload iteration will be delayed.`); await this.options.adapter.removeBuckets([...bucketsToDelete]); await this.options.adapter.setTargetCheckpoint(targetCheckpoint); } else if (isStreamingSyncCheckpointComplete(line)) { - this.logger.debug('Checkpoint complete', targetCheckpoint); - const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!); - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await new Promise((resolve) => setTimeout(resolve, 50)); - return { retry: true }; - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // landing here the whole time - } else { + const result = await this.applyCheckpoint(targetCheckpoint!, signal); + if (result.endIteration) { + return; + } else if (result.applied) { appliedCheckpoint = targetCheckpoint; - this.logger.debug('validated checkpoint', appliedCheckpoint); - this.updateSyncStatus({ - connected: true, - lastSyncedAt: new Date(), - dataFlow: { - downloading: false, - downloadError: undefined - } - }); } - validatedCheckpoint = targetCheckpoint; } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { const priority = line.partial_checkpoint_complete.priority; @@ -615,9 +596,10 @@ The next upload iteration will be delayed.`); // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await new Promise((resolve) => setTimeout(resolve, 50)); - return { retry: true }; + return; } else if (!result.ready) { - // Need more data for a consistent partial sync within a priority - continue waiting. + // If we have pending uploads, we can't complete new checkpoints outside of priority 0. + // We'll resolve this for a complete checkpoint. } else { // We'll keep on downloading, but can report that this priority is synced now. this.logger.debug('partial checkpoint validation succeeded'); @@ -691,7 +673,7 @@ The next upload iteration will be delayed.`); * (uses the same one), this should have some delay. */ await this.delayRetry(); - return { retry: true }; + return ; } this.triggerCrudUpload(); } else { @@ -707,37 +689,67 @@ The next upload iteration will be delayed.`); } }); } else if (validatedCheckpoint === targetCheckpoint) { - const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!); - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await new Promise((resolve) => setTimeout(resolve, 50)); - return { retry: false }; - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } else { + const result = await this.applyCheckpoint(targetCheckpoint!, signal); + if (result.endIteration) { + return; + } else if (result.applied) { appliedCheckpoint = targetCheckpoint; - this.updateSyncStatus({ - connected: true, - lastSyncedAt: new Date(), - priorityStatusEntries: [], - dataFlow: { - downloading: false, - downloadError: undefined - } - }); } } } } this.logger.debug('Stream input empty'); // Connection closed. Likely due to auth issue. - return { retry: true }; + return; } }); } + private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) { + let result = await this.options.adapter.syncLocalDatabase(checkpoint); + const pending = this.pendingCrudUpload; + + if (!result.checkpointValid) { + this.logger.debug('Checksum mismatch in checkpoint, will reconnect'); + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + await new Promise((resolve) => setTimeout(resolve, 50)); + return { applied: false, endIteration: true }; + } else if (!result.ready && pending != null) { + // We have pending entries in the local upload queue or are waiting to confirm a write + // checkpoint, which prevented this checkpoint from applying. Wait for that to complete and + // try again. + this.logger.debug( + 'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.' + ); + await Promise.race([pending, onAbortPromise(abort)]); + + if (abort.aborted) { + return { applied: false, endIteration: true }; + } + + // Try again now that uploads have completed. + result = await this.options.adapter.syncLocalDatabase(checkpoint); + } + + if (result.checkpointValid && result.ready) { + this.logger.debug('validated checkpoint', checkpoint); + this.updateSyncStatus({ + connected: true, + lastSyncedAt: new Date(), + dataFlow: { + downloading: false, + downloadError: undefined + } + }); + + return { applied: true, endIteration: false }; + } else { + this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.'); + return { applied: false, endIteration: false }; + } + } + protected updateSyncStatus(options: SyncStatusOptions) { const updatedStatus = new SyncStatus({ connected: options.connected ?? this.syncStatus.connected, diff --git a/packages/common/src/utils/throttle.ts b/packages/common/src/utils/async.ts similarity index 86% rename from packages/common/src/utils/throttle.ts rename to packages/common/src/utils/async.ts index d869f8c36..c6fe822d8 100644 --- a/packages/common/src/utils/throttle.ts +++ b/packages/common/src/utils/async.ts @@ -48,3 +48,13 @@ export function throttleLeadingTrailing(func: () => void, wait: number) { } }; } + +export function onAbortPromise(signal: AbortSignal): Promise { + return new Promise((resolve) => { + if (signal.aborted) { + resolve(); + } else { + signal.onabort = () => resolve(); + } + }); +} diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index 1e564950d..2fc8c75a4 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -46,6 +46,101 @@ describe('Streaming', { sequential: true }, () => { }) ) ); + + it('Should handle checkpoints during the upload process', async () => { + const { powersync, remote, uploadSpy } = await generateConnectedDatabase(); + expect(powersync.connected).toBe(true); + + let resolveUploadPromise: () => void; + let resolveUploadStartedPromise: () => void; + const completeUploadPromise = new Promise((resolve) => { + resolveUploadPromise = resolve; + }); + const uploadStartedPromise = new Promise((resolve) => { + resolveUploadStartedPromise = resolve; + }); + + async function expectUserRows(amount: number) { + const row = await powersync.get<{ r: number }>('SELECT COUNT(*) AS r FROM users'); + expect(row.r).toBe(amount); + } + + uploadSpy.mockImplementation(async (db) => { + const batch = await db.getCrudBatch(); + if (!batch) return; + + resolveUploadStartedPromise(); + await completeUploadPromise; + await batch?.complete(); + }); + + // trigger an upload + await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['from local']); + await expectUserRows(1); + await uploadStartedPromise; + + // A connector could have uploaded data (triggering a checkpoint) before finishing + remote.enqueueLine({ + checkpoint: { + write_checkpoint: '1', + last_op_id: '2', + buckets: [{ bucket: 'a', priority: 3, checksum: 0 }] + } + }); + remote.generateCheckpoint.mockImplementation(() => { + return { + data: { + write_checkpoint: '1' + } + }; + }); + + remote.enqueueLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: '1', + object_type: 'users', + data: '{"id": "test1", "name": "from local"}' + }, + { + checksum: 0, + op_id: '2', + op: 'PUT', + object_id: '2', + object_type: 'users', + data: '{"id": "test1", "name": "additional entry"}' + } + ] + } + }); + remote.enqueueLine({ + checkpoint_complete: { + last_op_id: '2' + } + }); + + // Give the sync client some time to process these + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data. + await expectUserRows(1); + + // Mark the upload as completed. This should trigger a write_checkpoint.json request + resolveUploadPromise!(); + await vi.waitFor(() => { + expect(remote.generateCheckpoint.mock.calls.length).equals(1); + }); + + // Completing the upload should also make the checkpoint visible without it being sent again. + await vi.waitFor(async () => { + await expectUserRows(2); + }); + }); }); function describeStreamingTests(createConnectedDatabase: () => Promise) { diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index 5e12bd573..e29cf313b 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -18,6 +18,7 @@ import { WebPowerSyncOpenFactoryOptions, WebStreamingSyncImplementation } from '@powersync/web'; +import { MockedFunction, vi } from 'vitest'; export class TestConnector implements PowerSyncBackendConnector { async fetchCredentials(): Promise { @@ -35,12 +36,21 @@ export class TestConnector implements PowerSyncBackendConnector { export class MockRemote extends AbstractRemote { streamController: ReadableStreamDefaultController | null; errorOnStreamStart = false; + generateCheckpoint: MockedFunction<() => any>; + constructor( connector: RemoteConnector, protected onStreamRequested: () => void ) { super(connector); this.streamController = null; + this.generateCheckpoint = vi.fn(() => { + return { + data: { + write_checkpoint: '1000' + } + }; + }); } async getBSON(): Promise { @@ -53,11 +63,7 @@ export class MockRemote extends AbstractRemote { async get(path: string, headers?: Record | undefined): Promise { // mock a response for write checkpoint API if (path.includes('checkpoint')) { - return { - data: { - write_checkpoint: '1000' - } - }; + return this.generateCheckpoint(); } throw new Error('Not implemented'); }