Skip to content

Commit 5837e14

Browse files
committed
Cache dynamic bucket lookups.
1 parent 62d40f8 commit 5837e14

File tree

2 files changed

+84
-33
lines changed

2 files changed

+84
-33
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
3838
import { MongoCompactor } from './MongoCompactor.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4040
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
41+
import { LRUCache } from 'lru-cache';
4142

4243
export class MongoSyncBucketStorage
4344
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -868,25 +869,40 @@ export class MongoSyncBucketStorage
868869
return pipeline;
869870
}
870871

871-
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
872-
const dataBuckets = await this.db.bucket_data
873-
.find(
874-
{
875-
'_id.g': this.group_id,
876-
'_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
877-
},
878-
{
879-
projection: {
880-
'_id.b': 1
881-
},
882-
limit: 1001,
883-
batchSize: 1001,
884-
singleBatch: true
885-
}
886-
)
887-
.toArray();
888-
const invalidateDataBuckets = dataBuckets.length > 1000;
872+
private async getDataBucketChanges(
873+
options: GetCheckpointChangesOptions
874+
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
875+
// The query below can be slow, since we don't have an index on _id.o.
876+
// We could try to query the oplog for these, but that is risky.
877+
// Or we could store updated buckets in a separate collection, and query those.
878+
// For now, we ignore this optimization
879+
880+
// const dataBuckets = await this.db.bucket_data
881+
// .find(
882+
// {
883+
// '_id.g': this.group_id,
884+
// '_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
885+
// },
886+
// {
887+
// projection: {
888+
// '_id.b': 1
889+
// },
890+
// limit: 1001,
891+
// batchSize: 1001,
892+
// singleBatch: true
893+
// }
894+
// )
895+
// .toArray();
896+
897+
return {
898+
invalidateDataBuckets: true,
899+
updatedDataBuckets: []
900+
};
901+
}
889902

903+
private async getParameterBucketChanges(
904+
options: GetCheckpointChangesOptions
905+
): Promise<Pick<CheckpointChanges, 'updatedParameterBucketDefinitions' | 'invalidateParameterBuckets'>> {
890906
const parameterUpdates = await this.db.bucket_parameters
891907
.find(
892908
{
@@ -906,13 +922,37 @@ export class MongoSyncBucketStorage
906922
const invalidateParameterUpdates = parameterUpdates.length > 1000;
907923

908924
return {
909-
invalidateDataBuckets,
910-
updatedDataBuckets: invalidateDataBuckets ? [] : dataBuckets.map((b) => b._id.b),
911-
912925
invalidateParameterBuckets: invalidateParameterUpdates,
913926
updatedParameterBucketDefinitions: invalidateParameterUpdates
914927
? []
915928
: [...new Set<string>(parameterUpdates.map((p) => getLookupBucketDefinitionName(p.lookup)))]
916929
};
917930
}
931+
932+
private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
933+
max: 50,
934+
maxSize: 10 * 1024 * 1024,
935+
sizeCalculation: (value: CheckpointChanges) => {
936+
return value.updatedParameterBucketDefinitions.reduce<number>((a, b) => a + b.length, 0);
937+
},
938+
fetchMethod: async (_key, _staleValue, options) => {
939+
return this.getCheckpointChangesInternal(options.context.options);
940+
}
941+
});
942+
943+
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
944+
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
945+
const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
946+
return result!;
947+
}
948+
949+
private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
950+
const dataUpdates = await this.getDataBucketChanges(options);
951+
const parameterUpdates = await this.getParameterBucketChanges(options);
952+
953+
return {
954+
...dataUpdates,
955+
...parameterUpdates
956+
};
957+
}
918958
}

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ export class BucketParameterState {
268268
public readonly syncParams: RequestParameters;
269269
private readonly querier: BucketParameterQuerier;
270270
private readonly staticBuckets: Map<string, BucketDescription>;
271+
private cachedDynamicBuckets: BucketDescription[] | null = null;
271272

272273
constructor(
273274
context: SyncContext,
@@ -361,29 +362,39 @@ export class BucketParameterState {
361362
const staticBuckets = querier.staticBuckets;
362363
const update = checkpoint.update;
363364

364-
let hasChange = false;
365+
let hasDataChange = false;
366+
let hasParameterChange = false;
365367
if (update.invalidateDataBuckets || update.updatedDataBuckets?.length > 0) {
366-
hasChange = true;
367-
} else if (update.invalidateParameterBuckets) {
368-
hasChange = true;
368+
hasDataChange = true;
369+
}
370+
371+
if (update.invalidateParameterBuckets) {
372+
hasParameterChange = true;
369373
} else {
370-
for (let bucket of update.updatedParameterBucketDefinitions ?? []) {
374+
for (let bucket of update.updatedParameterBucketDefinitions) {
375+
// This is a very coarse check, but helps
371376
if (querier.dynamicBucketDefinitions.has(bucket)) {
372-
hasChange = true;
377+
hasParameterChange = true;
373378
break;
374379
}
375380
}
376381
}
377382

378-
if (!hasChange) {
383+
if (!hasDataChange && !hasParameterChange) {
379384
return null;
380385
}
381386

382-
const dynamicBuckets = await querier.queryDynamicBucketDescriptions({
383-
getParameterSets(lookups) {
384-
return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
385-
}
386-
});
387+
let dynamicBuckets: BucketDescription[];
388+
if (hasParameterChange || this.cachedDynamicBuckets == null) {
389+
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
390+
getParameterSets(lookups) {
391+
return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
392+
}
393+
});
394+
this.cachedDynamicBuckets = dynamicBuckets;
395+
} else {
396+
dynamicBuckets = this.cachedDynamicBuckets;
397+
}
387398
const allBuckets = [...staticBuckets, ...dynamicBuckets];
388399

389400
return {

0 commit comments

Comments
 (0)