6 changes: 6 additions & 0 deletions .changeset/stupid-maps-buy.md
@@ -0,0 +1,6 @@
---
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix checksum cache edge case with compacting
34 changes: 32 additions & 2 deletions packages/service-core/src/storage/ChecksumCache.ts
@@ -8,13 +8,34 @@ interface ChecksumFetchContext {
checkpoint: bigint;
}

export interface PartialChecksum {
bucket: string;
/**
* 32-bit unsigned hash.
*/
partialChecksum: number;

/**
* Count of operations - informational only.
*/
partialCount: number;

/**
* True if the queried range of operations contains (starts with) a CLEAR
* operation, indicating that the partial checksum is the full
* checksum and must not be added to a previously-cached checksum.
*/
isFullChecksum: boolean;
}
export interface FetchPartialBucketChecksum {
bucket: string;
start?: OpId;
end: OpId;
}

export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise<ChecksumMap>;
export type PartialChecksumMap = Map<string, PartialChecksum>;

export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise<PartialChecksumMap>;

export interface ChecksumCacheOptions {
/**
@@ -33,6 +54,8 @@ export interface ChecksumCacheOptions {
// Approximately 5MB of memory, if we assume 50 bytes per entry
const DEFAULT_MAX_SIZE = 100_000;

const TTL_MS = 3_600_000;

/**
* Implement an LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately,
* while the lookups occur in batches.
@@ -93,7 +116,14 @@ export class ChecksumCache {

// When we have more fetches than the cache size, complete the fetches instead
// of failing with Error('evicted').
ignoreFetchAbort: true
ignoreFetchAbort: true,

// We use a TTL so that counts can eventually be refreshed
// after a compact. This only takes effect if the bucket has
// not been checked in the meantime.
ttl: TTL_MS,
ttlResolution: 1_000,
allowStale: false
});
}

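The new cache options work together: the TTL bounds how long a stale operation count can survive after a compact, while `ignoreFetchAbort` keeps in-flight lookups alive even when their entries are evicted. A minimal sketch of this configuration pattern, assuming the `lru-cache` package's v10-style named export (the key/value types and `fetchMethod` body are illustrative, not the cache's actual internals):

```ts
import { LRUCache } from 'lru-cache';

const cache = new LRUCache<string, number>({
  max: 100_000, // ~5MB, assuming ~50 bytes per entry
  // Refetch entries older than an hour on next access, so counts
  // eventually self-correct after a compact.
  ttl: 3_600_000,
  // Only check the clock at 1s granularity - cheaper lookups at the
  // cost of slightly imprecise expiry.
  ttlResolution: 1_000,
  // Never serve an expired entry; trigger a fresh fetch instead.
  allowStale: false,
  // Complete fetches even when the entry is evicted mid-flight,
  // instead of failing with Error('evicted').
  ignoreFetchAbort: true,
  fetchMethod: async (key) => {
    // ... look up the (bucket, checkpoint) checksum in storage ...
    return 0; // placeholder
  }
});
```

As the comment above notes, the TTL only has an effect for buckets that go unchecked in the meantime; actively queried buckets are refreshed through the normal partial-checksum path.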
26 changes: 18 additions & 8 deletions packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
@@ -18,7 +18,7 @@ import {
SyncRulesBucketStorage,
SyncRuleStatus
} from '../BucketStorage.js';
import { ChecksumCache, FetchPartialBucketChecksum } from '../ChecksumCache.js';
import { ChecksumCache, FetchPartialBucketChecksum, PartialChecksum, PartialChecksumMap } from '../ChecksumCache.js';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { SourceTable } from '../SourceTable.js';
import { PowerSyncMongo } from './db.js';
@@ -333,7 +333,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
return this.checksumCache.getChecksumMap(checkpoint, buckets);
}

private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<util.ChecksumMap> {
private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
if (batch.length == 0) {
return new Map();
}
@@ -365,22 +365,32 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
}
},
{
$group: { _id: '$_id.b', checksum_total: { $sum: '$checksum' }, count: { $sum: 1 } }
$group: {
_id: '$_id.b',
checksum_total: { $sum: '$checksum' },
count: { $sum: 1 },
has_clear_op: {
$max: {
$cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
}
}
}
}
],
{ session: undefined }
{ session: undefined, readConcern: 'snapshot' }
)
.toArray();

return new Map<string, util.BucketChecksum>(
return new Map<string, PartialChecksum>(
aggregate.map((doc) => {
return [
doc._id,
{
bucket: doc._id,
count: doc.count,
checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff
} satisfies util.BucketChecksum
partialCount: doc.count,
partialChecksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff,
isFullChecksum: doc.has_clear_op == 1
} satisfies PartialChecksum
];
})
);
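The aggregation change is twofold: `has_clear_op` flags whether any op in the queried range is a CLEAR (folded to 0/1 via `$max` over a `$cond`), and the query now reads with `readConcern: 'snapshot'` so the checksum and count come from a consistent point in time. The masking expression deserves a closer look; a small sketch of the same fold with the intermediate steps named (the `foldChecksum` helper is hypothetical, extracted purely for illustration):

```ts
// `checksum_total` is a $sum over per-op checksums and can exceed 32 bits.
function foldChecksum(checksumTotal: bigint | number | string): number {
  // Keep only the low 32 bits, as an unsigned value in [0, 2^32).
  const low32 = Number(BigInt(checksumTotal) & 0xffffffffn);
  // JavaScript's bitwise AND coerces via ToInt32, reinterpreting the
  // unsigned value as a signed 32-bit integer.
  return low32 & 0xffffffff;
}

foldChecksum(4_294_967_296n + 5n); // => 5 (wraps around 2^32)
foldChecksum(0xffffffffn); // => -1 (unsigned 0xffffffff as signed int32)
```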
6 changes: 3 additions & 3 deletions packages/service-core/src/storage/mongo/util.ts
@@ -1,10 +1,10 @@
import { SqliteJsonValue } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import * as mongo from 'mongodb';
import * as crypto from 'crypto';
import { BucketDataDocument } from './models.js';
import { timestampToOpId } from '../../util/utils.js';
import * as mongo from 'mongodb';
import { OplogEntry } from '../../util/protocol-types.js';
import { timestampToOpId } from '../../util/utils.js';
import { BucketDataDocument } from './models.js';

/**
* Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers.
19 changes: 13 additions & 6 deletions packages/service-core/src/util/utils.ts
@@ -1,11 +1,12 @@
import crypto from 'crypto';

import { logger } from '@powersync/lib-services-framework';
import * as pgwire from '@powersync/service-jpgwire';
import { pgwireRows } from '@powersync/service-jpgwire';

import { PartialChecksum } from '../storage/ChecksumCache.js';
import * as storage from '../storage/storage-index.js';
import { BucketChecksum, OpId } from './protocol-types.js';
import { retriedQuery } from './pgwire_utils.js';
import { logger } from '@powersync/lib-services-framework';
import { BucketChecksum, OpId } from './protocol-types.js';

export type ChecksumMap = Map<string, BucketChecksum>;

@@ -64,14 +65,20 @@ export function addChecksums(a: number, b: number) {
return (a + b) & 0xffffffff;
}

export function addBucketChecksums(a: BucketChecksum, b: BucketChecksum | null): BucketChecksum {
export function addBucketChecksums(a: BucketChecksum, b: PartialChecksum | null): BucketChecksum {
if (b == null) {
return a;
} else if (b.isFullChecksum) {
return {
bucket: b.bucket,
count: b.partialCount,
checksum: b.partialChecksum
};
} else {
return {
bucket: a.bucket,
count: a.count + b.count,
checksum: addChecksums(a.checksum, b.checksum)
count: a.count + b.partialCount,
checksum: addChecksums(a.checksum, b.partialChecksum)
};
}
}
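To see why the `isFullChecksum` branch replaces the cached value rather than adding to it, consider a bucket whose first three ops were compacted into a CLEAR op. A worked example with hypothetical checksum values (the import path and numbers are illustrative):

```ts
import { addBucketChecksums } from './utils.js'; // illustrative path

// Cached checksum at checkpoint '3', computed before the compact:
// ops 1-3 had checksums 100, 200 and 300.
const cached = { bucket: 'global[]', count: 3, checksum: 600 };

// After compacting, a fetch for the range (3, 5] starts with the CLEAR op,
// which carries the accumulated checksum of the removed ops. The "partial"
// checksum is therefore already the full bucket checksum at checkpoint '5'.
const fetched = {
  bucket: 'global[]',
  partialCount: 2, // CLEAR + one new op - informational only
  partialChecksum: 1000, // 600 (CLEAR) + 400 (new op)
  isFullChecksum: true
};

addBucketChecksums(cached, fetched);
// => { bucket: 'global[]', count: 2, checksum: 1000 }
// Adding instead of replacing would yield 1600, double-counting ops 1-3.
```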
47 changes: 27 additions & 20 deletions packages/service-core/test/src/checksum_cache.test.ts
@@ -1,8 +1,8 @@
import { describe, expect, it } from 'vitest';
import { BucketChecksum, OpId } from '@/util/protocol-types.js';
import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum, PartialChecksum } from '@/storage/ChecksumCache.js';
import { OpId } from '@/util/protocol-types.js';
import { addChecksums } from '@/util/util-index.js';
import * as crypto from 'node:crypto';
import { addBucketChecksums } from '@/util/util-index.js';
import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum } from '@/storage/ChecksumCache.js';
import { describe, expect, it } from 'vitest';

/**
* Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes.
@@ -13,28 +13,22 @@ function testHash(bucket: string, checkpoint: OpId) {
return hash;
}

function testPartialHash(request: FetchPartialBucketChecksum): BucketChecksum {
function testPartialHash(request: FetchPartialBucketChecksum): PartialChecksum {
if (request.start) {
const a = testHash(request.bucket, request.start);
const b = testHash(request.bucket, request.end);
return addBucketChecksums(
{
bucket: request.bucket,
checksum: b,
count: Number(request.end)
},
{
// Subtract a
bucket: request.bucket,
checksum: -a,
count: -Number(request.start)
}
);
return {
bucket: request.bucket,
partialCount: Number(request.end) - Number(request.start),
partialChecksum: addChecksums(b, -a),
isFullChecksum: false
};
} else {
return {
bucket: request.bucket,
checksum: testHash(request.bucket, request.end),
count: Number(request.end)
partialChecksum: testHash(request.bucket, request.end),
partialCount: Number(request.end),
isFullChecksum: true
};
}
}
@@ -433,4 +427,17 @@ describe('checksum cache', function () {
[{ bucket: 'test2', end: '123' }]
]);
});

it('should handle CLEAR/isFullChecksum checksums', async function () {
let lookups: FetchPartialBucketChecksum[][] = [];
const cache = factory(async (batch) => {
lookups.push(batch);
// This forces an `isFullChecksum: true` result
delete batch[0].start;
return fetchTestChecksums(batch);
});

expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]);
expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]);
});
});
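The new test relies on a `fetchTestChecksums` helper defined elsewhere in this file; a plausible shape for it under the new `FetchChecksums` signature (an assumption, shown only to make the snippet above self-contained):

```ts
const fetchTestChecksums: FetchChecksums = async (batch: FetchPartialBucketChecksum[]) => {
  // Build a PartialChecksumMap keyed by bucket name, one entry per request.
  return new Map(batch.map((request) => [request.bucket, testPartialHash(request)]));
};
```

Deleting `batch[0].start` before delegating makes `testPartialHash` take its no-`start` branch, which returns `isFullChecksum: true`: exactly the shape a post-compact CLEAR op produces.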
78 changes: 78 additions & 0 deletions packages/service-core/test/src/compacting.test.ts
@@ -56,6 +56,7 @@ bucket_definitions:

const batchBefore = await oneFromAsync(storage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])));
const dataBefore = batchBefore.batch.data;
const checksumBefore = await storage.getChecksums(checkpoint, ['global[]']);

expect(dataBefore).toMatchObject([
{
@@ -82,6 +83,7 @@

const batchAfter = await oneFromAsync(storage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])));
const dataAfter = batchAfter.batch.data;
const checksumAfter = await storage.getChecksums(checkpoint, ['global[]']);

expect(batchAfter.targetOp).toEqual(3n);
expect(dataAfter).toMatchObject([
Expand All @@ -104,6 +106,8 @@ bucket_definitions:
}
]);

expect(checksumBefore.get('global[]')).toEqual(checksumAfter.get('global[]'));

validateCompactedBucket(dataBefore, dataAfter);
});

@@ -154,6 +158,7 @@ bucket_definitions:

const batchBefore = await oneFromAsync(storage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])));
const dataBefore = batchBefore.batch.data;
const checksumBefore = await storage.getChecksums(checkpoint, ['global[]']);

expect(dataBefore).toMatchObject([
{
@@ -186,6 +191,7 @@

const batchAfter = await oneFromAsync(storage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])));
const dataAfter = batchAfter.batch.data;
const checksumAfter = await storage.getChecksums(checkpoint, ['global[]']);

expect(batchAfter.targetOp).toEqual(4n);
expect(dataAfter).toMatchObject([
@@ -201,7 +207,79 @@
op_id: '4'
}
]);
expect(checksumBefore.get('global[]')).toEqual(checksumAfter.get('global[]'));

validateCompactedBucket(dataBefore, dataAfter);
});

test('compacting (3)', async () => {
const sync_rules = SqlSyncRules.fromYaml(`
bucket_definitions:
global:
data: [select * from test]
`);

const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });

const result = await storage.startBatch({}, async (batch) => {
await batch.save({
sourceTable: TEST_TABLE,
tag: 'insert',
after: {
id: 't1'
}
});

await batch.save({
sourceTable: TEST_TABLE,
tag: 'insert',
after: {
id: 't2'
}
});

await batch.save({
sourceTable: TEST_TABLE,
tag: 'delete',
before: {
id: 't1'
}
});
});

const checkpoint1 = result!.flushed_op;
const checksumBefore = await storage.getChecksums(checkpoint1, ['global[]']);
console.log('before', checksumBefore);

const result2 = await storage.startBatch({}, async (batch) => {
await batch.save({
sourceTable: TEST_TABLE,
tag: 'delete',
before: {
id: 't2'
}
});
});
const checkpoint2 = result2!.flushed_op;

await storage.compact(compactOptions);

const batchAfter = await oneFromAsync(storage.getBucketDataBatch(checkpoint2, new Map([['global[]', '0']])));
const dataAfter = batchAfter.batch.data;
const checksumAfter = await storage.getChecksums(checkpoint2, ['global[]']);

expect(batchAfter.targetOp).toEqual(4n);
expect(dataAfter).toMatchObject([
{
checksum: 857217610,
op: 'CLEAR',
op_id: '4'
}
]);
expect(checksumAfter.get('global[]')).toEqual({
bucket: 'global[]',
count: 1,
checksum: 857217610
});
});
}
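The invariant behind these assertions is that compacting preserves a bucket's total checksum while collapsing its op count: the CLEAR op carries the accumulated (wrapped) checksum of everything it replaced. A sketch with hypothetical per-op checksums, where `addChecksums` is the 32-bit wrapping addition from `util/utils.ts`:

```ts
import { addChecksums } from '@/util/util-index.js';

// Hypothetical checksums for ops 1-4 (t1 put, t2 put, t1 remove, t2 remove);
// real values come from hashing the op payloads.
const opChecksums = [0x1a2b3c4d, -0x0f0e0d0c, 0x11223344, 0x2003e44a];

// Compacting replaces all four ops with one CLEAR op carrying their sum.
const clearChecksum = opChecksums.reduce(addChecksums, 0);

// The bucket total is unchanged, but the count drops from 4 to 1. This is
// why PartialChecksum documents its count as informational only, and why
// the test asserts `count: 1` with the same checksum (857217610) as the
// CLEAR op itself.
```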