5 changes: 5 additions & 0 deletions .changeset/violet-falcons-build.md
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fix a race condition causing sync changes during uploads not to be applied.
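
In outline, the fix tracks the in-flight upload as a promise so that checkpoint application can detect it, wait for it, and then retry. A minimal standalone sketch of that pattern follows; UploadCoordinator, upload and apply are illustrative names, not part of the PowerSync API.

class UploadCoordinator {
  private pendingUpload?: Promise<void>;

  // Start an upload unless one is already in flight.
  triggerUpload(upload: () => Promise<void>): void {
    if (this.pendingUpload != null) {
      return;
    }
    this.pendingUpload = upload().finally(() => {
      this.pendingUpload = undefined;
    });
  }

  // Try to apply a checkpoint; if local data blocks it while an upload is running,
  // wait for that upload to finish and try exactly once more.
  async applyCheckpoint(apply: () => Promise<boolean>): Promise<boolean> {
    if (await apply()) {
      return true;
    }
    const pending = this.pendingUpload;
    if (pending != null) {
      await pending;
      return await apply();
    }
    return false;
  }
}

Tracking the upload as a promise rather than a boolean uploading flag is what lets the checkpoint path both detect an in-progress upload and wait for its completion.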
2 changes: 1 addition & 1 deletion packages/common/src/client/AbstractPowerSyncDatabase.ts
@@ -15,7 +15,7 @@ import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { throttleTrailing } from '../utils/throttle.js';
import { throttleTrailing } from '../utils/async.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';
packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
@@ -1,9 +1,9 @@
import Logger, { ILogger } from 'js-logger';

import { SyncPriorityStatus, SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
@@ -161,6 +161,7 @@ export abstract class AbstractStreamingSyncImplementation
protected abortController: AbortController | null;
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;
private pendingCrudUpload?: Promise<void>;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;
@@ -181,10 +182,16 @@ export abstract class AbstractStreamingSyncImplementation
this.abortController = null;

this.triggerCrudUpload = throttleLeadingTrailing(() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
return;
}
this._uploadAllCrud();

this.pendingCrudUpload = new Promise((resolve) => {
this._uploadAllCrud().finally(() => {
this.pendingCrudUpload = undefined;
resolve();
});
});
}, this.options.crudUploadThrottleMs!);
}

@@ -434,16 +441,8 @@ The next upload iteration will be delayed.`);
if (signal?.aborted) {
break;
}
const { retry } = await this.streamingSyncIteration(nestedAbortController.signal, options);
if (!retry) {
/**
* A sync error occurred that we cannot recover from here.
* This loop must terminate.
* The nestedAbortController will close any open network requests and streams below.
*/
break;
}
// Continue immediately
await this.streamingSyncIteration(nestedAbortController.signal, options);
// Continue immediately, streamingSyncIteration will wait before completing if necessary.
} catch (ex) {
/**
* Either:
@@ -501,8 +500,8 @@ The next upload iteration will be delayed.`);
protected async streamingSyncIteration(
signal: AbortSignal,
options?: PowerSyncConnectionOptions
): Promise<{ retry?: boolean }> {
return await this.obtainLock({
): Promise<void> {
await this.obtainLock({
type: LockType.SYNC,
signal,
callback: async () => {
@@ -552,7 +551,7 @@ The next upload iteration will be delayed.`);
const line = await stream.read();
if (!line) {
// The stream has closed while waiting
return { retry: true };
return;
}

// A connection is active and messages are being received
@@ -582,30 +581,12 @@ The next upload iteration will be delayed.`);
await this.options.adapter.removeBuckets([...bucketsToDelete]);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
} else if (isStreamingSyncCheckpointComplete(line)) {
this.logger.debug('Checkpoint complete', targetCheckpoint);
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { retry: true };
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
// landing here the whole time
} else {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
return;
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
this.logger.debug('validated checkpoint', appliedCheckpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadError: undefined
}
});
}

validatedCheckpoint = targetCheckpoint;
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
const priority = line.partial_checkpoint_complete.priority;
@@ -615,9 +596,10 @@ The next upload iteration will be delayed.`);
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { retry: true };
return;
} else if (!result.ready) {
// Need more data for a consistent partial sync within a priority - continue waiting.
// If we have pending uploads, we can't complete new checkpoints outside of priority 0.
// We'll resolve this for a complete checkpoint.
} else {
// We'll keep on downloading, but can report that this priority is synced now.
this.logger.debug('partial checkpoint validation succeeded');
@@ -691,7 +673,7 @@ The next upload iteration will be delayed.`);
* (uses the same one), this should have some delay.
*/
await this.delayRetry();
return { retry: true };
return;
}
this.triggerCrudUpload();
} else {
@@ -707,37 +689,67 @@ The next upload iteration will be delayed.`);
}
});
} else if (validatedCheckpoint === targetCheckpoint) {
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { retry: false };
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
return;
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
priorityStatusEntries: [],
dataFlow: {
downloading: false,
downloadError: undefined
}
});
}
}
}
}
this.logger.debug('Stream input empty');
// Connection closed. Likely due to auth issue.
return { retry: true };
return;
}
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

if (!result.checkpointValid) {
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { applied: false, endIteration: true };
} else if (!result.ready && pending != null) {
// We have pending entries in the local upload queue or are waiting to confirm a write
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
// try again.
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
);
await Promise.race([pending, onAbortPromise(abort)]);

if (abort.aborted) {
return { applied: false, endIteration: true };
}

// Try again now that uploads have completed.
result = await this.options.adapter.syncLocalDatabase(checkpoint);
}

if (result.checkpointValid && result.ready) {
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadError: undefined
}
});

return { applied: true, endIteration: false };
} else {
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
return { applied: false, endIteration: false };
}
}

protected updateSyncStatus(options: SyncStatusOptions) {
const updatedStatus = new SyncStatus({
connected: options.connected ?? this.syncStatus.connected,
packages/common/src/utils/async.ts
@@ -48,3 +48,13 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
}
};
}

export function onAbortPromise(signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
if (signal.aborted) {
resolve();
} else {
signal.onabort = () => resolve();
}
});
}
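
For reference, a small usage sketch of the onAbortPromise helper added above, mirroring how applyCheckpoint races a pending upload against an abort; waitForUploadOrAbort is an illustrative helper, not part of this PR.

async function waitForUploadOrAbort(pendingUpload: Promise<void>, signal: AbortSignal): Promise<boolean> {
  // Resolves when the upload settles or when the signal aborts, whichever comes first.
  await Promise.race([pendingUpload, onAbortPromise(signal)]);
  // Returns false when the caller aborted before the upload completed.
  return !signal.aborted;
}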
95 changes: 95 additions & 0 deletions packages/web/tests/stream.test.ts
@@ -46,6 +46,101 @@ describe('Streaming', { sequential: true }, () => {
})
)
);

it('Should handle checkpoints during the upload process', async () => {
const { powersync, remote, uploadSpy } = await generateConnectedDatabase();
expect(powersync.connected).toBe(true);

let resolveUploadPromise: () => void;
let resolveUploadStartedPromise: () => void;
const completeUploadPromise = new Promise<void>((resolve) => {
resolveUploadPromise = resolve;
});
const uploadStartedPromise = new Promise<void>((resolve) => {
resolveUploadStartedPromise = resolve;
});

async function expectUserRows(amount: number) {
const row = await powersync.get<{ r: number }>('SELECT COUNT(*) AS r FROM users');
expect(row.r).toBe(amount);
}

uploadSpy.mockImplementation(async (db) => {
const batch = await db.getCrudBatch();
if (!batch) return;

resolveUploadStartedPromise();
await completeUploadPromise;
await batch?.complete();
});

// trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['from local']);
await expectUserRows(1);
await uploadStartedPromise;

// A connector could have uploaded data (triggering a checkpoint) before finishing
remote.enqueueLine({
checkpoint: {
write_checkpoint: '1',
last_op_id: '2',
buckets: [{ bucket: 'a', priority: 3, checksum: 0 }]
}
});
remote.generateCheckpoint.mockImplementation(() => {
return {
data: {
write_checkpoint: '1'
}
};
});

remote.enqueueLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '1',
op: 'PUT',
object_id: '1',
object_type: 'users',
data: '{"id": "test1", "name": "from local"}'
},
{
checksum: 0,
op_id: '2',
op: 'PUT',
object_id: '2',
object_type: 'users',
data: '{"id": "test1", "name": "additional entry"}'
}
]
}
});
remote.enqueueLine({
checkpoint_complete: {
last_op_id: '2'
}
});

// Give the sync client some time to process these
await new Promise<void>((resolve) => setTimeout(resolve, 500));

// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
await expectUserRows(1);

// Mark the upload as completed. This should trigger a write_checkpoint.json request
resolveUploadPromise!();
await vi.waitFor(() => {
expect(remote.generateCheckpoint.mock.calls.length).equals(1);
});

// Completing the upload should also make the checkpoint visible without it being sent again.
await vi.waitFor(async () => {
await expectUserRows(2);
});
});
});

function describeStreamingTests(createConnectedDatabase: () => Promise<ConnectedDatabaseUtils>) {
16 changes: 11 additions & 5 deletions packages/web/tests/utils/MockStreamOpenFactory.ts
@@ -18,6 +18,7 @@ import {
WebPowerSyncOpenFactoryOptions,
WebStreamingSyncImplementation
} from '@powersync/web';
import { MockedFunction, vi } from 'vitest';

export class TestConnector implements PowerSyncBackendConnector {
async fetchCredentials(): Promise<PowerSyncCredentials> {
@@ -35,12 +36,21 @@ export class TestConnector implements PowerSyncBackendConnector {
export class MockRemote extends AbstractRemote {
streamController: ReadableStreamDefaultController<StreamingSyncLine> | null;
errorOnStreamStart = false;
generateCheckpoint: MockedFunction<() => any>;

constructor(
connector: RemoteConnector,
protected onStreamRequested: () => void
) {
super(connector);
this.streamController = null;
this.generateCheckpoint = vi.fn(() => {
return {
data: {
write_checkpoint: '1000'
}
};
});
}

async getBSON(): Promise<BSONImplementation> {
@@ -53,11 +63,7 @@
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
// mock a response for write checkpoint API
if (path.includes('checkpoint')) {
return {
data: {
write_checkpoint: '1000'
}
};
return this.generateCheckpoint();
}
throw new Error('Not implemented');
}