diff --git a/.changeset/fifty-socks-fly.md b/.changeset/fifty-socks-fly.md new file mode 100644 index 000000000..906449589 --- /dev/null +++ b/.changeset/fifty-socks-fly.md @@ -0,0 +1,11 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +--- + +Create a persisted checksum cache when compacting buckets. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index d1189b8e3..472565106 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -97,7 +97,7 @@ export class MongoBucketBatch private persisted_op: InternalOpId | null = null; /** - * For tests only - not for persistence logic. + * Last written op, if any. This may not reflect a consistent checkpoint. */ public last_flushed_op: InternalOpId | null = null; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 6f4d852f9..f6bf44f2c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -1,10 +1,11 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework'; -import { InternalOpId, storage, utils } from '@powersync/service-core'; +import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework'; +import { addChecksums, InternalOpId, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; -import { BucketDataDocument, BucketDataKey } from './models.js'; +import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js'; import { cacheKey } from './OperationBatch.js'; +import { readSingleBatch } from './util.js'; interface CurrentBucketState { /** Bucket name */ @@ -27,6 +28,21 @@ interface CurrentBucketState { * Number of REMOVE/MOVE operations seen since lastNotPut. */ opsSincePut: number; + + /** + * Incrementally-updated checksum, up to maxOpId + */ + checksum: number; + + /** + * op count for the checksum + */ + opCount: number; + + /** + * Byte size of ops covered by the checksum. + */ + opBytes: number; } /** @@ -43,13 +59,15 @@ const DEFAULT_MEMORY_LIMIT_MB = 64; export class MongoCompactor { private updates: mongo.AnyBulkWriteOperation[] = []; + private bucketStateUpdates: mongo.AnyBulkWriteOperation[] = []; private idLimitBytes: number; private moveBatchLimit: number; private moveBatchQueryLimit: number; private clearBatchLimit: number; - private maxOpId: bigint | undefined; + private maxOpId: bigint; private buckets: string[] | undefined; + private signal?: AbortSignal; constructor( private db: PowerSyncMongo, @@ -60,8 +78,9 @@ export class MongoCompactor { this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT; this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT; this.clearBatchLimit = options?.clearBatchLimit ?? 
DEFAULT_CLEAR_BATCH_LIMIT; - this.maxOpId = options?.maxOpId; + this.maxOpId = options?.maxOpId ?? 0n; this.buckets = options?.compactBuckets; + this.signal = options?.signal; } /** @@ -117,31 +136,33 @@ export class MongoCompactor { o: new mongo.MaxKey() as any }; - while (true) { + while (!this.signal?.aborted) { // Query one batch at a time, to avoid cursor timeouts - const batch = await this.db.bucket_data - .find( - { + const cursor = this.db.bucket_data.aggregate([ + { + $match: { _id: { $gte: lowerBound, $lt: upperBound } - }, - { - projection: { - _id: 1, - op: 1, - table: 1, - row_id: 1, - source_table: 1, - source_key: 1 - }, - limit: this.moveBatchQueryLimit, - sort: { _id: -1 }, - singleBatch: true } - ) - .toArray(); + }, + { $sort: { _id: -1 } }, + { $limit: this.moveBatchQueryLimit }, + { + $project: { + _id: 1, + op: 1, + table: 1, + row_id: 1, + source_table: 1, + source_key: 1, + checksum: 1, + size: { $bsonSize: '$$ROOT' } + } + } + ]); + const { data: batch } = await readSingleBatch(cursor); if (batch.length == 0) { // We've reached the end @@ -153,34 +174,47 @@ export class MongoCompactor { for (let doc of batch) { if (currentState == null || doc._id.b != currentState.bucket) { - if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) { - // Important to flush before clearBucket() - await this.flush(); - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); + if (currentState != null) { + if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { + // Important to flush before clearBucket() + // Does not have to happen before flushBucketChecksums() + await this.flush(); + logger.info( + `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` + ); + + // Free memory before clearing bucket + currentState!.seen.clear(); + + await this.clearBucket(currentState); + } - const bucket = currentState.bucket; - const clearOp = currentState.lastNotPut; - // Free memory before clearing bucket - currentState = null; - await this.clearBucket(bucket, clearOp); + // Should happen after clearBucket() for accurate stats + this.updateBucketChecksums(currentState); } currentState = { bucket: doc._id.b, seen: new Map(), trackingSize: 0, lastNotPut: null, - opsSincePut: 0 + opsSincePut: 0, + + checksum: 0, + opCount: 0, + opBytes: 0 }; } - if (this.maxOpId != null && doc._id.o > this.maxOpId) { + if (doc._id.o > this.maxOpId) { continue; } + currentState.checksum = addChecksums(currentState.checksum, Number(doc.checksum)); + currentState.opCount += 1; + let isPersistentPut = doc.op == 'PUT'; + currentState.opBytes += Number(doc.size); if (doc.op == 'REMOVE' || doc.op == 'PUT') { const key = `${doc.table}/${doc.row_id}/${cacheKey(doc.source_table!, doc.source_key!)}`; const targetOp = currentState.seen.get(key); @@ -208,6 +242,8 @@ export class MongoCompactor { } } }); + + currentState.opBytes += 200 - Number(doc.size); // TODO: better estimate for this } else { if (currentState.trackingSize >= idLimitBytes) { // Reached memory limit. 
@@ -234,24 +270,72 @@ export class MongoCompactor { currentState.opsSincePut += 1; } - if (this.updates.length >= this.moveBatchLimit) { + if (this.updates.length + this.bucketStateUpdates.length >= this.moveBatchLimit) { await this.flush(); } } } - await this.flush(); currentState?.seen.clear(); if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) { logger.info( `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` ); - const bucket = currentState.bucket; - const clearOp = currentState.lastNotPut; - // Free memory before clearing bucket - currentState = null; - await this.clearBucket(bucket, clearOp); + // Need flush() before clear() + await this.flush(); + await this.clearBucket(currentState); } + if (currentState != null) { + // Do this _after_ clearBucket so that we have accurate counts. + this.updateBucketChecksums(currentState); + } + // Need another flush after updateBucketChecksums() + await this.flush(); + } + + /** + * Call when done with a bucket. + */ + private updateBucketChecksums(state: CurrentBucketState) { + if (state.opCount < 0) { + throw new ServiceAssertionError( + `Invalid opCount: ${state.opCount} checksum ${state.checksum} opsSincePut: ${state.opsSincePut} maxOpId: ${this.maxOpId}` + ); + } + this.bucketStateUpdates.push({ + updateOne: { + filter: { + _id: { + g: this.group_id, + b: state.bucket + } + }, + update: { + $set: { + compacted_state: { + op_id: this.maxOpId, + count: state.opCount, + checksum: BigInt(state.checksum), + bytes: state.opBytes + }, + estimate_since_compact: { + // Note: There could have been a whole bunch of new operations added to the bucket _while_ compacting, + // which we don't currently cater for. + // We could potentially query for that, but that could add overhead. + count: 0, + bytes: 0 + } + }, + $setOnInsert: { + // Only set this if we're creating the document. + // In all other cases, the replication process will have a set a more accurate id. + last_op: this.maxOpId + } + }, + // We generally expect this to have been created before, but do handle cases of old unchanged buckets + upsert: true + } + }); } private async flush() { @@ -266,15 +350,26 @@ export class MongoCompactor { }); this.updates = []; } + if (this.bucketStateUpdates.length > 0) { + logger.info(`Updating ${this.bucketStateUpdates.length} bucket states`); + await this.db.bucket_state.bulkWrite(this.bucketStateUpdates, { + ordered: false + }); + this.bucketStateUpdates = []; + } } /** * Perform a CLEAR compact for a bucket. * + * * @param bucket bucket name * @param op op_id of the last non-PUT operation, which will be converted to CLEAR. */ - private async clearBucket(bucket: string, op: InternalOpId) { + private async clearBucket(currentState: CurrentBucketState) { + const bucket = currentState.bucket; + const clearOp = currentState.lastNotPut!; + const opFilter = { _id: { $gte: { @@ -285,7 +380,7 @@ export class MongoCompactor { $lte: { g: this.group_id, b: bucket, - o: op + o: clearOp } } }; @@ -293,7 +388,8 @@ export class MongoCompactor { const session = this.db.client.startSession(); try { let done = false; - while (!done) { + while (!done && !this.signal?.aborted) { + let opCountDiff = 0; // Do the CLEAR operation in batches, with each batch a separate transaction. // The state after each batch is fully consistent. // We need a transaction per batch to make sure checksums stay consistent. 
@@ -364,12 +460,16 @@ export class MongoCompactor { }, { session } ); + + opCountDiff = -numberOfOpsToClear + 1; }, { writeConcern: { w: 'majority' }, readConcern: { level: 'snapshot' } } ); + // Update _outside_ the transaction, since the transaction can be retried multiple times. + currentState.opCount += opCountDiff; } } finally { await session.endSession(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index d7f824f7f..781fc3cc8 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -7,14 +7,18 @@ import { ServiceAssertionError } from '@powersync/lib-services-framework'; import { + addPartialChecksums, BroadcastIterable, + BucketChecksum, CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, + CompactOptions, deserializeParameterLookup, GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, maxLsn, + PartialChecksum, ProtocolOpId, ReplicationCheckpoint, storage, @@ -31,9 +35,9 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; +import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; -import { MongoParameterCompactor } from './MongoParameterCompactor.js'; export class MongoSyncBucketStorage extends BaseObserver @@ -490,24 +494,71 @@ export class MongoSyncBucketStorage return this.checksumCache.getChecksumMap(checkpoint, buckets); } + clearChecksumCache() { + this.checksumCache.clear(); + } + private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { if (batch.length == 0) { return new Map(); } + const preFilters: any[] = []; + for (let request of batch) { + if (request.start == null) { + preFilters.push({ + _id: { + g: this.group_id, + b: request.bucket + }, + 'compacted_state.op_id': { $exists: true, $lte: request.end } + }); + } + } + + const preStates = new Map(); + + if (preFilters.length > 0) { + // For un-cached bucket checksums, attempt to use the compacted state first. + const states = await this.db.bucket_state + .find({ + $or: preFilters + }) + .toArray(); + for (let state of states) { + const compactedState = state.compacted_state!; + preStates.set(state._id.b, { + opId: compactedState.op_id, + checksum: { + bucket: state._id.b, + checksum: Number(compactedState.checksum), + count: compactedState.count + } + }); + } + } + const filters: any[] = []; for (let request of batch) { + let start = request.start; + if (start == null) { + const preState = preStates.get(request.bucket); + if (preState != null) { + start = preState.opId; + } + } + filters.push({ _id: { $gt: { g: this.group_id, b: request.bucket, - o: request.start ? BigInt(request.start) : new bson.MinKey() + o: start ?? 
new bson.MinKey() }, $lte: { g: this.group_id, b: request.bucket, - o: BigInt(request.end) + o: request.end } } }); @@ -544,19 +595,41 @@ export class MongoSyncBucketStorage throw lib_mongo.mapQueryError(e, 'while reading checksums'); }); - return new Map( + const partialChecksums = new Map( aggregate.map((doc) => { + const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; + const bucket = doc._id; return [ - doc._id, - { - bucket: doc._id, - partialCount: doc.count, - partialChecksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff, - isFullChecksum: doc.has_clear_op == 1 - } satisfies storage.PartialChecksum + bucket, + doc.has_clear_op == 1 + ? ({ + // full checksum - replaces any previous one + bucket, + checksum: partialChecksum, + count: doc.count + } satisfies BucketChecksum) + : ({ + // partial checksum - is added to a previous one + bucket, + partialCount: doc.count, + partialChecksum + } satisfies PartialChecksum) ]; }) ); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available + const preState = preStates.get(bucket); + // Could be null if we got no data + const partialChecksum = partialChecksums.get(bucket); + const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); + + return [bucket, merged]; + }) + ); } async terminate(options?: storage.TerminateOptions) { @@ -701,13 +774,31 @@ export class MongoSyncBucketStorage } async compact(options?: storage.CompactOptions) { - const checkpoint = await this.getCheckpointInternal(); - await new MongoCompactor(this.db, this.group_id, options).compact(); - if (checkpoint != null && options?.compactParameterData) { - await new MongoParameterCompactor(this.db, this.group_id, checkpoint.checkpoint, options).compact(); + let maxOpId = options?.maxOpId; + if (maxOpId == null) { + const checkpoint = await this.getCheckpointInternal(); + maxOpId = checkpoint?.checkpoint ?? undefined; + } + await new MongoCompactor(this.db, this.group_id, { ...options, maxOpId }).compact(); + if (maxOpId != null && options?.compactParameterData) { + await new MongoParameterCompactor(this.db, this.group_id, maxOpId, options).compact(); } } + async populatePersistentChecksumCache(options: Pick): Promise { + const start = Date.now(); + // We do a minimal compact, primarily to populate the checksum cache + await this.compact({ + ...options, + // Skip parameter data + compactParameterData: false, + // Don't track updates for MOVE compacting + memoryLimitMB: 0 + }); + const duration = Date.now() - start; + logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`); + } + /** * Instance-wide watch on the latest available checkpoint (op_id + lsn). */ diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts index b319053d5..5f33d0988 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts @@ -71,15 +71,17 @@ export class PersistedBatch { this.logger = options?.logger ?? 
defaultLogger; } - private incrementBucket(bucket: string, op_id: InternalOpId) { + private incrementBucket(bucket: string, op_id: InternalOpId, bytes: number) { let existingState = this.bucketStates.get(bucket); if (existingState) { existingState.lastOp = op_id; existingState.incrementCount += 1; + existingState.incrementBytes += bytes; } else { this.bucketStates.set(bucket, { lastOp: op_id, - incrementCount: 1 + incrementCount: 1, + incrementBytes: bytes }); } } @@ -115,7 +117,8 @@ export class PersistedBatch { } remaining_buckets.delete(key); - this.currentSize += recordData.length + 200; + const byteEstimate = recordData.length + 200; + this.currentSize += byteEstimate; const op_id = options.op_seq.next(); this.debugLastOpId = op_id; @@ -138,7 +141,7 @@ export class PersistedBatch { } } }); - this.incrementBucket(k.bucket, op_id); + this.incrementBucket(k.bucket, op_id, byteEstimate); } for (let bd of remaining_buckets.values()) { @@ -166,7 +169,7 @@ export class PersistedBatch { } }); this.currentSize += 200; - this.incrementBucket(bd.bucket, op_id); + this.incrementBucket(bd.bucket, op_id, 200); } } @@ -369,6 +372,10 @@ export class PersistedBatch { update: { $set: { last_op: state.lastOp + }, + $inc: { + 'estimate_since_compact.count': state.incrementCount, + 'estimate_since_compact.bytes': state.incrementBytes } }, upsert: true @@ -381,4 +388,5 @@ export class PersistedBatch { interface BucketStateUpdate { lastOp: InternalOpId; incrementCount: number; + incrementBytes: number; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 33eac22d8..b877f7f98 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -98,6 +98,21 @@ export interface BucketStateDocument { b: string; }; last_op: bigint; + /** + * If set, this can be treated as "cache" of a checksum at a specific point. + * Can be updated periodically, for example by the compact job. + */ + compacted_state?: { + op_id: InternalOpId; + count: number; + checksum: bigint; + bytes: number; + }; + + estimate_since_compact?: { + count: number; + bytes: number; + }; } export interface IdSequenceDocument { diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index d6a4f489e..9cf5eabd6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -43,7 +43,7 @@ export function generateSlotName(prefix: string, sync_rules_id: number) { * * For this to be effective, set batchSize = limit in the find command. 
*/ -export async function readSingleBatch(cursor: mongo.FindCursor): Promise<{ data: T[]; hasMore: boolean }> { +export async function readSingleBatch(cursor: mongo.AbstractCursor): Promise<{ data: T[]; hasMore: boolean }> { try { let data: T[]; let hasMore = true; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index e22a2f713..a529c7336 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -10,6 +10,7 @@ import { ServiceError } from '@powersync/lib-services-framework'; import { + InternalOpId, MetricsEngine, RelationCache, SaveOperationTag, @@ -317,7 +318,7 @@ export class ChangeStream { const sourceTables = this.sync_rules.getSourceTables(); await this.client.connect(); - await this.storage.startBatch( + const flushResult = await this.storage.startBatch( { logger: this.logger, zeroLSN: MongoLSN.ZERO.comparable, @@ -383,6 +384,7 @@ export class ChangeStream { this.logger.info(`Snapshot done. Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); } ); + return { lastOpId: flushResult?.flushed_op }; } private async setupCheckpointsCollection() { @@ -689,7 +691,15 @@ export class ChangeStream { // Snapshot LSN is not present, so we need to start replication from scratch. await this.storage.clear({ signal: this.abort_signal }); } - await this.initialReplication(result.snapshotLsn); + const { lastOpId } = await this.initialReplication(result.snapshotLsn); + if (lastOpId != null) { + // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. + await this.storage.populatePersistentChecksumCache({ + signal: this.abort_signal, + // No checkpoint yet, but we do have the opId. + maxOpId: lastOpId + }); + } } } diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 3d4c87563..c0d4e91e3 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -10,6 +10,7 @@ import { ColumnDescriptor, framework, getUuidReplicaIdentityBson, + InternalOpId, MetricsEngine, SourceTable, storage @@ -252,6 +253,7 @@ export class BinLogStream { const promiseConnection = (connection as mysql.Connection).promise(); const headGTID = await common.readExecutedGtid(promiseConnection); this.logger.info(`Using snapshot checkpoint GTID: '${headGTID}'`); + let lastOp: InternalOpId | null = null; try { this.logger.info(`Starting initial replication`); await promiseConnection.query( @@ -261,7 +263,7 @@ export class BinLogStream { await promiseConnection.query(`SET time_zone = '+00:00'`); const sourceTables = this.syncRules.getSourceTables(); - await this.storage.startBatch( + const flushResults = await this.storage.startBatch( { logger: this.logger, zeroLSN: common.ReplicatedGTID.ZERO.comparable, @@ -280,6 +282,7 @@ export class BinLogStream { await batch.commit(headGTID.comparable); } ); + lastOp = flushResults?.flushed_op ?? null; this.logger.info(`Initial replication done`); await promiseConnection.query('COMMIT'); } catch (e) { @@ -288,6 +291,15 @@ export class BinLogStream { } finally { connection.release(); } + + if (lastOp != null) { + // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. + await this.storage.populatePersistentChecksumCache({ + // No checkpoint yet, but we do have the opId. 
+ maxOpId: lastOp, + signal: this.abortSignal + }); + } } private async snapshotTable( diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 21d8d448d..88f36b145 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -1,13 +1,16 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { BroadcastIterable, + BucketChecksum, CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, + CompactOptions, GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, LastValueSink, maxLsn, + PartialChecksum, ReplicationCheckpoint, storage, utils, @@ -109,6 +112,10 @@ export class PostgresSyncRulesStorage return new PostgresCompactor(this.db, this.group_id, options).compact(); } + async populatePersistentChecksumCache(options: Pick): Promise { + // no-op - checksum cache is not implemented for Postgres yet + } + lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { return this.writeCheckpointAPI.lastWriteCheckpoint({ ...filters, @@ -571,6 +578,10 @@ export class PostgresSyncRulesStorage return this.checksumCache.getChecksumMap(checkpoint, buckets); } + clearChecksumCache() { + this.checksumCache.clear(); + } + async terminate(options?: storage.TerminateOptions) { if (!options || options?.clearStorage) { await this.clear(options); @@ -692,16 +703,24 @@ export class PostgresSyncRulesStorage b.bucket_name; `.rows<{ bucket: string; checksum_total: bigint; total: bigint; has_clear_op: number }>(); - return new Map( + return new Map( results.map((doc) => { + const checksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; + return [ doc.bucket, - { - bucket: doc.bucket, - partialCount: Number(doc.total), - partialChecksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff, - isFullChecksum: doc.has_clear_op == 1 - } satisfies storage.PartialChecksum + doc.has_clear_op == 1 + ? ({ + // full checksum + bucket: doc.bucket, + count: Number(doc.total), + checksum + } satisfies BucketChecksum) + : ({ + bucket: doc.bucket, + partialCount: Number(doc.total), + partialChecksum: checksum + } satisfies PartialChecksum) ]; }) ); diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index ad088f296..3813caa0a 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -458,7 +458,7 @@ WHERE oid = $1::regclass`, async initialReplication(db: pgwire.PgConnection) { const sourceTables = this.sync_rules.getSourceTables(); - await this.storage.startBatch( + const flushResults = await this.storage.startBatch( { logger: this.logger, zeroLSN: ZERO_LSN, @@ -506,6 +506,16 @@ WHERE oid = $1::regclass`, * to advance the active sync rules LSN. */ await sendKeepAlive(db); + + const lastOp = flushResults?.flushed_op; + if (lastOp != null) { + // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. + await this.storage.populatePersistentChecksumCache({ + // No checkpoint yet, but we do have the opId. 
+ maxOpId: lastOp, + signal: this.abort_signal + }); + } } static *getQueryData(results: Iterable): Generator { diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index c4d975db6..8edad4516 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -6,14 +6,15 @@ const TEST_TABLE = test_utils.makeTestTable('test', ['id']); export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) { test('compacting (1)', async () => { - const sync_rules = test_utils.testRules(` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: [select * from test] - `); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + ` + }); + const bucketStorage = factory.getInstance(syncRules); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -42,6 +43,8 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t2') }); + + await batch.commit('1/1'); }); const checkpoint = result!.flushed_op; @@ -72,6 +75,7 @@ bucket_definitions: op_id: '3' } ]); + expect(batchBefore.targetOp).toEqual(null); await bucketStorage.compact({ clearBatchLimit: 2, @@ -84,6 +88,8 @@ bucket_definitions: ); const dataAfter = batchAfter.chunkData.data; const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); + bucketStorage.clearChecksumCache(); + const checksumAfter2 = await bucketStorage.getChecksums(checkpoint, ['global[]']); expect(batchAfter.targetOp).toEqual(3n); expect(dataAfter).toMatchObject([ @@ -106,20 +112,22 @@ bucket_definitions: } ]); - expect(checksumBefore.get('global[]')).toEqual(checksumAfter.get('global[]')); + expect(checksumAfter.get('global[]')).toEqual(checksumBefore.get('global[]')); + expect(checksumAfter2.get('global[]')).toEqual(checksumBefore.get('global[]')); test_utils.validateCompactedBucket(dataBefore, dataAfter); }); test('compacting (2)', async () => { - const sync_rules = test_utils.testRules(` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: [select * from test] - `); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + ` + }); + const bucketStorage = factory.getInstance(syncRules); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -157,6 +165,8 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t2') }); + + await batch.commit('1/1'); }); const checkpoint = result!.flushed_op; @@ -204,6 +214,7 @@ bucket_definitions: bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) ); const dataAfter = batchAfter.chunkData.data; + bucketStorage.clearChecksumCache(); const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); expect(batchAfter.targetOp).toEqual(4n); @@ -220,20 +231,24 @@ bucket_definitions: op_id: '4' } ]); - expect(checksumBefore.get('global[]')).toEqual(checksumAfter.get('global[]')); + expect(checksumAfter.get('global[]')).toEqual({ + ...checksumBefore.get('global[]'), + count: 2 + }); test_utils.validateCompactedBucket(dataBefore, dataAfter); }); test('compacting 
(3)', async () => { - const sync_rules = test_utils.testRules(` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: [select * from test] - `); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + ` + }); + const bucketStorage = factory.getInstance(syncRules); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -262,6 +277,8 @@ bucket_definitions: }, beforeReplicaId: 't1' }); + + await batch.commit('1/1'); }); const checkpoint1 = result!.flushed_op; @@ -276,6 +293,7 @@ bucket_definitions: }, beforeReplicaId: 't2' }); + await batch.commit('2/1'); }); const checkpoint2 = result2!.flushed_op; @@ -289,6 +307,7 @@ bucket_definitions: bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', 0n]])) ); const dataAfter = batchAfter.chunkData.data; + await bucketStorage.clearChecksumCache(); const checksumAfter = await bucketStorage.getChecksums(checkpoint2, ['global[]']); expect(batchAfter.targetOp).toEqual(4n); @@ -307,18 +326,18 @@ bucket_definitions: }); test('compacting (4)', async () => { - const sync_rules = test_utils.testRules(/* yaml */ - ` bucket_definitions: - grouped: - # The parameter query here is not important - # We specifically don't want to create bucket_parameter records here - # since the op_ids for bucket_data could vary between storage implementations. - parameters: select 'b' as b - data: - - select * from test where b = bucket.b`); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + const syncRules = await factory.updateSyncRules({ + /* yaml */ content: ` bucket_definitions: + grouped: + # The parameter query here is not important + # We specifically don't want to create bucket_parameter records here + # since the op_ids for bucket_data could vary between storage implementations. 
+ parameters: select 'b' as b + data: + - select * from test where b = bucket.b` + }); + const bucketStorage = factory.getInstance(syncRules); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { /** @@ -383,6 +402,8 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t2') }); + + await batch.commit('1/1'); } }); @@ -431,4 +452,73 @@ bucket_definitions: ]) ); }); + + test('partial checksums after compacting', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: [select * from test] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1' + }, + afterReplicaId: 't1' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't2' + }, + afterReplicaId: 't2' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.DELETE, + before: { + id: 't1' + }, + beforeReplicaId: 't1' + }); + + await batch.commit('1/1'); + }); + + await bucketStorage.compact({ + clearBatchLimit: 2, + moveBatchLimit: 1, + moveBatchQueryLimit: 1 + }); + + const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.DELETE, + before: { + id: 't2' + }, + beforeReplicaId: 't2' + }); + await batch.commit('2/1'); + }); + const checkpoint2 = result2!.flushed_op; + await bucketStorage.clearChecksumCache(); + const checksumAfter = await bucketStorage.getChecksums(checkpoint2, ['global[]']); + expect(checksumAfter.get('global[]')).toEqual({ + bucket: 'global[]', + count: 4, + checksum: 1874612650 + }); + }); } diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 350d10039..2e5ac1c22 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,40 +1,21 @@ import { OrderedSet } from '@js-sdsl/ordered-set'; import { LRUCache } from 'lru-cache/min'; import { BucketChecksum } from '../util/protocol-types.js'; -import { addBucketChecksums, ChecksumMap, InternalOpId } from '../util/utils.js'; +import { addBucketChecksums, ChecksumMap, InternalOpId, PartialChecksum } from '../util/utils.js'; interface ChecksumFetchContext { fetch(bucket: string): Promise; checkpoint: InternalOpId; } -export interface PartialChecksum { - bucket: string; - /** - * 32-bit unsigned hash. - */ - partialChecksum: number; - - /** - * Count of operations - informational only. - */ - partialCount: number; - - /** - * True if the queried operations contains (starts with) a CLEAR - * operation, indicating that the partial checksum is the full - * checksum, and must not be added to a previously-cached checksum. 
- */ - isFullChecksum: boolean; -} - export interface FetchPartialBucketChecksum { bucket: string; start?: InternalOpId; end: InternalOpId; } -export type PartialChecksumMap = Map; +export type PartialOrFullChecksum = PartialChecksum | BucketChecksum; +export type PartialChecksumMap = Map; export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise; @@ -127,6 +108,11 @@ export class ChecksumCache { }); } + clear() { + this.cache.clear(); + this.bucketCheckpoints.clear(); + } + async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { const checksums = await this.getChecksumMap(checkpoint, buckets); // Return results in the same order as the request diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 8ad18df89..e10ef392e 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -62,6 +62,11 @@ export interface SyncRulesBucketStorage compact(options?: CompactOptions): Promise; + /** + * Lightweight "compact" process to populate the checksum cache, if any. + */ + populatePersistentChecksumCache(options?: Pick): Promise; + // ## Read operations getCheckpoint(): Promise; @@ -108,6 +113,11 @@ export interface SyncRulesBucketStorage * Returns zero checksums for any buckets not found. */ getChecksums(checkpoint: util.InternalOpId, buckets: string[]): Promise; + + /** + * Clear checksum cache. Primarily intended for tests. + */ + clearChecksumCache(): void; } export interface SyncRulesBucketStorageListener { @@ -208,6 +218,8 @@ export interface CompactOptions { * Internal/testing use: Cache size for compacting parameters. */ compactParameterCacheLimit?: number; + + signal?: AbortSignal; } export interface ClearStorageOptions { diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index a7275fb84..85b9e2c75 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -6,11 +6,26 @@ import { BucketChecksum, ProtocolOpId, OplogEntry } from './protocol-types.js'; import * as storage from '../storage/storage-index.js'; -import { PartialChecksum } from '../storage/ChecksumCache.js'; import { ServiceAssertionError } from '@powersync/lib-services-framework'; export type ChecksumMap = Map; +/** + * A partial checksum can never be used on its own - must always be combined with a full BucketChecksum. + */ +export interface PartialChecksum { + bucket: string; + /** + * 32-bit unsigned hash. + */ + partialChecksum: number; + + /** + * Count of operations - informational only. + */ + partialCount: number; +} + /** * op_id as used internally, for individual operations and checkpoints. 
* @@ -83,20 +98,49 @@ export function addChecksums(a: number, b: number) { return (a + b) & 0xffffffff; } -export function addBucketChecksums(a: BucketChecksum, b: PartialChecksum | null): BucketChecksum { - if (b == null) { - return a; - } else if (b.isFullChecksum) { +export function isPartialChecksum(c: PartialChecksum | BucketChecksum): c is PartialChecksum { + return 'partialChecksum' in c; +} + +export function addBucketChecksums(a: BucketChecksum, b: PartialChecksum | BucketChecksum | null): BucketChecksum { + const checksum = addPartialChecksums(a.bucket, a, b); + if (isPartialChecksum(checksum)) { + // Should not happen since a != null + throw new ServiceAssertionError('Expected full checksum'); + } + return checksum; +} + +export function addPartialChecksums( + bucket: string, + a: BucketChecksum | null, + b: PartialChecksum | BucketChecksum | null +): PartialChecksum | BucketChecksum { + if (a != null && b != null) { + if (!isPartialChecksum(b)) { + // Replaces preState + return b; + } + // merge + return { + bucket, + checksum: addChecksums(a.checksum, b.partialChecksum), + count: a.count + b.partialCount + }; + } else if (a != null) { return { - bucket: b.bucket, - count: b.partialCount, - checksum: b.partialChecksum + bucket, + checksum: a.checksum, + count: a.count }; + } else if (b != null) { + return b; } else { + // No data found (may still have a previously-cached checksum). return { - bucket: a.bucket, - count: a.count + b.partialCount, - checksum: addChecksums(a.checksum, b.partialChecksum) + bucket, + partialChecksum: 0, + partialCount: 0 }; } } diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 649610869..f0b61342c 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,5 +1,5 @@ -import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum, PartialChecksum } from '@/storage/ChecksumCache.js'; -import { addChecksums, InternalOpId } from '@/util/util-index.js'; +import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum } from '@/storage/ChecksumCache.js'; +import { addChecksums, BucketChecksum, InternalOpId, PartialChecksum } from '@/util/util-index.js'; import * as crypto from 'node:crypto'; import { describe, expect, it } from 'vitest'; @@ -12,22 +12,20 @@ function testHash(bucket: string, checkpoint: InternalOpId) { return hash; } -function testPartialHash(request: FetchPartialBucketChecksum): PartialChecksum { +function testPartialHash(request: FetchPartialBucketChecksum): PartialChecksum | BucketChecksum { if (request.start) { const a = testHash(request.bucket, request.start); const b = testHash(request.bucket, request.end); return { bucket: request.bucket, partialCount: Number(request.end) - Number(request.start), - partialChecksum: addChecksums(b, -a), - isFullChecksum: false + partialChecksum: addChecksums(b, -a) }; } else { return { bucket: request.bucket, - partialChecksum: testHash(request.bucket, request.end), - partialCount: Number(request.end), - isFullChecksum: true + checksum: testHash(request.bucket, request.end), + count: Number(request.end) }; } }
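
To make the checksum arithmetic in this change easier to follow, here is a small self-contained TypeScript sketch of the two helpers it builds on: `addChecksums` (32-bit wrapping addition, as defined in `packages/service-core/src/util/utils.ts`) and `addPartialChecksums`, which merges a cached full `BucketChecksum` with a `PartialChecksum` computed over the operations after the cached `op_id`. The interfaces are trimmed to the fields used here, and the example values at the end are invented purely for illustration.

```ts
// Simplified mirror of the checksum helpers in packages/service-core/src/util/utils.ts.
// Types are reduced to the fields used in this sketch.

interface BucketChecksum {
  bucket: string;
  checksum: number; // signed 32-bit value
  count: number;
}

interface PartialChecksum {
  bucket: string;
  partialChecksum: number;
  partialCount: number;
}

// 32-bit wrapping addition; `& 0xffffffff` coerces to a signed 32-bit integer in JS.
function addChecksums(a: number, b: number): number {
  return (a + b) & 0xffffffff;
}

function isPartialChecksum(c: PartialChecksum | BucketChecksum): c is PartialChecksum {
  return 'partialChecksum' in c;
}

// Merge a cached/compacted baseline (a) with checksum data for newer operations (b).
// A full BucketChecksum in `b` replaces the baseline; a PartialChecksum is added onto it.
function addPartialChecksums(
  bucket: string,
  a: BucketChecksum | null,
  b: PartialChecksum | BucketChecksum | null
): PartialChecksum | BucketChecksum {
  if (a != null && b != null) {
    if (!isPartialChecksum(b)) {
      return b;
    }
    return {
      bucket,
      checksum: addChecksums(a.checksum, b.partialChecksum),
      count: a.count + b.partialCount
    };
  } else if (a != null) {
    return { bucket, checksum: a.checksum, count: a.count };
  } else if (b != null) {
    return b;
  }
  // No cached baseline and no data found for the requested range.
  return { bucket, partialChecksum: 0, partialCount: 0 };
}

// Hypothetical example: a compacted_state cached at some op_id, plus two ops written after it.
const cached: BucketChecksum = { bucket: 'global[]', checksum: 123456789, count: 100 };
const newer: PartialChecksum = {
  bucket: 'global[]',
  partialChecksum: addChecksums(987654321, -12345),
  partialCount: 2
};
console.log(addPartialChecksums('global[]', cached, newer));
```

A full `BucketChecksum` returned for the newer range (the `has_clear_op == 1` case in the storage aggregations) replaces the cached baseline outright, which is why the separate `isFullChecksum` flag could be dropped from `PartialChecksum`.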
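
The read path that consumes the persisted cache can be summarised as: when a checksum request has no cached `start`, look up `bucket_state.compacted_state` (provided its `op_id` does not exceed the requested checkpoint), use it as the baseline, and aggregate only the operations after that `op_id` before merging with `addPartialChecksums`. The sketch below shows that flow for a single bucket, assuming the plain MongoDB driver and the document shape from `models.ts`; the real `getChecksumsInternal` batches all uncached buckets into a single `$or` query and still sits behind the in-memory `ChecksumCache`.

```ts
// Minimal sketch of how compacted_state narrows the checksum aggregation for one bucket.
// Assumes a MongoDB driver collection with the document shape from models.ts.
import { Collection } from 'mongodb';

interface BucketStateDocument {
  _id: { g: number; b: string };
  last_op: bigint;
  compacted_state?: { op_id: bigint; count: number; checksum: bigint; bytes: number };
  estimate_since_compact?: { count: number; bytes: number };
}

interface ChecksumRequest {
  bucket: string;
  end: bigint; // requested checkpoint op_id
}

async function checksumBaseline(
  bucketState: Collection<BucketStateDocument>,
  groupId: number,
  request: ChecksumRequest
): Promise<{ startAfter: bigint | null; checksum: number; count: number }> {
  // The cached state is only usable if it does not run past the requested checkpoint.
  const state = await bucketState.findOne({
    _id: { g: groupId, b: request.bucket },
    'compacted_state.op_id': { $exists: true, $lte: request.end }
  });
  if (state?.compacted_state == null) {
    // No usable baseline: the caller must aggregate the full bucket up to the checkpoint.
    return { startAfter: null, checksum: 0, count: 0 };
  }
  const compacted = state.compacted_state;
  return {
    // The caller aggregates only ops with _id.o > startAfter and _id.o <= request.end,
    // then combines the result with this baseline via addPartialChecksums.
    startAfter: compacted.op_id,
    checksum: Number(compacted.checksum),
    count: compacted.count
  };
}
```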
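
`PersistedBatch` now also accumulates `estimate_since_compact.count` and `.bytes` for every bucket it touches, and the compactor resets both to zero when it writes a new `compacted_state`. This patch does not yet consume those estimates; the sketch below is one hypothetical way a maintenance job could use them to pick buckets for re-compaction. The `pickBucketsToCompact` helper and the threshold values are invented for illustration; only the document fields and the existing `compactBuckets` option come from the patch.

```ts
// Hypothetical consumer of estimate_since_compact - not part of the patch above.
// Assumes the BucketStateDocument shape from models.ts.
interface BucketStateDocument {
  _id: { g: number; b: string };
  last_op: bigint;
  compacted_state?: { op_id: bigint; count: number; checksum: bigint; bytes: number };
  estimate_since_compact?: { count: number; bytes: number };
}

interface CompactThresholds {
  minOpsSinceCompact: number; // e.g. 1_000
  minBytesSinceCompact: number; // e.g. 1_000_000
}

// Returns the bucket names whose accumulated writes since the last compact exceed the thresholds.
function pickBucketsToCompact(states: BucketStateDocument[], thresholds: CompactThresholds): string[] {
  return states
    .filter((state) => {
      const estimate = state.estimate_since_compact;
      if (estimate == null) {
        // Never compacted (or last written before this change): compacting would at least seed the cache.
        return true;
      }
      return (
        estimate.count >= thresholds.minOpsSinceCompact || estimate.bytes >= thresholds.minBytesSinceCompact
      );
    })
    .map((state) => state._id.b);
}

// Example usage with the compactBuckets option that MongoCompactor already accepts:
// await storage.compact({
//   compactBuckets: pickBucketsToCompact(states, { minOpsSinceCompact: 1_000, minBytesSinceCompact: 1_000_000 })
// });
```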