Skip to content

Commit ca2f83a

Browse files
committed
Fixes.
1 parent dde29e3 commit ca2f83a

File tree

8 files changed

+138
-74
lines changed

8 files changed

+138
-74
lines changed

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as lib_postgres from '@powersync/lib-service-postgres';
22
import { DisposableObserver, ReplicationAssertionError } from '@powersync/lib-services-framework';
33
import {
44
BroadcastIterable,
5+
CHECKPOINT_INVALIDATE_ALL,
56
CheckpointChanges,
67
GetCheckpointChangesOptions,
78
LastValueSink,
@@ -735,12 +736,7 @@ export class PostgresSyncRulesStorage
735736
yield {
736737
base: cp,
737738
writeCheckpoint: currentWriteCheckpoint,
738-
update: {
739-
invalidateDataBuckets: true,
740-
invalidateParameterBuckets: true,
741-
updatedDataBuckets: [],
742-
updatedParameterBucketDefinitions: []
743-
}
739+
update: CHECKPOINT_INVALIDATE_ALL
744740
};
745741
}
746742
}
@@ -805,12 +801,7 @@ export class PostgresSyncRulesStorage
805801

806802
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
807803
// We do not track individual changes yet
808-
return {
809-
invalidateDataBuckets: true,
810-
invalidateParameterBuckets: true,
811-
updatedDataBuckets: [],
812-
updatedParameterBucketDefinitions: []
813-
};
804+
return CHECKPOINT_INVALIDATE_ALL;
814805
}
815806

816807
private makeActiveCheckpoint(row: models.ActiveCheckpointDecoded | null) {

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,10 @@ export interface CheckpointChanges {
247247
updatedParameterBucketDefinitions: string[];
248248
invalidateParameterBuckets: boolean;
249249
}
250+
251+
export const CHECKPOINT_INVALIDATE_ALL: CheckpointChanges = {
252+
updatedDataBuckets: [],
253+
invalidateDataBuckets: true,
254+
updatedParameterBucketDefinitions: [],
255+
invalidateParameterBuckets: true
256+
};

packages/service-core/test/src/sync/BucketChecksumState.test.ts

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
BucketChecksum,
33
BucketChecksumState,
44
BucketChecksumStateStorage,
5+
CHECKPOINT_INVALIDATE_ALL,
56
ChecksumMap,
67
OpId,
78
WatchFilterEvent
@@ -56,10 +57,11 @@ bucket_definitions:
5657
bucketStorage: storage
5758
});
5859

