Skip to content

Commit 19e6dd5

Browse files
committed
Move getParameterSets to ReplicationCheckpoint.
1 parent d7374e9 commit 19e6dd5

File tree

6 files changed

+94
-87
lines changed

6 files changed

+94
-87
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,9 @@ export class MongoSyncBucketStorage
113113
}
114114
);
115115
if (!doc?.snapshot_done) {
116-
return {
117-
checkpoint: 0n,
118-
lsn: null
119-
};
116+
return new MongoReplicationCheckpoint(this, 0n, null);
120117
}
121-
return {
122-
checkpoint: doc?.last_checkpoint ?? 0n,
123-
lsn: doc?.last_checkpoint_lsn ?? null
124-
};
118+
return new MongoReplicationCheckpoint(this, doc.last_checkpoint ?? 0n, doc.last_checkpoint_lsn ?? null);
125119
}
126120

127121
async startBatch(
@@ -661,11 +655,8 @@ export class MongoSyncBucketStorage
661655
return new MongoCompactor(this.db, this.group_id, options).compact();
662656
}
663657

664-
private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) {
665-
return {
666-
checkpoint: doc?.last_checkpoint ?? 0n,
667-
lsn: doc?.last_checkpoint_lsn ?? null
668-
};
658+
private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null): ReplicationCheckpoint {
659+
return new MongoReplicationCheckpoint(this, doc?.last_checkpoint ?? 0n, doc?.last_checkpoint_lsn ?? null);
669660
}
670661

