.changeset/fifty-socks-fly.md (11 additions, 0 deletions)
@@ -0,0 +1,11 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
---

Create a persisted checksum cache when compacting buckets.
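In practice this means the compactor, which already scans every operation in a bucket, accumulates a checksum, op count, and byte estimate as it goes, and persists them on the bucket's state document when it finishes the bucket. A rough sketch of the persisted shape, inferred from the bucket_state writes in the compactor diff below (the actual BucketStateDocument in models.js may differ in naming and types):

```ts
// Rough sketch, inferred from the bucket_state writes below; the real
// BucketStateDocument in models.js may differ.
interface BucketStateDocument {
  _id: { g: number; b: string }; // sync-rules group id + bucket name
  last_op: bigint; // last op written to the bucket
  compacted_state?: {
    op_id: bigint; // the checksum covers ops up to and including this op
    count: number; // number of ops covered by the checksum
    checksum: bigint; // additive checksum over the covered ops
    bytes: number; // approximate byte size of the covered ops
  };
  estimate_since_compact?: {
    count: number; // ops written since the last compact (reset to 0 here)
    bytes: number; // bytes written since the last compact (reset to 0 here)
  };
}
```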
@@ -97,7 +97,7 @@ export class MongoBucketBatch
private persisted_op: InternalOpId | null = null;

/**
-   * For tests only - not for persistence logic.
+   * Last written op, if any. This may not reflect a consistent checkpoint.
*/
public last_flushed_op: InternalOpId | null = null;

@@ -1,10 +1,11 @@
import { mongo } from '@powersync/lib-service-mongodb';
-import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework';
-import { InternalOpId, storage, utils } from '@powersync/service-core';
+import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework';
+import { addChecksums, InternalOpId, storage, utils } from '@powersync/service-core';

import { PowerSyncMongo } from './db.js';
-import { BucketDataDocument, BucketDataKey } from './models.js';
+import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
import { cacheKey } from './OperationBatch.js';
import { readSingleBatch } from './util.js';

interface CurrentBucketState {
/** Bucket name */
@@ -27,6 +28,21 @@ interface CurrentBucketState {
* Number of REMOVE/MOVE operations seen since lastNotPut.
*/
opsSincePut: number;

/**
* Incrementally updated checksum, covering ops up to maxOpId.
*/
checksum: number;

/**
* Number of ops covered by the checksum.
*/
opCount: number;

/**
* Byte size of ops covered by the checksum.
*/
opBytes: number;
}

/**
@@ -43,13 +59,15 @@ const DEFAULT_MEMORY_LIMIT_MB = 64;

export class MongoCompactor {
private updates: mongo.AnyBulkWriteOperation<BucketDataDocument>[] = [];
private bucketStateUpdates: mongo.AnyBulkWriteOperation<BucketStateDocument>[] = [];

private idLimitBytes: number;
private moveBatchLimit: number;
private moveBatchQueryLimit: number;
private clearBatchLimit: number;
-  private maxOpId: bigint | undefined;
+  private maxOpId: bigint;
private buckets: string[] | undefined;
private signal?: AbortSignal;

constructor(
private db: PowerSyncMongo,
@@ -60,8 +78,9 @@ export class MongoCompactor {
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
-    this.maxOpId = options?.maxOpId;
+    this.maxOpId = options?.maxOpId ?? 0n;
this.buckets = options?.compactBuckets;
this.signal = options?.signal;
}

/**
@@ -117,31 +136,33 @@
o: new mongo.MaxKey() as any
};

-    while (true) {
+    while (!this.signal?.aborted) {
       // Query one batch at a time, to avoid cursor timeouts
-      const batch = await this.db.bucket_data
-        .find(
-          {
-            _id: {
-              $gte: lowerBound,
-              $lt: upperBound
-            }
-          },
-          {
-            projection: {
-              _id: 1,
-              op: 1,
-              table: 1,
-              row_id: 1,
-              source_table: 1,
-              source_key: 1
-            },
-            limit: this.moveBatchQueryLimit,
-            sort: { _id: -1 },
-            singleBatch: true
-          }
-        )
-        .toArray();
+      const cursor = this.db.bucket_data.aggregate<BucketDataDocument & { size: number | bigint }>([
+        {
+          $match: {
+            _id: {
+              $gte: lowerBound,
+              $lt: upperBound
+            }
+          }
+        },
+        { $sort: { _id: -1 } },
+        { $limit: this.moveBatchQueryLimit },
+        {
+          $project: {
+            _id: 1,
+            op: 1,
+            table: 1,
+            row_id: 1,
+            source_table: 1,
+            source_key: 1,
+            checksum: 1,
+            size: { $bsonSize: '$$ROOT' }
+          }
+        }
+      ]);
+      const { data: batch } = await readSingleBatch(cursor);

if (batch.length == 0) {
// We've reached the end
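The switch from a plain find() to an aggregation pipeline lets the server report each document's BSON size via $bsonSize, which feeds the new opBytes estimate. readSingleBatch from ./util.js reads a single batch from the cursor, so each loop iteration runs a fresh query instead of holding a long-lived cursor open. A hypothetical sketch of such a helper, assuming the MongoDB Node.js driver's buffered-documents API (the real implementation may differ):

```ts
import { AggregationCursor } from 'mongodb';

// Hypothetical sketch; the actual helper lives in ./util.js.
async function readSingleBatch<T>(cursor: AggregationCursor<T>): Promise<{ data: T[] }> {
  const data: T[] = [];
  const first = await cursor.next(); // runs the query and pulls the first server batch
  if (first != null) {
    // Drain whatever else the driver buffered from that same batch.
    data.push(first, ...cursor.readBufferedDocuments());
  }
  await cursor.close(); // never keep the cursor open between iterations
  return { data };
}
```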
@@ -153,34 +174,47 @@

for (let doc of batch) {
if (currentState == null || doc._id.b != currentState.bucket) {
-          if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
-            // Important to flush before clearBucket()
-            await this.flush();
-            logger.info(
-              `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
-            );
-            const bucket = currentState.bucket;
-            const clearOp = currentState.lastNotPut;
-            // Free memory before clearing bucket
-            currentState = null;
-            await this.clearBucket(bucket, clearOp);
-          }
+          if (currentState != null) {
+            if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
+              // Important to flush before clearBucket()
+              // Does not have to happen before flushBucketChecksums()
+              await this.flush();
+              logger.info(
+                `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
+              );
+
+              // Free memory before clearing bucket
+              currentState!.seen.clear();
+
+              await this.clearBucket(currentState);
+            }
+
+            // Should happen after clearBucket() for accurate stats
+            this.updateBucketChecksums(currentState);
+          }
           currentState = {
             bucket: doc._id.b,
             seen: new Map(),
             trackingSize: 0,
             lastNotPut: null,
-            opsSincePut: 0
+            opsSincePut: 0,
+
+            checksum: 0,
+            opCount: 0,
+            opBytes: 0
           };
}

-        if (this.maxOpId != null && doc._id.o > this.maxOpId) {
+        if (doc._id.o > this.maxOpId) {
continue;
}

currentState.checksum = addChecksums(currentState.checksum, Number(doc.checksum));
currentState.opCount += 1;

let isPersistentPut = doc.op == 'PUT';

currentState.opBytes += Number(doc.size);
if (doc.op == 'REMOVE' || doc.op == 'PUT') {
const key = `${doc.table}/${doc.row_id}/${cacheKey(doc.source_table!, doc.source_key!)}`;
const targetOp = currentState.seen.get(key);
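Every operation visited up to maxOpId contributes its stored checksum to the running bucket checksum via addChecksums from @powersync/service-core, so the persisted total matches what a full recount over the same op range would produce. A minimal sketch of the combining step, assuming checksums add with 32-bit wrap-around:

```ts
// Minimal sketch, assuming 32-bit wrap-around addition; see the actual
// addChecksums export in @powersync/service-core.
function addChecksums(a: number, b: number): number {
  return (a + b) | 0; // truncate the sum to a signed 32-bit integer
}
```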
@@ -208,6 +242,8 @@
}
}
});

currentState.opBytes += 200 - Number(doc.size); // TODO: better estimate for this
} else {
if (currentState.trackingSize >= idLimitBytes) {
// Reached memory limit.
@@ -234,24 +270,72 @@
currentState.opsSincePut += 1;
}

-      if (this.updates.length >= this.moveBatchLimit) {
+      if (this.updates.length + this.bucketStateUpdates.length >= this.moveBatchLimit) {
await this.flush();
}
}
}

-      await this.flush();
       currentState?.seen.clear();
       if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) {
         logger.info(
           `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
         );
-        const bucket = currentState.bucket;
-        const clearOp = currentState.lastNotPut;
-        // Free memory before clearing bucket
-        currentState = null;
-        await this.clearBucket(bucket, clearOp);
+        // Need flush() before clear()
+        await this.flush();
+        await this.clearBucket(currentState);
       }
+      if (currentState != null) {
+        // Do this _after_ clearBucket so that we have accurate counts.
+        this.updateBucketChecksums(currentState);
+      }
+      // Need another flush after updateBucketChecksums()
+      await this.flush();
}

/**
* Call when done with a bucket.
*/
private updateBucketChecksums(state: CurrentBucketState) {
if (state.opCount < 0) {
throw new ServiceAssertionError(
`Invalid opCount: ${state.opCount} checksum ${state.checksum} opsSincePut: ${state.opsSincePut} maxOpId: ${this.maxOpId}`
);
}
this.bucketStateUpdates.push({
updateOne: {
filter: {
_id: {
g: this.group_id,
b: state.bucket
}
},
update: {
$set: {
compacted_state: {
op_id: this.maxOpId,
count: state.opCount,
checksum: BigInt(state.checksum),
bytes: state.opBytes
},
estimate_since_compact: {
// Note: There could have been a whole bunch of new operations added to the bucket _while_ compacting,
// which we don't currently cater for.
// We could potentially query for that, but that could add overhead.
count: 0,
bytes: 0
}
},
$setOnInsert: {
// Only set this if we're creating the document.
// In all other cases, the replication process will have set a more accurate id.
last_op: this.maxOpId
}
},
// We generally expect this to have been created before, but do handle the case of old, unchanged buckets.
upsert: true
}
});
}
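With compacted_state persisted, a checksum request no longer needs to aggregate over a bucket's full history: it can start from the cached snapshot and only aggregate operations newer than compacted_state.op_id. A hypothetical sketch of that read path, using the BucketStateDocument shape sketched earlier (loadBucketState and aggregateOpsAfter are illustrative stand-ins, not real exports):

```ts
// Illustrative stand-ins, not real exports from this module:
declare function loadBucketState(bucket: string): Promise<BucketStateDocument | null>;
declare function aggregateOpsAfter(bucket: string, after: bigint): Promise<{ checksum: number; count: number }>;

async function getBucketChecksum(bucket: string): Promise<{ checksum: number; count: number }> {
  const state = await loadBucketState(bucket); // reads the bucket_state document
  const base = state?.compacted_state;
  // Only scan ops written after the cached snapshot.
  const tail = await aggregateOpsAfter(bucket, base?.op_id ?? 0n);
  return {
    checksum: addChecksums(Number(base?.checksum ?? 0n), tail.checksum),
    count: (base?.count ?? 0) + tail.count
  };
}
```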

private async flush() {
@@ -266,15 +350,26 @@
});
this.updates = [];
}
if (this.bucketStateUpdates.length > 0) {
logger.info(`Updating ${this.bucketStateUpdates.length} bucket states`);
await this.db.bucket_state.bulkWrite(this.bucketStateUpdates, {
ordered: false
});
this.bucketStateUpdates = [];
}
}

/**
* Perform a CLEAR compact for a bucket.
   *
-   * @param bucket bucket name
-   * @param op op_id of the last non-PUT operation, which will be converted to CLEAR.
   */
-  private async clearBucket(bucket: string, op: InternalOpId) {
+  private async clearBucket(currentState: CurrentBucketState) {
+    const bucket = currentState.bucket;
+    const clearOp = currentState.lastNotPut!;
const opFilter = {
_id: {
$gte: {
@@ -285,15 +380,16 @@
$lte: {
g: this.group_id,
b: bucket,
-          o: op
+          o: clearOp
}
}
};

const session = this.db.client.startSession();
try {
let done = false;
-      while (!done) {
+      while (!done && !this.signal?.aborted) {
let opCountDiff = 0;
// Do the CLEAR operation in batches, with each batch a separate transaction.
// The state after each batch is fully consistent.
// We need a transaction per batch to make sure checksums stay consistent.
@@ -364,12 +460,16 @@
},
{ session }
);

opCountDiff = -numberOfOpsToClear + 1;
},
{
writeConcern: { w: 'majority' },
readConcern: { level: 'snapshot' }
}
);
// Update _outside_ the transaction, since the transaction can be retried multiple times.
currentState.opCount += opCountDiff;
}
} finally {
await session.endSession();
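One subtlety in clearBucket: the MongoDB driver may run a withTransaction callback more than once on transient errors, so mutating shared state inside the callback could apply the same adjustment on every retry. The diff instead assigns the op-count delta to a local variable inside the transaction and folds it into currentState only after the transaction commits. Schematically:

```ts
// Retry-safe pattern from the code above: numberOfOpsToClear ops are replaced
// by a single CLEAR op, so the bucket's op count changes by the difference.
let opCountDiff = 0;
await session.withTransaction(async () => {
  // ...delete numberOfOpsToClear ops and write one CLEAR op...
  opCountDiff = -numberOfOpsToClear + 1; // plain assignment stays correct across retries
});
currentState.opCount += opCountDiff; // applied exactly once, after the commit
```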