Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/seven-mangos-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

// Name of the bucket_state index that the `down` migration checks for and drops.
const INDEX_NAME = 'dirty_buckets';

/**
 * Forward migration.
 *
 * Opens a storage connection from the service configuration, delegates to
 * `createBucketStateIndex2()` on that connection, and always closes the
 * client afterwards - including when index creation throws.
 */
export const up: migrations.PowerSyncMigrationFunction = async (context) => {
  const storageConfig = context.service_context.configuration.storage as MongoStorageConfig;
  const db = storage.createPowerSyncMongo(storageConfig);

  try {
    await db.createBucketStateIndex2();
  } finally {
    // The connection is only needed for this migration step.
    await db.client.close();
  }
};

/**
 * Reverse migration.
 *
 * Drops the index named by `INDEX_NAME` from the `bucket_state` collection
 * when it is present, and does nothing when it is absent. The client is
 * closed in all cases.
 */
export const down: migrations.PowerSyncMigrationFunction = async (context) => {
  const storageConfig = context.service_context.configuration.storage as MongoStorageConfig;
  const db = storage.createPowerSyncMongo(storageConfig);

  try {
    // Check first, so that rolling back without the index present is a no-op.
    const hasIndex = await db.bucket_state.indexExists(INDEX_NAME);
    if (hasIndex) {
      await db.bucket_state.dropIndex(INDEX_NAME);
    }
  } finally {
    await db.client.close();
  }
};
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,19 @@ export class MongoCompactor {
await this.compactInternal(bucket);
}
} else {
await this.compactInternal(undefined);
await this.compactDirtyBuckets();
}
}

async compactInternal(bucket: string | undefined) {
/**
 * Compacts every bucket yielded by `iterateDirtyBuckets()`.
 *
 * Batches are consumed in order, and buckets within a batch are compacted
 * one at a time via `compactInternal`.
 */
private async compactDirtyBuckets() {
  for await (const batch of this.iterateDirtyBuckets()) {
    for (const bucketName of batch) {
      // Sequential on purpose: each compaction finishes before the next starts.
      await this.compactInternal(bucketName);
    }
  }
}

private async compactInternal(bucket: string | undefined) {
const idLimitBytes = this.idLimitBytes;

let currentState: CurrentBucketState | null = null;
Expand Down Expand Up @@ -483,6 +491,16 @@ export class MongoCompactor {
* Subset of compact, only populating checksums where relevant.
*/
async populateChecksums() {
  // Process one batch of bucket names at a time, as produced by iterateDirtyBuckets().
  for await (const batch of this.iterateDirtyBuckets()) {
    // Wall-clock start, used only for the per-batch duration log below.
    const startedAt = Date.now();
    logger.info(`Calculating checksums for batch of ${batch.length} buckets, starting at ${batch[0]}`);

    await this.updateChecksumsBatch(batch);

    logger.info(`Updated checksums for batch of ${batch.length} buckets in ${Date.now() - startedAt}ms`);
  }
}

private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
// This is updated after each batch
let lowerBound: BucketStateDocument['_id'] = {
g: this.group_id,
Expand All @@ -500,10 +518,11 @@ export class MongoCompactor {
$gt: lowerBound,
$lt: upperBound
},
compacted_state: { $exists: false }
// Partial index exists on this
'estimate_since_compact.count': { $gt: 0 }
};

const bucketsWithoutChecksums = await this.db.bucket_state
const dirtyBuckets = await this.db.bucket_state
.find(filter, {
projection: {
_id: 1
Expand All @@ -512,19 +531,19 @@ export class MongoCompactor {
_id: 1
},
limit: 5_000,
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
// Make sure we use the partial index
hint: 'dirty_buckets'
})
.toArray();
if (bucketsWithoutChecksums.length == 0) {
// All done

if (dirtyBuckets.length == 0) {
break;
}

logger.info(`Calculating checksums for batch of ${bucketsWithoutChecksums.length} buckets`);

await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b));
yield dirtyBuckets.map((bucket) => bucket._id.b);

lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
}
}

Expand Down Expand Up @@ -559,6 +578,10 @@ export class MongoCompactor {
count: bucketChecksum.count,
checksum: BigInt(bucketChecksum.checksum),
bytes: null
},
estimate_since_compact: {
count: 0,
bytes: 0
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
// Full migrations are not currently run for tests, so we manually create the important ones
await db.createCheckpointEventsCollection();
await db.createBucketStateIndex();
await db.createBucketStateIndex2();

return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
};
Expand Down
13 changes: 13 additions & 0 deletions modules/module-mongodb-storage/src/storage/implementation/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ export class PowerSyncMongo {
{ name: 'bucket_updates', unique: true }
);
}
/**
 * Only use in migrations and tests.
 *
 * Creates the `dirty_buckets` index on `bucket_state`, keyed on `_id` and
 * `estimate_since_compact.count`. It is a partial index: only documents with
 * `estimate_since_compact.count > 0` are included.
 */
async createBucketStateIndex2() {
  // TODO: Implement a better mechanism to use migrations in tests

  // Restricts the index to documents with a positive count since the last compact.
  const dirtyOnly = { 'estimate_since_compact.count': { $gt: 0 } };

  await this.bucket_state.createIndex(
    {
      _id: 1,
      'estimate_since_compact.count': 1
    },
    {
      name: 'dirty_buckets',
      partialFilterExpression: dirtyOnly
    }
  );
}
}

export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {
Expand Down