Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,8 @@ The next upload iteration will be delayed.`);

if (isStreamingSyncCheckpoint(line)) {
targetCheckpoint = line.checkpoint;
// New checkpoint - existing validated checkpoint is no longer valid
pendingValidatedCheckpoint = null;
const bucketsToDelete = new Set<string>(bucketMap.keys());
const newBuckets = new Map<string, BucketDescription>();
for (const checksum of line.checkpoint.buckets) {
Expand All @@ -737,7 +739,13 @@ The next upload iteration will be delayed.`);
if (result.endIteration) {
return;
} else if (!result.applied) {
// "Could not apply checkpoint due to local data". We need to retry after
// finishing uploads.
pendingValidatedCheckpoint = targetCheckpoint;
} else {
// Nothing to retry later. This would likely already be null from the last
// checksum or checksum_diff operation, but we make sure.
pendingValidatedCheckpoint = null;
}
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
const priority = line.partial_checkpoint_complete.priority;
Expand Down Expand Up @@ -773,6 +781,8 @@ The next upload iteration will be delayed.`);
if (targetCheckpoint == null) {
throw new Error('Checkpoint diff without previous checkpoint');
}
// New checkpoint - existing validated checkpoint is no longer valid
pendingValidatedCheckpoint = null;
const diff = line.checkpoint_diff;
const newBuckets = new Map<string, BucketChecksum>();
for (const checksum of targetCheckpoint.buckets) {
Expand Down
6 changes: 4 additions & 2 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
connector: PowerSyncBackendConnector,
options: NodeAdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new NodeRemote(connector, this.options.logger, {
const logger = this.options.logger;
const remote = new NodeRemote(connector, logger, {
dispatcher: options.dispatcher,
...(this.options as NodePowerSyncDatabaseOptions).remoteOptions
});
Expand All @@ -92,7 +93,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
},
retryDelayMs: this.options.retryDelayMs,
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
identifier: this.database.name
identifier: this.database.name,
logger
});
}
}
80 changes: 80 additions & 0 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { describe, vi, expect, beforeEach } from 'vitest';
import util from 'node:util';

import { MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
import {
AbstractPowerSyncDatabase,
BucketChecksum,
createLogger,
OplogEntryJSON,
PowerSyncConnectionOptions,
ProgressWithOperations,
Expand Down Expand Up @@ -537,6 +539,84 @@ function defineSyncTests(impl: SyncClientImplementation) {
expect(rows).toStrictEqual([{ name: 'from server' }]);
});

mockSyncServiceTest('handles uploads across checkpoints', async ({ syncService }) => {
const logger = createLogger('test', { logLevel: Logger.TRACE });
const logMessages: string[] = [];
(logger as any).invoke = (level, args) => {
console.log(...args);
logMessages.push(util.format(...args));
};

// Regression test for https://github.com/powersync-ja/powersync-js/pull/665
let database = await syncService.createDatabase({ logger });
const connector = new TestConnector();
let finishUpload: () => void;
const finishUploadPromise = new Promise<void>((resolve, reject) => {
finishUpload = resolve;
});
connector.uploadData = async (db) => {
const batch = await db.getCrudBatch();
if (batch != null) {
await finishUploadPromise;
await batch.complete();
}
};

await database.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?);', ['local']);
database.connect(connector, options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } });
syncService.pushLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '1',
op: 'PUT',
object_id: '1',
object_type: 'lists',
data: '{"name": "s1"}'
}
]
}
});
// 1. Could not apply checkpoint due to local data. We will retry [...] after that upload is completed.
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });
await vi.waitFor(() => {
expect(logMessages).toEqual(expect.arrayContaining([expect.stringContaining('due to local data')]));
});

// 2. Send additional checkpoint while we're still busy uploading
syncService.pushLine({ checkpoint: { last_op_id: '2', write_checkpoint: '2', buckets: [bucket('a', 2)] } });
syncService.pushLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '2',
op: 'PUT',
object_id: '2',
object_type: 'lists',
data: '{"name": "s2"}'
}
]
}
});
syncService.pushLine({ checkpoint_complete: { last_op_id: '2' } });

// 3. Crud upload complete
finishUpload!();

// 4. Ensure the database is applying the second checkpoint
await vi.waitFor(async () => {
const rows = await database.getAll('SELECT * FROM lists WHERE name = ?', ['s2']);
expect(rows).toHaveLength(1);
});
});

mockSyncServiceTest('should update sync state incrementally', async ({ syncService }) => {
const powersync = await syncService.createDatabase();
powersync.connect(new TestConnector(), options);
Expand Down