671662
/**
@@ -980,3 +971,15 @@ interface InternalCheckpointChanges extends CheckpointChanges {
980971
updatedWriteCheckpoints: Map<string, bigint>;
981972
invalidateWriteCheckpoints: boolean;
982973
}
974+
975+
class MongoReplicationCheckpoint implements ReplicationCheckpoint {
976+
constructor(
977+
private storage: MongoSyncBucketStorage,
978+
public readonly checkpoint: InternalOpId,
979+
public readonly lsn: string | null
980+
) {}
981+
982+
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
983+
return this.storage.getParameterSets(this, lookups);
984+
}
985+
}

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,11 @@ export class PostgresSyncRulesStorage
137137
.decoded(pick(models.SyncRules, ['last_checkpoint', 'last_checkpoint_lsn']))
138138
.first();
139139

140-
return {
141-
checkpoint: checkpointRow?.last_checkpoint ?? 0n,
142-
lsn: checkpointRow?.last_checkpoint_lsn ?? null
143-
};
140+
return new PostgresReplicationCheckpoint(
141+
this,
142+
checkpointRow?.last_checkpoint ?? 0n,
143+
checkpointRow?.last_checkpoint_lsn ?? null
144+
);
144145
}
145146

146147
async resolveTable(options: storage.ResolveTableOptions): Promise<storage.ResolveTableResult> {
@@ -834,9 +835,18 @@ export class PostgresSyncRulesStorage
834835
}
835836

836837
private makeActiveCheckpoint(row: models.ActiveCheckpointDecoded | null) {
837-
return {
838-
checkpoint: row?.last_checkpoint ?? 0n,
839-
lsn: row?.last_checkpoint_lsn ?? null
840-
} satisfies storage.ReplicationCheckpoint;
838+
return new PostgresReplicationCheckpoint(this, row?.last_checkpoint ?? 0n, row?.last_checkpoint_lsn ?? null);
839+
}
840+
}
841+
842+
class PostgresReplicationCheckpoint implements storage.ReplicationCheckpoint {
843+
constructor(
844+
private storage: PostgresSyncRulesStorage,
845+
public readonly checkpoint: utils.InternalOpId,
846+
public readonly lsn: string | null
847+
) {}
848+
849+
getParameterSets(lookups: sync_rules.ParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
850+
return this.storage.getParameterSets(this, lookups);
841851
}
842852
}

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ bucket_definitions:
7777
const checkpoint = await bucketStorage.getCheckpoint();
7878
console.log('Checkpoint:', checkpoint);
7979
console.log(await bucketStorage.getStatus());
80-
const parameters = await bucketStorage.getParameterSets(checkpoint, [
81-
ParameterLookup.normalized('mybucket', '1', ['user1'])
82-
]);
80+
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
8381
expect(parameters).toEqual([
8482
{
8583
group_id: 'group1a'
@@ -127,19 +125,15 @@ bucket_definitions:
127125
});
128126
const checkpoint2 = await bucketStorage.getCheckpoint();
129127

130-
const parameters = await bucketStorage.getParameterSets(checkpoint2, [
131-
ParameterLookup.normalized('mybucket', '1', ['user1'])
132-
]);
128+
const parameters = await checkpoint2.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
133129
expect(parameters).toEqual([
134130
{
135131
group_id: 'group2'
136132
}
137133
]);
138134

139135
// Use the checkpoint to get older data if relevant
140-
const parameters2 = await bucketStorage.getParameterSets(checkpoint1, [
141-
ParameterLookup.normalized('mybucket', '1', ['user1'])
142-
]);
136+
const parameters2 = await checkpoint1.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
143137
expect(parameters2).toEqual([
144138
{
145139
group_id: 'group1'
@@ -207,7 +201,7 @@ bucket_definitions:
207201
// There removal operation for the association of `list2`::`todo2` should not interfere with the new
208202
// association of `list1`::`todo2`
209203
const checkpoint = await bucketStorage.getCheckpoint();
210-
const parameters = await bucketStorage.getParameterSets(checkpoint, [
204+
const parameters = await checkpoint.getParameterSets([
211205
ParameterLookup.normalized('mybucket', '1', ['list1']),
212206
ParameterLookup.normalized('mybucket', '1', ['list2'])
213207
]);
@@ -256,15 +250,15 @@ bucket_definitions:
256250

257251
const checkpoint = await bucketStorage.getCheckpoint();
258252

259-
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [
253+
const parameters1 = await checkpoint.getParameterSets([
260254
ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14])
261255
]);
262256
expect(parameters1).toEqual([TEST_PARAMS]);
263-
const parameters2 = await bucketStorage.getParameterSets(checkpoint, [
257+
const parameters2 = await checkpoint.getParameterSets([
264258
ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14])
265259
]);
266260
expect(parameters2).toEqual([TEST_PARAMS]);
267-
const parameters3 = await bucketStorage.getParameterSets(checkpoint, [
261+
const parameters3 = await checkpoint.getParameterSets([
268262
ParameterLookup.normalized('mybucket', '1', [314n, 314, 3])
269263
]);
270264
expect(parameters3).toEqual([]);
@@ -319,7 +313,7 @@ bucket_definitions:
319313

320314
const checkpoint = await bucketStorage.getCheckpoint();
321315

322-
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [
316+
const parameters1 = await checkpoint.getParameterSets([
323317
ParameterLookup.normalized('mybucket', '1', [1152921504606846976n])
324318
]);
325319
expect(parameters1).toEqual([TEST_PARAMS]);
@@ -424,12 +418,12 @@ bucket_definitions:
424418
const lookups = q1.getLookups(parameters);
425419
expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]);
426420

427-
const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups);
421+
const parameter_sets = await checkpoint.getParameterSets(lookups);
428422
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]);
429423

430424
const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({
431425
getParameterSets(lookups) {
432-
return bucketStorage.getParameterSets(checkpoint, lookups);
426+
return checkpoint.getParameterSets(lookups);
433427
}
434428
});
435429
expect(buckets).toEqual([{ bucket: 'by_workspace["workspace1"]', priority: 3 }]);
@@ -495,13 +489,13 @@ bucket_definitions:
495489
const lookups = q1.getLookups(parameters);
496490
expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]);
497491

498-
const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups);
492+
const parameter_sets = await checkpoint.getParameterSets(lookups);
499493
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
500494
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]);
501495

502496
const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({
503497
getParameterSets(lookups) {
504-
return bucketStorage.getParameterSets(checkpoint, lookups);
498+
return checkpoint.getParameterSets(lookups);
505499
}
506500
});
507501
buckets.sort((a, b) => a.bucket.localeCompare(b.bucket));
@@ -585,23 +579,23 @@ bucket_definitions:
585579
const lookups1 = q1.getLookups(parameters);
586580
expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]);
587581

588-
const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1);
582+
const parameter_sets1 = await checkpoint.getParameterSets(lookups1);
589583
parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
590584
expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]);
591585

592586
const q2 = sync_rules.bucketDescriptors[0].parameterQueries[1];
593587
const lookups2 = q2.getLookups(parameters);
594588
expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]);
595589

596-
const parameter_sets2 = await bucketStorage.getParameterSets(checkpoint, lookups2);
590+
const parameter_sets2 = await checkpoint.getParameterSets(lookups2);
597591
parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
598592
expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]);
599593

600594
// Test final values - the important part
601595
const buckets = (
602596
await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({
603597
getParameterSets(lookups) {
604-
return bucketStorage.getParameterSets(checkpoint, lookups);
598+
return checkpoint.getParameterSets(lookups);
605599
}
606600
})
607601
).map((e) => e.bucket);
@@ -904,9 +898,7 @@ bucket_definitions:
904898

905899
const checkpoint = await bucketStorage.getCheckpoint();
906900

907-
const parameters = await bucketStorage.getParameterSets(checkpoint, [
908-
ParameterLookup.normalized('mybucket', '1', ['user1'])
909-
]);
901+
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
910902
expect(parameters).toEqual([]);
911903
});
912904

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,6 @@ export interface SyncRulesBucketStorage
6666

6767
getCheckpoint(): Promise<ReplicationCheckpoint>;
6868

69-
/**
70-
* Used to resolve "dynamic" parameter queries.
71-
*/
72-
getParameterSets(checkpoint: ReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
73-
7469
/**
7570
* Given two checkpoints, return the changes in bucket data and parameters that may have occurred
7671
* in that period.
@@ -243,6 +238,13 @@ export interface SyncBucketDataChunk {
243238
export interface ReplicationCheckpoint {
244239
readonly checkpoint: util.InternalOpId;
245240
readonly lsn: string | null;
241+
242+
/**
243+
* Used to resolve "dynamic" parameter queries.
244+
*
245+
* This gets parameter sets specific to this checkpoint.
246+
*/
247+
getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
246248
}
247249

248250
export interface WatchWriteCheckpointOptions {

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ export class BucketParameterState {
440440
if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
441441
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
442442
getParameterSets(lookups) {
443-
return storage.getParameterSets(checkpoint.base, lookups);
443+
return checkpoint.base.getParameterSets(lookups);
444444
}
445445
});
446446
this.cachedDynamicBuckets = dynamicBuckets;
@@ -501,7 +501,7 @@ export interface CheckpointLine {
501501
}
502502

503503
// Use a more specific type to simplify testing
504-
export type BucketChecksumStateStorage = Pick<storage.SyncRulesBucketStorage, 'getChecksums' | 'getParameterSets'>;
504+
export type BucketChecksumStateStorage = Pick<storage.SyncRulesBucketStorage, 'getChecksums'>;
505505

506506
function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number) {
507507
buckets = buckets.map((b) => {

0 commit comments

Comments
 (0)