Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/violet-falcons-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fix a race condition causing sync changes during uploads not to be applied.
2 changes: 1 addition & 1 deletion packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { throttleTrailing } from '../utils/throttle.js';
import { throttleTrailing } from '../utils/async.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Logger, { ILogger } from 'js-logger';

import { SyncPriorityStatus, SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
Expand Down Expand Up @@ -161,6 +161,7 @@ export abstract class AbstractStreamingSyncImplementation
protected abortController: AbortController | null;
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;
private pendingCrudUpload?: Promise<void>;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;
Expand All @@ -181,10 +182,16 @@ export abstract class AbstractStreamingSyncImplementation
this.abortController = null;

this.triggerCrudUpload = throttleLeadingTrailing(() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
return;
}
this._uploadAllCrud();

this.pendingCrudUpload = new Promise((resolve) => {
this._uploadAllCrud().finally(() => {
this.pendingCrudUpload = undefined;
resolve();
});
});
}, this.options.crudUploadThrottleMs!);
}

Expand Down Expand Up @@ -582,30 +589,12 @@ The next upload iteration will be delayed.`);
await this.options.adapter.removeBuckets([...bucketsToDelete]);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
} else if (isStreamingSyncCheckpointComplete(line)) {
this.logger.debug('Checkpoint complete', targetCheckpoint);
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
return { retry: true };
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
// landing here the whole time
} else {
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
this.logger.debug('validated checkpoint', appliedCheckpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadError: undefined
}
});
}

validatedCheckpoint = targetCheckpoint;
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
const priority = line.partial_checkpoint_complete.priority;
Expand All @@ -617,7 +606,8 @@ The next upload iteration will be delayed.`);
await new Promise((resolve) => setTimeout(resolve, 50));
return { retry: true };
} else if (!result.ready) {
// Need more data for a consistent partial sync within a priority - continue waiting.
// If we have pending uploads, we can't complete new checkpoints outside of priority 0.
// We'll resolve this for a complete checkpoint.
} else {
// We'll keep on downloading, but can report that this priority is synced now.
this.logger.debug('partial checkpoint validation succeeded');
Expand Down Expand Up @@ -707,26 +697,13 @@ The next upload iteration will be delayed.`);
}
});
} else if (validatedCheckpoint === targetCheckpoint) {
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
// TODO: Why is this one retry: false? That's the only change from when we receive
// the line above?
return { retry: false };
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
} else if (result.applied) {
appliedCheckpoint = targetCheckpoint;
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
priorityStatusEntries: [],
dataFlow: {
downloading: false,
downloadError: undefined
}
});
}
}
}
Expand All @@ -738,6 +715,51 @@ The next upload iteration will be delayed.`);
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

if (!result.checkpointValid) {
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { applied: false, endIteration: true };
} else if (!result.ready && pending != null) {
// We have pending entries in the local upload queue or are waiting to confirm a write
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
// try again.
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
);
await Promise.race([pending, onAbortPromise(abort)]);

if (abort.aborted) {
return { applied: false, endIteration: true };
}

// Try again now that uploads have completed.
result = await this.options.adapter.syncLocalDatabase(checkpoint);
}

if (result.checkpointValid && result.ready) {
this.logger.debug('validated checkpoint', checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadError: undefined
}
});

return { applied: true, endIteration: false };
} else {
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
return { applied: false, endIteration: false };
}
}

protected updateSyncStatus(options: SyncStatusOptions) {
const updatedStatus = new SyncStatus({
connected: options.connected ?? this.syncStatus.connected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
}
};
}

export function onAbortPromise(signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
if (signal.aborted) {
resolve();
} else {
signal.onabort = () => resolve();
}
});
}
95 changes: 95 additions & 0 deletions packages/web/tests/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,101 @@ describe('Streaming', { sequential: true }, () => {
})
)
);

it('Should handle checkpoints during the upload process', async () => {
const { powersync, remote, uploadSpy } = await generateConnectedDatabase();
expect(powersync.connected).toBe(true);

let resolveUploadPromise: () => void;
let resolveUploadStartedPromise: () => void;
const completeUploadPromise = new Promise<void>((resolve) => {
resolveUploadPromise = resolve;
});
const uploadStartedPromise = new Promise<void>((resolve) => {
resolveUploadStartedPromise = resolve;
});

async function expectUserRows(amount: number) {
const row = await powersync.get<{ r: number }>('SELECT COUNT(*) AS r FROM users');
expect(row.r).toBe(amount);
}

uploadSpy.mockImplementation(async (db) => {
const batch = await db.getCrudBatch();
if (!batch) return;

resolveUploadStartedPromise();
await completeUploadPromise;
await batch?.complete();
});

// trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['from local']);
await expectUserRows(1);
await uploadStartedPromise;

// A connector could have uploaded data (triggering a checkpoint) before finishing
remote.enqueueLine({
checkpoint: {
write_checkpoint: '1',
last_op_id: '2',
buckets: [{ bucket: 'a', priority: 3, checksum: 0 }]
}
});
remote.generateCheckpoint.mockImplementation(() => {
return {
data: {
write_checkpoint: '1'
}
};
});

remote.enqueueLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '1',
op: 'PUT',
object_id: '1',
object_type: 'users',
data: '{"id": "test1", "name": "from local"}'
},
{
checksum: 0,
op_id: '2',
op: 'PUT',
object_id: '2',
object_type: 'users',
data: '{"id": "test1", "name": "additional entry"}'
}
]
}
});
remote.enqueueLine({
checkpoint_complete: {
last_op_id: '2'
}
});

// Give the sync client some time to process these
await new Promise<void>((resolve) => setTimeout(resolve, 500));

// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
await expectUserRows(1);

// Mark the upload as completed. This should trigger a write_checkpoint.json request
resolveUploadPromise!();
await vi.waitFor(() => {
expect(remote.generateCheckpoint.mock.calls.length).equals(1);
});

// Completing the upload should also make the checkpoint visible without it being sent again.
await vi.waitFor(async () => {
await expectUserRows(2);
});
});
});

function describeStreamingTests(createConnectedDatabase: () => Promise<ConnectedDatabaseUtils>) {
Expand Down
16 changes: 11 additions & 5 deletions packages/web/tests/utils/MockStreamOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
WebPowerSyncOpenFactoryOptions,
WebStreamingSyncImplementation
} from '@powersync/web';
import { MockedFunction, vi } from 'vitest';

export class TestConnector implements PowerSyncBackendConnector {
async fetchCredentials(): Promise<PowerSyncCredentials> {
Expand All @@ -35,12 +36,21 @@ export class TestConnector implements PowerSyncBackendConnector {
export class MockRemote extends AbstractRemote {
streamController: ReadableStreamDefaultController<StreamingSyncLine> | null;
errorOnStreamStart = false;
generateCheckpoint: MockedFunction<() => any>;

constructor(
connector: RemoteConnector,
protected onStreamRequested: () => void
) {
super(connector);
this.streamController = null;
this.generateCheckpoint = vi.fn(() => {
return {
data: {
write_checkpoint: '1000'
}
};
});
}

async getBSON(): Promise<BSONImplementation> {
Expand All @@ -53,11 +63,7 @@ export class MockRemote extends AbstractRemote {
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
// mock a response for write checkpoint API
if (path.includes('checkpoint')) {
return {
data: {
write_checkpoint: '1000'
}
};
return this.generateCheckpoint();
}
throw new Error('Not implemented');
}
Expand Down