8 changes: 8 additions & 0 deletions .changeset/wicked-rockets-add.md
@@ -0,0 +1,8 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-image': patch
+---
+
+Fix a rare issue of incorrect checksums on fallback after a checksum query timed out.
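
Why this matters: bucket checksums in this code are combined additively and truncated to 32 bits (see checksumFromAggregate in the diff below), so a resumed query that skips or double-counts a batch of operations silently corrupts the total. A minimal sketch of that merge semantics, assuming wrapping 32-bit addition is the intended behavior (the real logic lives in addPartialChecksums):

// Illustrative sketch only - not part of this PR's diff.
// Models how partial checksums appear to combine: summed as 64-bit
// integers, then truncated to a signed 32-bit value.
function mergePartialChecksums(a: number, b: number): number {
  const sum = (BigInt(a) + BigInt(b)) & 0xffffffffn;
  // ToInt32 coercion, matching `Number(x & 0xffffffffn) & 0xffffffff` in the diff.
  return Number(sum) & 0xffffffff;
}

// Wraps around instead of overflowing:
console.log(mergePartialChecksums(0x7fffffff, 1)); // -2147483648
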
MongoBucketStorage.ts
@@ -11,7 +11,7 @@ import { mongo } from '@powersync/lib-service-mongodb';
 import { PowerSyncMongo } from './implementation/db.js';
 import { SyncRuleDocument } from './implementation/models.js';
 import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
-import { MongoSyncBucketStorage } from './implementation/MongoSyncBucketStorage.js';
+import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js';
 import { generateSlotName } from './implementation/util.js';
 
 export class MongoBucketStorage
@@ -31,7 +31,8 @@ export class MongoBucketStorage
     db: PowerSyncMongo,
     options: {
       slot_name_prefix: string;
-    }
+    },
+    private internalOptions?: MongoSyncBucketStorageOptions
   ) {
     super();
     this.client = db.client;
@@ -49,7 +50,7 @@ export class MongoBucketStorage
     if ((typeof id as any) == 'bigint') {
       id = Number(id);
     }
-    const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name);
+    const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, this.internalOptions);
     if (!options?.skipLifecycleHooks) {
       this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
     }
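
The new optional internalOptions constructor parameter threads checksum tuning through to each MongoSyncBucketStorage instance. Below is a sketch of how a test might exercise the batching code paths; the shape of MongoSyncBucketStorageOptions is not shown in this diff, so the checksumOptions field is an assumption:

// Hypothetical test setup - illustrative only, not part of this PR's diff.
// `checksumOptions` is an assumed field name; MongoSyncBucketStorageOptions is
// imported in the diff above, but its shape is not visible here.
import { MongoBucketStorage } from './MongoBucketStorage.js';
import { PowerSyncMongo } from './implementation/db.js';

function createTestStorage(db: PowerSyncMongo): MongoBucketStorage {
  return new MongoBucketStorage(
    db,
    { slot_name_prefix: 'powersync_' },
    // Tiny limits force both the bucket-batch split and the operation-batch
    // resume logic to run against small test datasets.
    { checksumOptions: { bucketBatchLimit: 2, operationBatchLimit: 10 } }
  );
}
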
implementation/MongoChecksums.ts
@@ -1,3 +1,4 @@
+import * as lib_mongo from '@powersync/lib-service-mongodb';
 import {
   addPartialChecksums,
   bson,
@@ -11,12 +12,34 @@ import {
   PartialChecksumMap,
   PartialOrFullChecksum
 } from '@powersync/service-core';
-import * as lib_mongo from '@powersync/lib-service-mongodb';
 import { logger } from '@powersync/lib-services-framework';
 import { PowerSyncMongo } from './db.js';
 
+/**
+ * Checksum calculation options, primarily for tests.
+ */
+export interface MongoChecksumOptions {
+  /**
+   * How many buckets to process in a batch when calculating checksums.
+   */
+  bucketBatchLimit?: number;
+
+  /**
+   * Limit on the number of operation documents to aggregate in a single checksum query.
+   */
+  operationBatchLimit?: number;
+}
+
+const DEFAULT_BUCKET_BATCH_LIMIT = 200;
+const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
+
 /**
  * Checksum query implementation.
+ *
+ * General implementation flow is:
+ * 1. getChecksums() -> check cache for (partial) matches. If not found or partial match, query the remainder using getChecksumsInternal().
+ * 2. getChecksumsInternal() -> query bucket_state for partial matches. Query the remainder using queryPartialChecksums().
+ * 3. queryPartialChecksums() -> split into batches of up to `bucketBatchLimit` (default 200) buckets at a time -> queryPartialChecksumsInternal()
+ * 4. queryPartialChecksumsInternal() -> aggregate over up to `operationBatchLimit` (default 50_000) operations in bucket_data at a time
+ */
 export class MongoChecksums {
   private cache = new ChecksumCache({
@@ -27,7 +50,8 @@ export class MongoChecksums {
 
   constructor(
     private db: PowerSyncMongo,
-    private group_id: number
+    private group_id: number,
+    private options?: MongoChecksumOptions
   ) {}
 
   /**
@@ -44,7 +68,7 @@
   /**
    * Calculate (partial) checksums from bucket_state and the data collection.
    *
-   * Results are not cached.
+   * Results are not cached here.
    */
   private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
     if (batch.length == 0) {
@@ -117,61 +141,143 @@
   }
 
   /**
-   * Calculate (partial) checksums from the data collection directly.
+   * Calculate (partial) checksums from the data collection directly, bypassing the cache and bucket_state.
+   *
+   * Internally, we do calculations in smaller batches as appropriate.
    */
-  async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
-    try {
-      return await this.queryPartialChecksumsInternal(batch);
-    } catch (e) {
-      if (e.codeName == 'MaxTimeMSExpired') {
-        logger.warn(`Checksum query timed out; falling back to slower version`, e);
-        // Timeout - try the slower but more robust version
-        return await this.queryPartialChecksumsFallback(batch);
-      }
-      throw lib_mongo.mapQueryError(e, 'while reading checksums');
-    }
-  }
-
-  private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
-    const filters: any[] = [];
-    for (let request of batch) {
-      filters.push({
-        _id: {
-          $gt: {
-            g: this.group_id,
-            b: request.bucket,
-            o: request.start ?? new bson.MinKey()
-          },
-          $lte: {
-            g: this.group_id,
-            b: request.bucket,
-            o: request.end
-          }
-        }
-      });
-    }
-
-    const aggregate = await this.db.bucket_data
-      .aggregate(
-        [
-          {
-            $match: {
-              $or: filters
-            }
-          },
-          CHECKSUM_QUERY_GROUP_STAGE
-        ],
-        { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
-      )
-      // Don't map the error here - we want to keep timeout errors as-is
-      .toArray();
-
-    const partialChecksums = new Map<string, PartialOrFullChecksum>(
-      aggregate.map((doc) => {
-        const bucket = doc._id;
-        return [bucket, checksumFromAggregate(doc)];
-      })
-    );
+  public async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
+    // Limit the number of buckets we query for at a time.
+    const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT;
+
+    if (batch.length < bucketBatchLimit) {
+      // Single batch - no need for splitting the batch and merging results
+      return await this.queryPartialChecksumsInternal(batch);
+    }
+
+    // Split the batch and merge results
+    let results = new Map<string, PartialOrFullChecksum>();
+    for (let i = 0; i < batch.length; i += bucketBatchLimit) {
+      const bucketBatch = batch.slice(i, i + bucketBatchLimit);
+      const batchResults = await this.queryPartialChecksumsInternal(bucketBatch);
+      for (let r of batchResults.values()) {
+        results.set(r.bucket, r);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Query a batch of checksums.
+   *
+   * We limit the number of operations that the query aggregates in each batch, to avoid potential query timeouts.
+   */
+  private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
+    const batchLimit = this.options?.operationBatchLimit ?? DEFAULT_OPERATION_BATCH_LIMIT;
+
+    // Map requests by bucket. We adjust this as we get partial results.
+    let requests = new Map<string, FetchPartialBucketChecksum>();
+    for (let request of batch) {
+      requests.set(request.bucket, request);
+    }
+
+    const partialChecksums = new Map<string, PartialOrFullChecksum>();
+
+    while (requests.size > 0) {
+      const filters: any[] = [];
+      for (let request of requests.values()) {
+        filters.push({
+          _id: {
+            $gt: {
+              g: this.group_id,
+              b: request.bucket,
+              o: request.start ?? new bson.MinKey()
+            },
+            $lte: {
+              g: this.group_id,
+              b: request.bucket,
+              o: request.end
+            }
+          }
+        });
+      }
+
+      // Aggregate over a max of `batchLimit` operations at a time.
+      // Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12.
+      // Then we'll do three batches:
+      // 1. Query: A[1-end], B[1-end], C[1-end]
+      //    Returns: A[1-10], B[1-2]
+      // 2. Query: B[3-end], C[1-end]
+      //    Returns: B[3-10], C[1-4]
+      // 3. Query: C[5-end]
+      //    Returns: C[5-10]
+      const aggregate = await this.db.bucket_data
+        .aggregate(
+          [
+            {
+              $match: {
+                $or: filters
+              }
+            },
+            // sort and limit _before_ grouping
+            { $sort: { _id: 1 } },
+            { $limit: batchLimit },
+            {
+              $group: {
+                _id: '$_id.b',
+                // Historically, checksum may be stored as 'int' or 'double'.
+                // More recently, this should be a 'long'.
+                // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
+                checksum_total: { $sum: { $toLong: '$checksum' } },
+                count: { $sum: 1 },
+                has_clear_op: {
+                  $max: {
+                    $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
+                  }
+                },
+                last_op: { $max: '$_id.o' }
+              }
+            }
+          ],
+          { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
+        )
+        .toArray()
+        .catch((e) => {
+          throw lib_mongo.mapQueryError(e, 'while reading checksums');
+        });
+
+      let batchCount = 0;
+      let limitReached = false;
+      for (let doc of aggregate) {
+        const bucket = doc._id;
+        const checksum = checksumFromAggregate(doc);
+
+        const existing = partialChecksums.get(bucket);
+        if (existing != null) {
+          partialChecksums.set(bucket, addPartialChecksums(bucket, existing, checksum));
+        } else {
+          partialChecksums.set(bucket, checksum);
+        }
+
+        batchCount += doc.count;
+        if (batchCount == batchLimit) {
+          // Limit reached. Request more in the next batch.
+          // Note that this only affects the _last_ bucket in a batch.
+          limitReached = true;
+          const req = requests.get(bucket);
+          requests.set(bucket, {
+            bucket,
+            start: doc.last_op,
+            end: req!.end
+          });
+        } else {
+          // All done for this bucket
+          requests.delete(bucket);
+        }
+      }
+      if (!limitReached) {
+        break;
+      }
+    }
 
     return new Map<string, PartialOrFullChecksum>(
       batch.map((request) => {
@@ -197,106 +303,10 @@
       })
     );
   }
 
-  /**
-   * Checksums for large buckets can run over the query timeout.
-   * To avoid this, we query in batches.
-   * This version can handle larger amounts of data, but is slower, especially for many buckets.
-   */
-  async queryPartialChecksumsFallback(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
-    const partialChecksums = new Map<string, PartialOrFullChecksum>();
-    for (let request of batch) {
-      const checksum = await this.slowChecksum(request);
-      partialChecksums.set(request.bucket, checksum);
-    }
-
-    return partialChecksums;
-  }
-
-  private async slowChecksum(request: FetchPartialBucketChecksum): Promise<PartialOrFullChecksum> {
-    const batchLimit = 50_000;
-
-    let lowerBound = 0n;
-    const bucket = request.bucket;
-
-    let runningChecksum: PartialOrFullChecksum = {
-      bucket,
-      partialCount: 0,
-      partialChecksum: 0
-    };
-    if (request.start == null) {
-      runningChecksum = {
-        bucket,
-        count: 0,
-        checksum: 0
-      };
-    }
-
-    while (true) {
-      const filter = {
-        _id: {
-          $gt: {
-            g: this.group_id,
-            b: bucket,
-            o: lowerBound
-          },
-          $lte: {
-            g: this.group_id,
-            b: bucket,
-            o: request.end
-          }
-        }
-      };
-      const docs = await this.db.bucket_data
-        .aggregate(
-          [
-            {
-              $match: filter
-            },
-            // sort and limit _before_ grouping
-            { $sort: { _id: 1 } },
-            { $limit: batchLimit },
-            CHECKSUM_QUERY_GROUP_STAGE
-          ],
-          { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
-        )
-        .toArray();
-      const doc = docs[0];
-      if (doc == null) {
-        return runningChecksum;
-      }
-      const partial = checksumFromAggregate(doc);
-      runningChecksum = addPartialChecksums(bucket, runningChecksum, partial);
-      const isFinal = doc.count != batchLimit;
-      if (isFinal) {
-        break;
-      } else {
-        lowerBound = doc.last_op;
-      }
-    }
-    return runningChecksum;
-  }
 }
 
-const CHECKSUM_QUERY_GROUP_STAGE = {
-  $group: {
-    _id: '$_id.b',
-    // Historically, checksum may be stored as 'int' or 'double'.
-    // More recently, this should be a 'long'.
-    // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
-    checksum_total: { $sum: { $toLong: '$checksum' } },
-    count: { $sum: 1 },
-    has_clear_op: {
-      $max: {
-        $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
-      }
-    },
-    last_op: { $max: '$_id.o' }
-  }
-};
-
 /**
- * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum.
+ * Convert output of the $group stage into a checksum.
  */
 function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
   const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff;
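
A worked example of the truncation on the line above: the aggregation sums per-operation checksums as 64-bit longs, and this expression keeps the low 32 bits and coerces them to a signed 32-bit integer (JavaScript's ToInt32 via `& 0xffffffff`). The values below are arbitrary illustrations:

// Illustrative values, not taken from real data.
const checksumTotal = 0x1_2345_6789n; // example sum that exceeds 32 bits
const partialChecksum = Number(checksumTotal & 0xffffffffn) & 0xffffffff;
console.log(partialChecksum); // 591751049 (0x23456789) - high bits discarded
console.log(Number(0xffffffffn & 0xffffffffn) & 0xffffffff); // -1 - wraps to signed
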
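
To close, a standalone simulation of the operation-batching walk described in the comments of queryPartialChecksumsInternal (buckets A, B, C with 10 operations each and a batch limit of 12). This models only the loop's bookkeeping, not the MongoDB aggregation itself:

// Sketch of the batching bookkeeping - illustrative, not part of this PR's diff.
type Request = { bucket: string; start: number; end: number };

function simulate(opsPerBucket: Map<string, number>, batchLimit: number): string[] {
  const requests = new Map<string, Request>();
  for (const [bucket, ops] of opsPerBucket) {
    requests.set(bucket, { bucket, start: 0, end: ops });
  }
  const batches: string[] = [];
  while (requests.size > 0) {
    let budget = batchLimit;
    const served: string[] = [];
    // Mimics $sort on _id + $limit: buckets are consumed in key order,
    // and only the last bucket in a batch can be cut short.
    for (const req of [...requests.values()].sort((a, b) => a.bucket.localeCompare(b.bucket))) {
      if (budget == 0) break;
      const available = req.end - req.start;
      const taken = Math.min(available, budget);
      budget -= taken;
      served.push(`${req.bucket}[${req.start + 1}-${req.start + taken}]`);
      if (taken == available) {
        // All done for this bucket.
        requests.delete(req.bucket);
      } else {
        // Limit reached: resume this bucket from its last processed op.
        requests.set(req.bucket, { ...req, start: req.start + taken });
      }
    }
    batches.push(served.join(', '));
  }
  return batches;
}

console.log(simulate(new Map([['A', 10], ['B', 10], ['C', 10]]), 12));
// [ 'A[1-10], B[1-2]', 'B[3-10], C[1-4]', 'C[5-10]' ]
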