Skip to content

Commit 440d32f

Browse files
committed
Add parameter compact action.
1 parent 8defa6e commit 440d32f

File tree

4 files changed

+114
-2
lines changed

4 files changed

+114
-2
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { logger } from '@powersync/lib-services-framework';
2+
import { bson, InternalOpId } from '@powersync/service-core';
3+
import { PowerSyncMongo } from './db.js';
4+
5+
export class MongoParameterCompactor {
6+
constructor(
7+
private db: PowerSyncMongo,
8+
private group_id: number,
9+
private checkpoint: InternalOpId
10+
) {}
11+
12+
/**
13+
* This is the oldest checkpoint that we consider safe to still use. We cleanup old parameter
14+
* but no data that would be used by this checkpoint.
15+
*
16+
* Specifically, we return a checkpoint that has been available for at least 5 minutes, then
17+
* we can delete data only used for checkpoints older than that.
18+
*
19+
* @returns null if there is no safe checkpoint available.
20+
*/
21+
async getActiveCheckpoint(): Promise<InternalOpId | null> {
22+
const syncRules = await this.db.sync_rules.findOne({ _id: this.group_id });
23+
if (syncRules == null) {
24+
return null;
25+
}
26+
27+
return syncRules.last_checkpoint;
28+
}
29+
30+
async compact() {
31+
logger.info(`Compacting parameters for group ${this.group_id} up to checkpoint ${this.checkpoint}`);
32+
// This is the currently-active checkpoint.
33+
// We do not remove any data that may be used by this checkpoint.
34+
// snapshot queries ensure that if any clients are still using older checkpoints, they would
35+
// not be affected by this compaction.
36+
const checkpoint = await this.checkpoint;
37+
if (checkpoint == null) {
38+
return;
39+
}
40+
41+
// Index on {'key.g': 1, lookup: 1, _id: 1}
42+
// In theory, we could let MongoDB do more of the work here, by grouping by (key, lookup)
43+
// in MongoDB already. However, that risks running into cases where MongoDB needs to process
44+
// very large amounts of data before returning results, which could lead to timeouts.
45+
const cursor = this.db.bucket_parameters.find(
46+
{
47+
'key.g': this.group_id
48+
},
49+
{
50+
sort: { lookup: 1, _id: 1 },
51+
batchSize: 10_000,
52+
projection: { _id: 1, key: 1, lookup: 1 }
53+
}
54+
);
55+
56+
let lastDoc: RawParameterData | null = null;
57+
let removeIds: InternalOpId[] = [];
58+
59+
while (await cursor.hasNext()) {
60+
const batch = cursor.readBufferedDocuments();
61+
for (let doc of batch) {
62+
if (doc._id >= checkpoint) {
63+
continue;
64+
}
65+
const rawDoc: RawParameterData = {
66+
_id: doc._id,
67+
data: bson.serialize({
68+
key: doc.key,
69+
lookup: doc.lookup
70+
}) as Buffer
71+
};
72+
if (lastDoc != null && lastDoc.data.equals(rawDoc.data) && lastDoc._id < doc._id) {
73+
removeIds.push(lastDoc._id);
74+
}
75+
76+
lastDoc = rawDoc;
77+
}
78+
79+
if (removeIds.length >= 1000) {
80+
await this.db.bucket_parameters.deleteMany({ _id: { $in: removeIds } });
81+
logger.info(`Removed ${removeIds.length} stale parameter entries`);
82+
removeIds = [];
83+
}
84+
}
85+
86+
if (removeIds.length > 0) {
87+
await this.db.bucket_parameters.deleteMany({ _id: { $in: removeIds } });
88+
logger.info(`Removed ${removeIds.length} stale parameter entries`);
89+
}
90+
logger.info('Parameter compaction completed');
91+
}
92+
}
93+
94+
interface RawParameterData {
95+
_id: InternalOpId;
96+
data: Buffer;
97+
}

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
3333
import { MongoCompactor } from './MongoCompactor.js';
3434
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
3535
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js';
36+
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
3637

3738
export class MongoSyncBucketStorage
3839
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -677,7 +678,11 @@ export class MongoSyncBucketStorage
677678
}
678679

679680
async compact(options?: storage.CompactOptions) {
680-
return new MongoCompactor(this.db, this.group_id, options).compact();
681+
const checkpoint = await this.getCheckpointInternal();
682+
await new MongoCompactor(this.db, this.group_id, options).compact();
683+
if (checkpoint != null && options?.compactParameterData) {
684+
await new MongoParameterCompactor(this.db, this.group_id, checkpoint.checkpoint).compact();
685+
}
681686
}
682687

683688
/**

packages/service-core/src/entry/commands/compact-action.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,15 @@ export function registerCompactAction(program: Command) {
5959
return;
6060
}
6161
logger.info('Performing compaction...');
62-
await active.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB, compactBuckets: buckets });
62+
if (buckets != null) {
63+
await active.compact({
64+
memoryLimitMB: COMPACT_MEMORY_LIMIT_MB,
65+
compactBuckets: buckets,
66+
compactParameterData: false
67+
});
68+
} else {
69+
await active.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB, compactParameterData: true });
70+
}
6371
logger.info('Successfully compacted storage.');
6472
} catch (e) {
6573
logger.error(`Failed to compact: ${e.toString()}`);

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ export interface CompactOptions {
193193
*/
194194
compactBuckets?: string[];
195195

196+
compactParameterData?: boolean;
197+
196198
/** Minimum of 2 */
197199
clearBatchLimit?: number;
198200

0 commit comments

Comments
 (0)