59-
// This simulates a checkpoint subscription
60-
storage.filter = state.checkpointFilter;
61-
62-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
60+
const line = (await state.buildNextCheckpointLine({
61+
base: { checkpoint: '1', lsn: '1' },
62+
writeCheckpoint: null,
63+
update: CHECKPOINT_INVALIDATE_ALL
64+
}))!;
6365
expect(line.checkpointLine).toEqual({
6466
checkpoint: {
6567
buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }],
@@ -85,7 +87,13 @@ bucket_definitions:
8587
// Now we get a new line
8688
const line2 = (await state.buildNextCheckpointLine({
8789
base: { checkpoint: '2', lsn: '2' },
88-
writeCheckpoint: null
90+
writeCheckpoint: null,
91+
update: {
92+
updatedDataBuckets: ['global[]'],
93+
invalidateDataBuckets: false,
94+
updatedParameterBucketDefinitions: [],
95+
invalidateParameterBuckets: false
96+
}
8997
}))!;
9098
expect(line2.checkpointLine).toEqual({
9199
checkpoint_diff: {
@@ -114,9 +122,11 @@ bucket_definitions:
114122
bucketStorage: storage
115123
});
116124

117-
storage.filter = state.checkpointFilter;
118-
119-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
125+
const line = (await state.buildNextCheckpointLine({
126+
base: { checkpoint: '1', lsn: '1' },
127+
writeCheckpoint: null,
128+
update: CHECKPOINT_INVALIDATE_ALL
129+
}))!;
120130
expect(line.checkpointLine).toEqual({
121131
checkpoint: {
122132
buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }],
@@ -146,9 +156,11 @@ bucket_definitions:
146156
bucketStorage: storage
147157
});
148158

149-
storage.filter = state.checkpointFilter;
150-
151-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
159+
const line = (await state.buildNextCheckpointLine({
160+
base: { checkpoint: '1', lsn: '1' },
161+
writeCheckpoint: null,
162+
update: CHECKPOINT_INVALIDATE_ALL
163+
}))!;
152164
expect(line.checkpointLine).toEqual({
153165
checkpoint: {
154166
buckets: [
@@ -175,7 +187,12 @@ bucket_definitions:
175187

176188
const line2 = (await state.buildNextCheckpointLine({
177189
base: { checkpoint: '2', lsn: '2' },
178-
writeCheckpoint: null
190+
writeCheckpoint: null,
191+
update: {
192+
...CHECKPOINT_INVALIDATE_ALL,
193+
updatedDataBuckets: ['global[1]', 'global[2]'],
194+
invalidateDataBuckets: false
195+
}
179196
}))!;
180197
expect(line2.checkpointLine).toEqual({
181198
checkpoint_diff: {
@@ -204,10 +221,13 @@ bucket_definitions:
204221
bucketStorage: storage
205222
});
206223

207-
storage.filter = state.checkpointFilter;
208224
storage.updateTestChecksum({ bucket: 'global[]', checksum: 1, count: 1 });
209225

210-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
226+
const line = (await state.buildNextCheckpointLine({
227+
base: { checkpoint: '1', lsn: '1' },
228+
writeCheckpoint: null,
229+
update: CHECKPOINT_INVALIDATE_ALL
230+
}))!;
211231
expect(line.checkpointLine).toEqual({
212232
checkpoint: {
213233
buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }],
@@ -241,22 +261,29 @@ bucket_definitions:
241261
// We specifically do not set this here, so that we have manual control over the events.
242262
// storage.filter = state.checkpointFilter;
243263

244-
await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null });
264+
await state.buildNextCheckpointLine({
265+
base: { checkpoint: '1', lsn: '1' },
266+
writeCheckpoint: null,
267+
update: CHECKPOINT_INVALIDATE_ALL
268+
});
245269

246270
state.updateBucketPosition({ bucket: 'global[1]', nextAfter: '1', hasMore: false });
247271
state.updateBucketPosition({ bucket: 'global[2]', nextAfter: '1', hasMore: false });
248272

249273
storage.updateTestChecksum({ bucket: 'global[1]', checksum: 2, count: 2 });
250274
storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 });
251275

252-
// Invalidate the state for global[1] - will only re-check the single bucket.
253-
// This is essentially inconsistent state, but is the simplest way to test that
254-
// the filter is working.
255-
state.checkpointFilter({ changedDataBucket: 'global[1]' });
256-
257276
const line2 = (await state.buildNextCheckpointLine({
258277
base: { checkpoint: '2', lsn: '2' },
259-
writeCheckpoint: null
278+
writeCheckpoint: null,
279+
update: {
280+
...CHECKPOINT_INVALIDATE_ALL,
281+
// Invalidate the state for global[1] - will only re-check the single bucket.
282+
// This is essentially inconsistent state, but is the simplest way to test that
283+
// the filter is working.
284+
updatedDataBuckets: ['global[1]'],
285+
invalidateDataBuckets: false
286+
}
260287
}))!;
261288
expect(line2.checkpointLine).toEqual({
262289
checkpoint_diff: {
@@ -289,17 +316,20 @@ bucket_definitions:
289316
storage.updateTestChecksum({ bucket: 'global[1]', checksum: 1, count: 1 });
290317
storage.updateTestChecksum({ bucket: 'global[2]', checksum: 1, count: 1 });
291318

292-
await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null });
319+
await state.buildNextCheckpointLine({
320+
base: { checkpoint: '1', lsn: '1' },
321+
writeCheckpoint: null,
322+
update: CHECKPOINT_INVALIDATE_ALL
323+
});
293324

294325
storage.updateTestChecksum({ bucket: 'global[1]', checksum: 2, count: 2 });
295326
storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 });
296327

297-
// Invalidate the state - will re-check all buckets
298-
state.checkpointFilter({ invalidate: true });
299-
300328
const line2 = (await state.buildNextCheckpointLine({
301329
base: { checkpoint: '2', lsn: '2' },
302-
writeCheckpoint: null
330+
writeCheckpoint: null,
331+
// Invalidate the state - will re-check all buckets
332+
update: CHECKPOINT_INVALIDATE_ALL
303333
}))!;
304334
expect(line2.checkpointLine).toEqual({
305335
checkpoint_diff: {
@@ -330,9 +360,11 @@ bucket_definitions:
330360
bucketStorage: storage
331361
});
332362

333-
storage.filter = state.checkpointFilter;
334-
335-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '3', lsn: '3' }, writeCheckpoint: null }))!;
363+
const line = (await state.buildNextCheckpointLine({
364+
base: { checkpoint: '3', lsn: '3' },
365+
writeCheckpoint: null,
366+
update: CHECKPOINT_INVALIDATE_ALL
367+
}))!;
336368
expect(line.checkpointLine).toEqual({
337369
checkpoint: {
338370
buckets: [
@@ -353,6 +385,7 @@ bucket_definitions:
353385
priority: 3
354386
}
355387
]);
388+
356389
// This is the bucket data to be fetched
357390
expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(
358391
new Map([
@@ -369,7 +402,12 @@ bucket_definitions:
369402

370403
const line2 = (await state.buildNextCheckpointLine({
371404
base: { checkpoint: '4', lsn: '4' },
372-
writeCheckpoint: null
405+
writeCheckpoint: null,
406+
update: {
407+
...CHECKPOINT_INVALIDATE_ALL,
408+
invalidateDataBuckets: false,
409+
updatedDataBuckets: ['global[1]']
410+
}
373411
}))!;
374412
expect(line2.checkpointLine).toEqual({
375413
checkpoint_diff: {
@@ -419,16 +457,17 @@ bucket_definitions:
419457
bucketStorage: storage
420458
});
421459

422-
// This simulates a checkpoint subscription
423-
storage.filter = state.checkpointFilter;
424-
425460
storage.getParameterSets = async (checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> => {
426461
expect(checkpoint).toEqual('1');
427462
expect(lookups).toEqual([['by_project', '1', 'u1']]);
428463
return [{ id: 1 }, { id: 2 }];
429464
};
430465

431-
const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
466+
const line = (await state.buildNextCheckpointLine({
467+
base: { checkpoint: '1', lsn: '1' },
468+
writeCheckpoint: null,
469+
update: CHECKPOINT_INVALIDATE_ALL
470+
}))!;
432471
expect(line.checkpointLine).toEqual({
433472
checkpoint: {
434473
buckets: [
@@ -460,8 +499,6 @@ bucket_definitions:
460499
state.updateBucketPosition({ bucket: 'by_project[1]', nextAfter: '1', hasMore: false });
461500
state.updateBucketPosition({ bucket: 'by_project[2]', nextAfter: '1', hasMore: false });
462501

463-
state.checkpointFilter({ changedParameterBucketDefinition: 'by_project' });
464-
465502
storage.getParameterSets = async (checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> => {
466503
expect(checkpoint).toEqual('2');
467504
expect(lookups).toEqual([['by_project', '1', 'u1']]);
@@ -471,7 +508,13 @@ bucket_definitions:
471508
// Now we get a new line
472509
const line2 = (await state.buildNextCheckpointLine({
473510
base: { checkpoint: '2', lsn: '2' },
474-
writeCheckpoint: null
511+
writeCheckpoint: null,
512+
update: {
513+
invalidateDataBuckets: false,
514+
updatedDataBuckets: [],
515+
updatedParameterBucketDefinitions: ['by_project'],
516+
invalidateParameterBuckets: false
517+
}
475518
}))!;
476519
expect(line2.checkpointLine).toEqual({
477520
checkpoint_diff: {

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
4747
}
4848

4949
export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]): BucketParameterQuerier {
50-
const dynamicBucketDefinitions = new Set<string>(...queriers.flatMap((q) => [...q.dynamicBucketDefinitions]));
50+
const dynamicBucketDefinitions = new Set<string>(queriers.flatMap((q) => [...q.dynamicBucketDefinitions]));
5151
return {
5252
staticBuckets: queriers.flatMap((q) => q.staticBuckets),
5353
hasDynamicBuckets: dynamicBucketDefinitions.size > 0,

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,6 @@ export class SqlBucketDescriptor {
125125
return mergeBucketParameterQueriers([staticQuerier, ...dynamicQueriers]);
126126
}
127127

128-
/**
129-
* Return bucket definition names for all static buckets.
130-
*/
131-
hasStaticBuckets(): boolean {
132-
return this.global_parameter_queries.length > 0;
133-
}
134-
135128
getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] {
136129
let results: BucketDescription[] = [];
137130
for (let query of this.global_parameter_queries) {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ export class SqlParameterQuery {
383383
return {
384384
staticBuckets: [],
385385
hasDynamicBuckets: true,
386-
dynamicBucketDefinitions: new Set<string>(this.descriptor_name!),
386+
dynamicBucketDefinitions: new Set<string>([this.descriptor_name!]),
387387
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
388388
const bucketParameters = await source.getParameterSets(lookups);
389389
return this.resolveBucketDescriptions(bucketParameters, requestParameters);

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -326,20 +326,6 @@ export class SqlSyncRules implements SyncRules {
326326
return mergeBucketParameterQueriers(queriers);
327327
}
328328

329-
/**
330-
* Return bucket definition names for all static buckets.
331-
*/
332-
getStaticBucketDefinitionList(): string[] {
333-
let results: string[] = [];
334-
335-
for (let query of this.bucket_descriptors) {
336-
if (query.hasStaticBuckets()) {
337-
results.push(query.name);
338-
}
339-
}
340-
return results;
341-
}
342-
343329
getSourceTables(): TablePattern[] {
344330
const sourceTables = new Map<String, TablePattern>();
345331
for (const bucket of this.bucket_descriptors) {

0 commit comments

Comments
 (0)