2 changes: 1 addition & 1 deletion .changeset/seven-mangos-sleep.md
@@ -4,4 +4,4 @@
'@powersync/service-image': patch
---

[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
[MongoDB Storage] Only compact modified buckets. Add index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting, and skip small buckets.
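For context on this changeset, here is a minimal sketch (not the service code) of the query pattern the new `bucket_state` index is meant to serve. The collection name, index keys, and `estimate_since_compact.count` field mirror this diff; the client setup, database name, document shape, and function name are assumptions for illustration only.

```ts
import { MongoClient } from 'mongodb';

// Illustrative document shape - the real bucket_state documents have more fields.
interface BucketStateDoc {
  _id: { g: number; b: string };
  estimate_since_compact: { count: number };
}

// Connection string and database name are placeholders.
async function dirtyBucketBatchSketch(groupId: number, minBucketChanges: number): Promise<string[]> {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  try {
    const bucketState = client.db('powersync').collection<BucketStateDoc>('bucket_state');

    // The new index: group id first, then change count descending, so the
    // most-changed buckets within a group sort first.
    await bucketState.createIndex(
      { '_id.g': 1, 'estimate_since_compact.count': -1 },
      { name: 'dirty_count' }
    );

    // One batch of "dirty" buckets. Buckets with fewer than `minBucketChanges`
    // changes since the last compact are skipped entirely.
    const dirtyBuckets = await bucketState
      .find(
        { '_id.g': groupId, 'estimate_since_compact.count': { $gte: minBucketChanges } },
        {
          projection: { _id: 1 },
          sort: { 'estimate_since_compact.count': -1 },
          limit: 5_000
        }
      )
      .toArray();

    return dirtyBuckets.map((doc) => doc._id.b);
  } finally {
    await client.close();
  }
}
```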
@@ -1,6 +1,13 @@
import { mongo, MONGO_OPERATION_TIMEOUT_MS } from '@powersync/lib-service-mongodb';
import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework';
import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@powersync/service-core';
import {
addChecksums,
InternalOpId,
isPartialChecksum,
PopulateChecksumCacheResults,
storage,
utils
} from '@powersync/service-core';

import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
@@ -10,6 +17,7 @@ import { cacheKey } from './OperationBatch.js';
interface CurrentBucketState {
/** Bucket name */
bucket: string;

/**
* Rows seen in the bucket, with the last op_id of each.
*/
@@ -96,75 +104,56 @@ export class MongoCompactor {
// We can make this more efficient later on by iterating
// through the buckets in a single query.
// That makes batching more tricky, so we leave for later.
await this.compactInternal(bucket);
await this.compactSingleBucket(bucket);
}
} else {
await this.compactDirtyBuckets();
}
}

private async compactDirtyBuckets() {
for await (let buckets of this.iterateDirtyBuckets()) {
while (!this.signal?.aborted) {
// Process all buckets with 1 or more changes since last time
const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
if (buckets.length == 0) {
// All done
break;
}
for (let bucket of buckets) {
await this.compactInternal(bucket);
await this.compactSingleBucket(bucket);
}
}
}

private async compactInternal(bucket: string | undefined) {
private async compactSingleBucket(bucket: string) {
const idLimitBytes = this.idLimitBytes;

let currentState: CurrentBucketState | null = null;

let bucketLower: string | mongo.MinKey;
let bucketUpper: string | mongo.MaxKey;
let currentState: CurrentBucketState = {
bucket,
seen: new Map(),
trackingSize: 0,
lastNotPut: null,
opsSincePut: 0,

if (bucket == null) {
bucketLower = new mongo.MinKey();
bucketUpper = new mongo.MaxKey();
} else if (bucket.includes('[')) {
// Exact bucket name
bucketLower = bucket;
bucketUpper = bucket;
} else {
// Bucket definition name
bucketLower = `${bucket}[`;
bucketUpper = `${bucket}[\uFFFF`;
}
checksum: 0,
opCount: 0,
opBytes: 0
};

// Constant lower bound
const lowerBound: BucketDataKey = {
g: this.group_id,
b: bucketLower as string,
b: bucket,
o: new mongo.MinKey() as any
};

// Upper bound is adjusted for each batch
let upperBound: BucketDataKey = {
g: this.group_id,
b: bucketUpper as string,
b: bucket,
o: new mongo.MaxKey() as any
};

const doneWithBucket = async () => {
if (currentState == null) {
return;
}
// Free memory before clearing bucket
currentState.seen.clear();
if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
logger.info(
`Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
// Need flush() before clear()
await this.flush();
await this.clearBucket(currentState);
}

// Do this _after_ clearBucket so that we have accurate counts.
this.updateBucketChecksums(currentState);
};

while (!this.signal?.aborted) {
// Query one batch at a time, to avoid cursor timeouts
const cursor = this.db.bucket_data.aggregate<BucketDataDocument & { size: number | bigint }>(
@@ -211,22 +200,6 @@ export class MongoCompactor {
upperBound = batch[batch.length - 1]._id;

for (let doc of batch) {
if (currentState == null || doc._id.b != currentState.bucket) {
await doneWithBucket();

currentState = {
bucket: doc._id.b,
seen: new Map(),
trackingSize: 0,
lastNotPut: null,
opsSincePut: 0,

checksum: 0,
opCount: 0,
opBytes: 0
};
}

if (doc._id.o > this.maxOpId) {
continue;
}
@@ -297,12 +270,22 @@ export class MongoCompactor {
}
}

if (currentState != null) {
logger.info(`Processed batch of length ${batch.length} current bucket: ${currentState.bucket}`);
}
logger.info(`Processed batch of length ${batch.length} current bucket: ${bucket}`);
}

await doneWithBucket();
// Free memory before clearing bucket
currentState.seen.clear();
if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
logger.info(
`Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
// Need flush() before clear()
await this.flush();
await this.clearBucket(currentState);
}

// Do this _after_ clearBucket so that we have accurate counts.
this.updateBucketChecksums(currentState);

// Need another flush after updateBucketChecksums()
await this.flush();
@@ -490,61 +473,55 @@ export class MongoCompactor {
/**
* Subset of compact, only populating checksums where relevant.
*/
async populateChecksums() {
for await (let buckets of this.iterateDirtyBuckets()) {
async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
let count = 0;
while (!this.signal?.aborted) {
const buckets = await this.dirtyBucketBatch(options);
if (buckets.length == 0) {
// All done
break;
}
const start = Date.now();
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);

await this.updateChecksumsBatch(buckets);
logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
count += buckets.length;
}
return { buckets: count };
}

private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
// This is updated after each batch
let lowerBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MinKey() as any
};
// This is static
const upperBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MaxKey() as any
};
while (!this.signal?.aborted) {
// By filtering buckets, we effectively make this "resumeable".
const filter: mongo.Filter<BucketStateDocument> = {
_id: {
$gt: lowerBound,
$lt: upperBound
/**
* Returns a batch of dirty buckets - buckets with most changes first.
*
* This cannot be used to iterate on its own - the client is expected to process these buckets and
* set estimate_since_compact.count: 0 when done, before fetching the next batch.
*/
private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
if (options.minBucketChanges <= 0) {
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
}
// We make use of an index on {_id.g: 1, 'estimate_since_compact.count': -1}
const dirtyBuckets = await this.db.bucket_state
.find(
{
'_id.g': this.group_id,
'estimate_since_compact.count': { $gte: options.minBucketChanges }
},
// Partial index exists on this
'estimate_since_compact.count': { $gt: 0 }
};

const dirtyBuckets = await this.db.bucket_state
.find(filter, {
{
projection: {
_id: 1
},
sort: {
_id: 1
'estimate_since_compact.count': -1
},
limit: 5_000,
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
// Make sure we use the partial index
hint: 'dirty_buckets'
})
.toArray();

if (dirtyBuckets.length == 0) {
break;
}

yield dirtyBuckets.map((bucket) => bucket._id.b);
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
}
)
.toArray();

lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
}
return dirtyBuckets.map((bucket) => bucket._id.b);
}

private async updateChecksumsBatch(buckets: string[]) {
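A note on the `dirtyBucketBatch()` contract above: each call re-runs the same query rather than paging past a cursor bound, so iteration only terminates once the caller zeroes `estimate_since_compact.count` for the buckets it has processed. The hedged sketch below shows that caller shape, reusing the illustrative `BucketStateDoc` type from earlier; `processDirtyBuckets` and `compactBucket` are placeholder names, and the inline counter reset stands in for whatever the service actually does inside its flush/checksum update path.

```ts
import { Collection } from 'mongodb';

// Illustrative document shape - the real bucket_state documents have more fields.
interface BucketStateDoc {
  _id: { g: number; b: string };
  estimate_since_compact: { count: number };
}

async function processDirtyBuckets(
  bucketState: Collection<BucketStateDoc>,
  groupId: number,
  minBucketChanges: number,
  compactBucket: (bucket: string) => Promise<void>
): Promise<void> {
  while (true) {
    const batch = await bucketState
      .find(
        { '_id.g': groupId, 'estimate_since_compact.count': { $gte: minBucketChanges } },
        { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 }, limit: 5_000 }
      )
      .toArray();
    if (batch.length === 0) {
      break; // nothing dirty left
    }
    for (const state of batch) {
      await compactBucket(state._id.b);
      // Zeroing the counter is what makes re-running the same query terminate:
      // processed buckets drop out of the next batch.
      await bucketState.updateOne({ _id: state._id }, { $set: { 'estimate_since_compact.count': 0 } });
    }
  }
}
```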
@@ -16,6 +16,8 @@ import {
InternalOpId,
internalToExternalOpId,
maxLsn,
PopulateChecksumCacheOptions,
PopulateChecksumCacheResults,
ProtocolOpId,
ReplicationCheckpoint,
storage,
@@ -665,7 +667,7 @@ export class MongoSyncBucketStorage
}
}

async populatePersistentChecksumCache(options: Required<Pick<CompactOptions, 'signal' | 'maxOpId'>>): Promise<void> {
async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
logger.info(`Populating persistent checksum cache...`);
const start = Date.now();
// We do a minimal compact here.
@@ -676,9 +678,14 @@
memoryLimitMB: 0
});

await compactor.populateChecksums();
const result = await compactor.populateChecksums({
// There are cases with millions of small buckets, in which case it can take very long to
// populate the checksums, with minimal benefit. We skip the small buckets here.
minBucketChanges: options.minBucketChanges ?? 10
});
const duration = Date.now() - start;
logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`);
return result;
}

/**
@@ -148,10 +148,10 @@ export class PowerSyncMongo {
// TODO: Implement a better mechanism to use migrations in tests
await this.bucket_state.createIndex(
{
_id: 1,
'estimate_since_compact.count': 1
'_id.g': 1,
'estimate_since_compact.count': -1
},
{ name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
{ name: 'dirty_count' }
);
}
}
19 changes: 17 additions & 2 deletions modules/module-mongodb-storage/test/src/storage_compacting.test.ts
@@ -97,10 +97,25 @@
await populate(bucketStorage);
const { checkpoint } = await bucketStorage.getCheckpoint();

await bucketStorage.populatePersistentChecksumCache({
// Default is to skip small buckets - should be a no-op
const result0 = await bucketStorage.populatePersistentChecksumCache({
maxOpId: checkpoint
});
expect(result0.buckets).toEqual(0);

// This should cache the checksums for the two buckets
const result1 = await bucketStorage.populatePersistentChecksumCache({
maxOpId: checkpoint,
minBucketChanges: 1
});
expect(result1.buckets).toEqual(2);

// This should be a no-op, as the checksums are already cached
const result2 = await bucketStorage.populatePersistentChecksumCache({
maxOpId: checkpoint,
signal: new AbortController().signal
minBucketChanges: 1
});
expect(result2.buckets).toEqual(0);

const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
expect(checksumAfter.get('by_user2["u1"]')).toEqual({
@@ -11,6 +11,8 @@ import {
LastValueSink,
maxLsn,
PartialChecksum,
PopulateChecksumCacheOptions,
PopulateChecksumCacheResults,
ReplicationCheckpoint,
storage,
utils,
@@ -112,8 +114,9 @@ export class PostgresSyncRulesStorage
return new PostgresCompactor(this.db, this.group_id, options).compact();
}

async populatePersistentChecksumCache(options: Pick<CompactOptions, 'signal' | 'maxOpId'>): Promise<void> {
async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
// no-op - checksum cache is not implemented for Postgres yet
return { buckets: 0 };
}

lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise<bigint | null> {
2 changes: 1 addition & 1 deletion packages/service-core/src/entry/commands/compact-action.ts
@@ -25,7 +25,7 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024);
export function registerCompactAction(program: Command) {
const compactCommand = program
.command(COMMAND_NAME)
.option(`-b, --buckets [buckets]`, 'Bucket or bucket definition name (optional, comma-separate multiple names)');
.option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)');

wrapConfigCommand(compactCommand);
