Skip to content

Commit 650555d

Browse files
committed
Test partial priority with data
1 parent 0bb65a2 commit 650555d

File tree

3 files changed

+79
-10
lines changed

3 files changed

+79
-10
lines changed

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
205205
async validateChecksums(checkpoint: Checkpoint, priority: number | undefined): Promise<SyncLocalDatabaseResult> {
206206
if (priority !== undefined) {
207207
// Only validate the buckets within the priority we care about
208-
const newBuckets = [...checkpoint.buckets];
209-
newBuckets.filter((cs) => cs.priority <= priority);
208+
const newBuckets = checkpoint.buckets.filter((cs) => cs.priority <= priority);
210209
checkpoint = {...checkpoint, buckets: newBuckets};
211210
}
212211

@@ -215,7 +214,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
215214
]);
216215

217216
const resultItem = rs.rows?.item(0);
218-
this.logger.debug('validateChecksums result item', resultItem);
217+
this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem);
219218
if (!resultItem) {
220219
return {
221220
checkpointValid: false,

packages/web/tests/stream.test.ts

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,34 +165,100 @@ describe('Streaming', () => {
165165

166166
const buckets: BucketChecksum[] = [];
167167
for (let prio = 0; prio <= 3; prio++) {
168-
buckets.push({bucket: `prio${prio}`, priority: prio, checksum: 0});
168+
buckets.push({bucket: `prio${prio}`, priority: prio, checksum: 10 + prio});
169169
}
170170
remote.enqueueLine({
171171
checkpoint: {
172-
last_op_id: '0',
172+
last_op_id: '4',
173173
buckets,
174174
},
175175
});
176176

177+
let operationId = 1;
178+
const addRow = (prio: number) => {
179+
remote.enqueueLine({
180+
data: {
181+
bucket: `prio${prio}`,
182+
data: [{
183+
checksum: prio + 10,
184+
data: JSON.stringify({'name': 'row'}),
185+
op: 'PUT',
186+
op_id: (operationId++).toString(),
187+
object_id: `prio${prio}`,
188+
object_type: 'users'
189+
}]
190+
},
191+
});
192+
}
193+
194+
const syncCompleted = vi.fn();
195+
powersync.waitForFirstSync().then(syncCompleted);
196+
177197
// Emit partial sync complete for each priority but the last.
178198
for (var prio = 0; prio < 3; prio++) {
199+
const partialSyncCompleted = vi.fn();
200+
powersync.waitForFirstSync({priority: prio}).then(partialSyncCompleted);
179201
expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false);
202+
expect(partialSyncCompleted).not.toHaveBeenCalled();
203+
expect(syncCompleted).not.toHaveBeenCalled();
180204

205+
addRow(prio);
181206
remote.enqueueLine({
182207
partial_checkpoint_complete: {
183-
last_op_id: '0',
208+
last_op_id: operationId.toString(),
184209
priority: prio,
185210
}
186211
});
187212

188213
await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => {
189214
return status.statusForPriority(prio).hasSynced === true;
190215
});
216+
await new Promise(r => setTimeout(r));
217+
expect(partialSyncCompleted).toHaveBeenCalledOnce();
218+
219+
expect(await powersync.getAll('select * from users')).toHaveLength(prio + 1);
191220
}
192221

193222
// Then, complete the sync.
194-
remote.enqueueLine({checkpoint_complete: {last_op_id: '0'}});
195-
await powersync.waitForFirstSync();
223+
addRow(3);
224+
remote.enqueueLine({checkpoint_complete: {last_op_id: operationId.toString()}});
225+
await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500);
226+
expect(await powersync.getAll('select * from users')).toHaveLength(4);
227+
});
228+
229+
itWithGenerators('Should remember sync state', async (createConnectedDatabase) => {
230+
const { powersync, remote, openAnother } = await createConnectedDatabase();
231+
expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false);
232+
233+
const buckets: BucketChecksum[] = [];
234+
for (let prio = 0; prio <= 3; prio++) {
235+
buckets.push({bucket: `prio${prio}`, priority: prio, checksum: 0});
236+
}
237+
remote.enqueueLine({
238+
checkpoint: {
239+
last_op_id: '0',
240+
buckets,
241+
},
242+
});
243+
remote.enqueueLine({
244+
partial_checkpoint_complete: {
245+
last_op_id: '0',
246+
priority: 0,
247+
}
248+
});
249+
250+
await powersync.waitForFirstSync({priority: 0});
251+
252+
// Open another database instance.
253+
const another = openAnother();
254+
onTestFinished(async () => {
255+
await another.close();
256+
});
257+
await another.init();
258+
259+
expect(another.currentStatus.statusInPriority).toHaveLength(1);
260+
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
261+
await another.waitForFirstSync({priority: 0});
196262
});
197263
});
198264
});

packages/web/tests/utils/generateConnectedDatabase.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ export async function generateConnectedDatabase(
5151
},
5252
remote
5353
);
54-
const powersync = factory.getInstance();
54+
55+
const openAnother = factory.getInstance.bind(factory);
56+
const powersync = openAnother();
5557

5658
const waitForStream = () =>
5759
new Promise<void>((resolve) => {
@@ -75,6 +77,7 @@ export async function generateConnectedDatabase(
7577
await connectedPromise;
7678
};
7779

80+
7881
await connect();
7982

8083
onTestFinished(async () => {
@@ -89,6 +92,7 @@ export async function generateConnectedDatabase(
8992
powersync,
9093
remote,
9194
uploadSpy,
92-
waitForStream
95+
waitForStream,
96+
openAnother,
9397
};
9498
}

0 commit comments

Comments
 (0)