.changeset/seven-mangos-sleep.md (2 changes: 1 addition & 1 deletion)
@@ -4,4 +4,4 @@
'@powersync/service-image': patch
---

[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
[MongoDB Storage] Only compact modified buckets. Add index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting, and skip small buckets.
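The counter that makes "only compact modified buckets" possible is estimate_since_compact.count on bucket_state; the diff below shows how that counter is consumed, not how it is maintained. A minimal sketch of the assumed write-side bookkeeping, with a hypothetical markBucketDirty helper and simplified types:

import { Collection } from 'mongodb';

// Simplified bucket_state document; field names as in the diff below.
interface BucketStateDocument {
  _id: { g: number; b: string };
  estimate_since_compact?: { count: number };
}

// Hypothetical write-side bookkeeping (not shown in this PR): bump the
// change estimate whenever operations are written to a bucket. The
// compactor's dirty-bucket query filters on exactly this counter.
async function markBucketDirty(
  bucketState: Collection<BucketStateDocument>,
  groupId: number,
  bucket: string,
  opsWritten: number
): Promise<void> {
  await bucketState.updateOne(
    { _id: { g: groupId, b: bucket } },
    { $inc: { 'estimate_since_compact.count': opsWritten } },
    { upsert: true }
  );
}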
@@ -10,6 +10,7 @@ import { cacheKey } from './OperationBatch.js';
interface CurrentBucketState {
/** Bucket name */
bucket: string;

/**
* Rows seen in the bucket, with the last op_id of each.
*/
@@ -96,75 +97,56 @@ export class MongoCompactor {
// We can make this more efficient later on by iterating
// through the buckets in a single query.
// That makes batching trickier, so we leave it for later.
await this.compactInternal(bucket);
await this.compactSingleBucket(bucket);
}
} else {
await this.compactDirtyBuckets();
}
}

private async compactDirtyBuckets() {
for await (let buckets of this.iterateDirtyBuckets()) {
while (!this.signal?.aborted) {
// Process all buckets with 1 or more changes since last time
const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
if (buckets.length == 0) {
// All done
break;
}
for (let bucket of buckets) {
await this.compactInternal(bucket);
await this.compactSingleBucket(bucket);
}
}
}

private async compactInternal(bucket: string | undefined) {
private async compactSingleBucket(bucket: string) {
const idLimitBytes = this.idLimitBytes;

let currentState: CurrentBucketState | null = null;

let bucketLower: string | mongo.MinKey;
let bucketUpper: string | mongo.MaxKey;
let currentState: CurrentBucketState = {
bucket,
seen: new Map(),
trackingSize: 0,
lastNotPut: null,
opsSincePut: 0,

if (bucket == null) {
bucketLower = new mongo.MinKey();
bucketUpper = new mongo.MaxKey();
} else if (bucket.includes('[')) {
// Exact bucket name
bucketLower = bucket;
bucketUpper = bucket;
} else {
// Bucket definition name
bucketLower = `${bucket}[`;
bucketUpper = `${bucket}[\uFFFF`;
}
checksum: 0,
opCount: 0,
opBytes: 0
};

// Constant lower bound
const lowerBound: BucketDataKey = {
g: this.group_id,
b: bucketLower as string,
b: bucket,
o: new mongo.MinKey() as any
};

// Upper bound is adjusted for each batch
let upperBound: BucketDataKey = {
g: this.group_id,
b: bucketUpper as string,
b: bucket,
o: new mongo.MaxKey() as any
};

const doneWithBucket = async () => {
if (currentState == null) {
return;
}
// Free memory before clearing bucket
currentState.seen.clear();
if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
logger.info(
`Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
// Need flush() before clear()
await this.flush();
await this.clearBucket(currentState);
}

// Do this _after_ clearBucket so that we have accurate counts.
this.updateBucketChecksums(currentState);
};

while (!this.signal?.aborted) {
// Query one batch at a time, to avoid cursor timeouts
const cursor = this.db.bucket_data.aggregate<BucketDataDocument & { size: number | bigint }>(
@@ -211,22 +193,6 @@ export class MongoCompactor {
upperBound = batch[batch.length - 1]._id;

for (let doc of batch) {
if (currentState == null || doc._id.b != currentState.bucket) {
await doneWithBucket();

currentState = {
bucket: doc._id.b,
seen: new Map(),
trackingSize: 0,
lastNotPut: null,
opsSincePut: 0,

checksum: 0,
opCount: 0,
opBytes: 0
};
}

if (doc._id.o > this.maxOpId) {
continue;
}
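The surrounding loop queries one batch at a time to avoid cursor timeouts, walking an adjustable upper bound downwards after each batch. A self-contained sketch of that paging pattern, assuming the mongodb Node driver; the helper name, strict bounds, and plain find() are simplifications (the diff uses an aggregation that also projects a size field):

import { Collection, Document, MaxKey, MinKey } from 'mongodb';

// Approximate shape of the bucket_data _id used above.
interface BucketDataKey {
  g: number;
  b: string;
  o: unknown; // op id; MinKey/MaxKey serve as sentinels
}

// Rather than holding one long-lived cursor open (which can time out on
// large buckets), run a bounded query per batch and move the upper bound
// down to the last _id seen, so each query resumes where the previous
// one stopped.
async function* pageBucketDescending(
  bucketData: Collection<Document>,
  groupId: number,
  bucket: string,
  batchLimit: number
): AsyncGenerator<Document[]> {
  const lowerBound: BucketDataKey = { g: groupId, b: bucket, o: new MinKey() };
  let upperBound: BucketDataKey = { g: groupId, b: bucket, o: new MaxKey() };
  while (true) {
    const batch = await bucketData
      .find({ _id: { $gt: lowerBound, $lt: upperBound } })
      .sort({ _id: -1 })
      .limit(batchLimit)
      .toArray();
    if (batch.length == 0) break;
    // The next query resumes strictly below the smallest key in this batch.
    upperBound = batch[batch.length - 1]._id;
    yield batch;
  }
}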
@@ -297,12 +263,22 @@ export class MongoCompactor {
}
}

if (currentState != null) {
logger.info(`Processed batch of length ${batch.length} current bucket: ${currentState.bucket}`);
}
logger.info(`Processed batch of length ${batch.length}, current bucket: ${bucket}`);
}

await doneWithBucket();
// Free memory before clearing bucket
currentState.seen.clear();
if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
logger.info(
`Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
// Need flush() before clear()
await this.flush();
await this.clearBucket(currentState);
}

// Do this _after_ clearBucket so that we have accurate counts.
this.updateBucketChecksums(currentState);

// Need another flush after updateBucketChecksums()
await this.flush();
@@ -490,8 +466,13 @@ export class MongoCompactor {
/**
* Subset of compact, only populating checksums where relevant.
*/
async populateChecksums() {
for await (let buckets of this.iterateDirtyBuckets()) {
async populateChecksums(options: { minBucketChanges: number }) {
while (!this.signal?.aborted) {
const buckets = await this.dirtyBucketBatch(options);
if (buckets.length == 0) {
// All done
break;
}
const start = Date.now();
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);

@@ -500,51 +481,37 @@
}
}

private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
// This is updated after each batch
let lowerBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MinKey() as any
};
// This is static
const upperBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MaxKey() as any
};
while (!this.signal?.aborted) {
// By filtering buckets, we effectively make this "resumeable".
const filter: mongo.Filter<BucketStateDocument> = {
_id: {
$gt: lowerBound,
$lt: upperBound
/**
 * Returns a batch of dirty buckets, with the most-changed buckets first.
 *
 * This cannot be used to iterate on its own: the caller is expected to process these buckets and
 * set estimate_since_compact.count: 0 when done, before fetching the next batch.
*/
private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
if (options.minBucketChanges <= 0) {
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
}
// We make use of an index on {_id.g: 1, 'estimate_since_compact.count': -1}
const dirtyBuckets = await this.db.bucket_state
.find(
{
'_id.g': this.group_id,
'estimate_since_compact.count': { $gte: options.minBucketChanges }
},
// Partial index exists on this
'estimate_since_compact.count': { $gt: 0 }
};

const dirtyBuckets = await this.db.bucket_state
.find(filter, {
{
projection: {
_id: 1
},
sort: {
_id: 1
'estimate_since_compact.count': -1
},
limit: 5_000,
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
// Make sure we use the partial index
hint: 'dirty_buckets'
})
.toArray();

if (dirtyBuckets.length == 0) {
break;
}

yield dirtyBuckets.map((bucket) => bucket._id.b);
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
}
)
.toArray();

lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
}
return dirtyBuckets.map((bucket) => bucket._id.b);
}

private async updateChecksumsBatch(buckets: string[]) {
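Pulling the new dirtyBucketBatch query out of the diff for readability: a minimal sketch assuming the mongodb Node driver, with the filter threshold, descending sort, 5 000-document limit, and operation timeout as shown above. The timeout value here is an assumed stand-in for the service's shared constant.

import { Collection } from 'mongodb';

interface BucketStateDocument {
  _id: { g: number; b: string };
  estimate_since_compact?: { count: number };
}

// Assumed stand-in for the service's shared operation timeout constant.
const MONGO_OPERATION_TIMEOUT_MS = 30_000;

// One batch of dirty buckets for a group, most-changed first. The caller
// compacts these and resets estimate_since_compact.count before asking
// for the next batch, which is what makes the outer loop terminate.
async function dirtyBucketBatch(
  bucketState: Collection<BucketStateDocument>,
  groupId: number,
  minBucketChanges: number
): Promise<string[]> {
  if (minBucketChanges <= 0) {
    throw new Error('minBucketChanges must be >= 1');
  }
  const dirtyBuckets = await bucketState
    .find(
      {
        '_id.g': groupId,
        'estimate_since_compact.count': { $gte: minBucketChanges }
      },
      {
        projection: { _id: 1 },
        sort: { 'estimate_since_compact.count': -1 },
        limit: 5_000,
        maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
      }
    )
    .toArray();
  return dirtyBuckets.map((doc) => doc._id.b);
}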
@@ -676,7 +676,11 @@ export class MongoSyncBucketStorage
memoryLimitMB: 0
});

await compactor.populateChecksums();
await compactor.populateChecksums({
// There are cases with millions of small buckets, where populating the checksums can take a
// very long time for minimal benefit. We skip the small buckets here.
minBucketChanges: 10
});
const duration = Date.now() - start;
logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`);
}
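If exhaustive checksum population is ever needed despite that cost, the same option presumably supports it: dirtyBucketBatch rejects only values below 1, and the regular compaction path already passes 1.

// Hedged usage sketch: consider every dirty bucket, not just the large
// ones, matching the minBucketChanges value used by compactDirtyBuckets().
await compactor.populateChecksums({ minBucketChanges: 1 });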
@@ -148,10 +148,10 @@ export class PowerSyncMongo {
// TODO: Implement a better mechanism to use migrations in tests
await this.bucket_state.createIndex(
{
_id: 1,
'estimate_since_compact.count': 1
'_id.g': 1,
'estimate_since_compact.count': -1
},
{ name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
{ name: 'dirty_count' }
);
}
}
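With the partial-index hint gone, one way to confirm that the new dirty_count index still serves the dirty-bucket query is an explain plan. A sketch assuming the mongodb Node driver; the bucketState collection handle and the threshold value are illustrative:

import { Collection, Document } from 'mongodb';

// Inspect the winning plan for the dirty-bucket query. With the
// { '_id.g': 1, 'estimate_since_compact.count': -1 } index in place, the
// plan should report an IXSCAN on 'dirty_count' rather than a COLLSCAN.
async function explainDirtyBucketQuery(
  bucketState: Collection<Document>,
  groupId: number
): Promise<void> {
  const plan = await bucketState
    .find(
      { '_id.g': groupId, 'estimate_since_compact.count': { $gte: 10 } },
      { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 } }
    )
    .explain('queryPlanner');
  console.dir(plan.queryPlanner?.winningPlan, { depth: null });
}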
packages/service-core/src/entry/commands/compact-action.ts (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024);
export function registerCompactAction(program: Command) {
const compactCommand = program
.command(COMMAND_NAME)
.option(`-b, --buckets [buckets]`, 'Bucket or bucket definition name (optional, comma-separate multiple names)');
.option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)');

wrapConfigCommand(compactCommand);
