diff --git a/.changeset/ninety-dancers-agree.md b/.changeset/ninety-dancers-agree.md new file mode 100644 index 000000000..103e02b7e --- /dev/null +++ b/.changeset/ninety-dancers-agree.md @@ -0,0 +1,12 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +'@powersync/lib-service-postgres': minor +--- + +Use bigint everywhere internally for OpId. diff --git a/.changeset/rotten-pianos-film.md b/.changeset/rotten-pianos-film.md new file mode 100644 index 000000000..eeb070fa7 --- /dev/null +++ b/.changeset/rotten-pianos-film.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mysql': patch +--- + +Fix race condition when stopping replication immediately after starting it. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 0a30859ce..04ed68ca4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -11,7 +11,7 @@ import { ReplicationAssertionError, ServiceError } from '@powersync/lib-services-framework'; -import { deserializeBson, SaveOperationTag, storage, utils } from '@powersync/service-core'; +import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core'; import * as timers from 'node:timers/promises'; import { PowerSyncMongo } from './db.js'; import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js'; @@ -39,7 +39,7 @@ export interface MongoBucketBatchOptions { groupId: number; slotName: string; lastCheckpointLsn: string | null; - keepaliveOp: string | null; + keepaliveOp: InternalOpId | null; noCheckpointBeforeLsn: string; storeCurrentData: boolean; /** @@ -77,12 +77,12 @@ export class MongoBucketBatch private no_checkpoint_before_lsn: string; - private persisted_op: bigint | null = null; + private persisted_op: InternalOpId | null = null; /** * For tests only - not for persistence logic. */ - public last_flushed_op: bigint | null = null; + public last_flushed_op: InternalOpId | null = null; constructor(options: MongoBucketBatchOptions) { super(); @@ -98,9 +98,7 @@ export class MongoBucketBatch this.skipExistingRows = options.skipExistingRows; this.batch = new OperationBatch(); - if (options.keepaliveOp) { - this.persisted_op = BigInt(options.keepaliveOp); - } + this.persisted_op = options.keepaliveOp ?? 
null; } addCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): void { @@ -135,7 +133,7 @@ export class MongoBucketBatch return null; } - let last_op: bigint | null = null; + let last_op: InternalOpId | null = null; let resumeBatch: OperationBatch | null = null; await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => { @@ -153,7 +151,7 @@ export class MongoBucketBatch this.persisted_op = last_op; this.last_flushed_op = last_op; - return { flushed_op: String(last_op) }; + return { flushed_op: last_op }; } private async replicateBatch( @@ -776,22 +774,23 @@ export class MongoBucketBatch async truncate(sourceTables: storage.SourceTable[]): Promise { await this.flush(); - let last_op: bigint | null = null; + let last_op: InternalOpId | null = null; for (let table of sourceTables) { last_op = await this.truncateSingle(table); } if (last_op) { this.persisted_op = last_op; + return { + flushed_op: last_op + }; + } else { + return null; } - - return { - flushed_op: String(last_op!) - }; } - async truncateSingle(sourceTable: storage.SourceTable): Promise { - let last_op: bigint | null = null; + async truncateSingle(sourceTable: storage.SourceTable): Promise { + let last_op: InternalOpId | null = null; // To avoid too large transactions, we limit the amount of data we delete per transaction. // Since we don't use the record data here, we don't have explicit size limits per batch. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 5e79d8df7..8d83a7998 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -1,6 +1,6 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework'; -import { storage, utils } from '@powersync/service-core'; +import { InternalOpId, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey } from './models.js'; @@ -12,7 +12,7 @@ interface CurrentBucketState { /** * Rows seen in the bucket, with the last op_id of each. */ - seen: Map; + seen: Map; /** * Estimated memory usage of the seen Map. */ @@ -21,7 +21,7 @@ interface CurrentBucketState { /** * Last (lowest) seen op_id that is not a PUT. */ - lastNotPut: bigint | null; + lastNotPut: InternalOpId | null; /** * Number of REMOVE/MOVE operations seen since lastNotPut. @@ -274,7 +274,7 @@ export class MongoCompactor { * @param bucket bucket name * @param op op_id of the last non-PUT operation, which will be converted to CLEAR. 
*/ - private async clearBucket(bucket: string, op: bigint) { + private async clearBucket(bucket: string, op: InternalOpId) { const opFilter = { _id: { $gte: { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 2a7873653..760703116 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -12,6 +12,9 @@ import { CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, GetCheckpointChangesOptions, + InternalOpId, + internalToExternalOpId, + ProtocolOpId, ReplicationCheckpoint, SourceTable, storage, @@ -119,7 +122,7 @@ export class MongoSyncBucketStorage } ); return { - checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n), + checkpoint: doc?.last_checkpoint ?? 0n, lsn: doc?.last_checkpoint_lsn ?? null }; } @@ -143,7 +146,7 @@ export class MongoSyncBucketStorage slotName: this.slot_name, lastCheckpointLsn: checkpoint_lsn, noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN, - keepaliveOp: doc?.keepalive_op ?? null, + keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null, storeCurrentData: options.storeCurrentData, skipExistingRows: options.skipExistingRows ?? false }); @@ -152,7 +155,7 @@ export class MongoSyncBucketStorage await callback(batch); await batch.flush(); if (batch.last_flushed_op) { - return { flushed_op: String(batch.last_flushed_op) }; + return { flushed_op: batch.last_flushed_op }; } else { return null; } @@ -249,7 +252,7 @@ export class MongoSyncBucketStorage return result!; } - async getParameterSets(checkpoint: utils.OpId, lookups: SqliteJsonValue[][]): Promise { + async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise { const lookupFilter = lookups.map((lookup) => { return storage.serializeLookup(lookup); }); @@ -259,7 +262,7 @@ export class MongoSyncBucketStorage $match: { 'key.g': this.group_id, lookup: { $in: lookupFilter }, - _id: { $lte: BigInt(checkpoint) } + _id: { $lte: checkpoint } } }, { @@ -284,8 +287,8 @@ export class MongoSyncBucketStorage } async *getBucketDataBatch( - checkpoint: utils.OpId, - dataBuckets: Map, + checkpoint: utils.InternalOpId, + dataBuckets: Map, options?: storage.BucketDataBatchOptions ): AsyncIterable { if (dataBuckets.size == 0) { @@ -293,14 +296,17 @@ export class MongoSyncBucketStorage } let filters: mongo.Filter[] = []; - const end = checkpoint ? 
BigInt(checkpoint) : new bson.MaxKey(); + if (checkpoint == null) { + throw new ServiceAssertionError('checkpoint is null'); + } + const end = checkpoint; for (let [name, start] of dataBuckets.entries()) { filters.push({ _id: { $gt: { g: this.group_id, b: name, - o: BigInt(start) + o: start }, $lte: { g: this.group_id, @@ -347,7 +353,7 @@ export class MongoSyncBucketStorage let batchSize = 0; let currentBatch: utils.SyncBucketData | null = null; - let targetOp: bigint | null = null; + let targetOp: InternalOpId | null = null; // Ordered by _id, meaning buckets are grouped together for (let rawData of data) { @@ -355,7 +361,7 @@ export class MongoSyncBucketStorage const bucket = row._id.b; if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) { - let start: string | undefined = undefined; + let start: ProtocolOpId | undefined = undefined; if (currentBatch != null) { if (currentBatch.bucket == bucket) { currentBatch.has_more = true; @@ -369,9 +375,12 @@ export class MongoSyncBucketStorage targetOp = null; } - start ??= dataBuckets.get(bucket); if (start == null) { - throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`); + const startOpId = dataBuckets.get(bucket); + if (startOpId == null) { + throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`); + } + start = internalToExternalOpId(startOpId); } currentBatch = { bucket, @@ -406,7 +415,7 @@ export class MongoSyncBucketStorage } } - async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise { + async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { return this.checksumCache.getChecksumMap(checkpoint, buckets); } @@ -638,7 +647,7 @@ export class MongoSyncBucketStorage private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) { return { - checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n), + checkpoint: doc?.last_checkpoint ?? 0n, lsn: doc?.last_checkpoint_lsn ?? null }; } @@ -755,7 +764,7 @@ export class MongoSyncBucketStorage */ async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { const { user_id, signal } = options; - let lastCheckpoint: utils.OpId | null = null; + let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; const iter = wrapWithAbort(this.sharedIter, signal); diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts index 29441fe61..e1f128eec 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts @@ -4,7 +4,7 @@ import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules import * as bson from 'bson'; import { logger } from '@powersync/lib-services-framework'; -import { storage, utils } from '@powersync/service-core'; +import { InternalOpId, storage, utils } from '@powersync/service-core'; import { currentBucketKey } from './MongoBucketBatch.js'; import { MongoIdSequence } from './MongoIdSequence.js'; import { PowerSyncMongo } from './db.js'; @@ -52,7 +52,7 @@ export class PersistedBatch { /** * For debug logging only. */ - debugLastOpId: bigint | null = null; + debugLastOpId: InternalOpId | null = null; /** * Very rough estimate of transaction size. 
diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index 7722ad253..ea6b587f3 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -74,7 +74,7 @@ export async function readSingleBatch(cursor: mongo.FindCursor): Promise<{ export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry { if (row.op == 'PUT' || row.op == 'REMOVE') { return { - op_id: utils.timestampToOpId(row._id.o), + op_id: utils.internalToExternalOpId(row._id.o), op: row.op, object_type: row.table, object_id: row.row_id, @@ -86,7 +86,7 @@ export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry { // MOVE, CLEAR return { - op_id: utils.timestampToOpId(row._id.o), + op_id: utils.internalToExternalOpId(row._id.o), op: row.op, checksum: Number(row.checksum) }; diff --git a/modules/module-mongodb-storage/test/src/storage_sync.test.ts b/modules/module-mongodb-storage/test/src/storage_sync.test.ts index 1352586ae..a6a2056e5 100644 --- a/modules/module-mongodb-storage/test/src/storage_sync.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_sync.test.ts @@ -74,7 +74,7 @@ describe('sync - mongodb', () => { const options: storage.BucketDataBatchOptions = {}; const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), options) ); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }, @@ -87,7 +87,7 @@ describe('sync - mongodb', () => { }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -99,7 +99,7 @@ describe('sync - mongodb', () => { }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index a2cc07187..f4e0d36fe 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -1,5 +1,11 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { BucketStorageFactory, OpId, ReplicationCheckpoint, SyncRulesBucketStorage } from '@powersync/service-core'; +import { + BucketStorageFactory, + InternalOpId, + ProtocolOpId, + ReplicationCheckpoint, + SyncRulesBucketStorage +} from '@powersync/service-core'; import { test_utils } from '@powersync/service-core-tests'; import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js'; @@ -93,28 +99,31 @@ export class ChangeStreamTestContext { getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 
15_000 }), this.streamPromise ]); - if (typeof checkpoint == 'undefined') { + if (checkpoint == null) { // This indicates an issue with the test setup - streamingPromise completed instead // of getClientCheckpoint() throw new Error('Test failure - streamingPromise completed'); } - return checkpoint as string; + return checkpoint; } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const map = new Map(Object.entries(buckets)); + const map = new Map(Object.entries(buckets)); return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } async getBucketData( bucket: string, - start?: string, + start?: ProtocolOpId | InternalOpId | undefined, options?: { timeout?: number; limit?: number; chunkLimitBytes?: number } ) { - start ??= '0'; + start ??= 0n; + if (typeof start == 'string') { + start = BigInt(start); + } let checkpoint = await this.getCheckpoint(options); - const map = new Map([[bucket, start]]); + const map = new Map([[bucket, start]]); const batch = this.storage!.getBucketDataBatch(checkpoint, map, { limit: options?.limit, chunkLimitBytes: options?.chunkLimitBytes @@ -140,7 +149,7 @@ export async function getClientCheckpoint( db: mongo.Db, storageFactory: BucketStorageFactory, options?: { timeout?: number } -): Promise { +): Promise { const start = Date.now(); const lsn = await createCheckpoint(client, db); // This old API needs a persisted checkpoint id. diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 408ae4c43..d2480857f 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -487,11 +487,6 @@ AND table_type = 'BASE TABLE';`, } }); - if (this.stopped) { - // Powersync is shutting down, don't start replicating - return; - } - // Set a heartbeat interval for the Zongji replication connection // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. @@ -517,6 +512,11 @@ AND table_type = 'BASE TABLE';`, socket.destroy(new Error('Replication connection timeout.')); }); + if (this.stopped) { + // Powersync is shutting down, don't start replicating + return; + } + logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); @@ -551,16 +551,19 @@ AND table_type = 'BASE TABLE';`, reject(error); }); - this.abortSignal.addEventListener( - 'abort', - () => { - logger.info('Abort signal received, stopping replication...'); - zongji.stop(); - queue.kill(); - resolve(); - }, - { once: true } - ); + const stop = () => { + logger.info('Abort signal received, stopping replication...'); + zongji.stop(); + queue.kill(); + resolve(); + }; + + this.abortSignal.addEventListener('abort', stop, { once: true }); + + if (this.stopped) { + // Generally this should have been picked up early, but we add this here as a failsafe. 
+ stop(); + } }); } ); diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts index 2ed560980..6bde81601 100644 --- a/modules/module-mysql/test/src/BinLogStream.test.ts +++ b/modules/module-mysql/test/src/BinLogStream.test.ts @@ -173,6 +173,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`); await context.replicateSnapshot(); + context.startStreaming(); const data = await context.getBucketData('global[]'); expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]); @@ -198,6 +199,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { `); await context.replicateSnapshot(); + context.startStreaming(); const data = await context.getBucketData('global[]'); expect(data).toMatchObject([ diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index 36da91f06..fcffea264 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -4,8 +4,9 @@ import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManag import { logger } from '@powersync/lib-services-framework'; import { BucketStorageFactory, - OpId, + InternalOpId, OplogEntry, + ProtocolOpId, ReplicationCheckpoint, storage, SyncRulesBucketStorage @@ -116,30 +117,38 @@ export class BinlogStreamTestContext { this.streamPromise = this.binlogStream.streamChanges(); } - async getCheckpoint(options?: { timeout?: number }): Promise { + async getCheckpoint(options?: { timeout?: number }): Promise { const connection = await this.connectionManager.getConnection(); let checkpoint = await Promise.race([ getClientCheckpoint(connection, this.factory, { timeout: options?.timeout ?? 60_000 }), this.streamPromise ]); connection.release(); - if (typeof checkpoint == undefined) { + if (checkpoint == null) { // This indicates an issue with the test setup - streamingPromise completed instead // of getClientCheckpoint() - throw new Error('Test failure - streamingPromise completed'); + throw new Error('Test failure - streamingPromise completed. Was startStreaming() called?'); } - return checkpoint as string; + return checkpoint; } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { const checkpoint = await this.getCheckpoint(options); - const map = new Map(Object.entries(buckets)); + const map = new Map(Object.entries(buckets)); return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } - async getBucketData(bucket: string, start = '0', options?: { timeout?: number }): Promise { + async getBucketData( + bucket: string, + start?: ProtocolOpId | InternalOpId | undefined, + options?: { timeout?: number } + ): Promise { + start = start ?? 0n; + if (typeof start == 'string') { + start = BigInt(start); + } const checkpoint = await this.getCheckpoint(options); - const map = new Map([[bucket, start]]); + const map = new Map([[bucket, start]]); const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); return batches[0]?.batch.data ?? 
[]; @@ -150,7 +159,7 @@ export async function getClientCheckpoint( connection: mysqlPromise.Connection, storageFactory: BucketStorageFactory, options?: { timeout?: number } -): Promise { +): Promise { const start = Date.now(); const gtid = await readExecutedGtid(connection); // This old API needs a persisted checkpoint id. diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts index 67173c9f5..b19d866eb 100644 --- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts +++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts @@ -1,6 +1,6 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework'; -import { storage, utils } from '@powersync/service-core'; +import { InternalOpId, storage, utils } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as t from 'ts-codec'; import { BIGINT_MAX } from '../types/codecs.js'; @@ -15,7 +15,7 @@ interface CurrentBucketState { /** * Rows seen in the bucket, with the last op_id of each. */ - seen: Map; + seen: Map; /** * Estimated memory usage of the seen Map. */ @@ -24,7 +24,7 @@ interface CurrentBucketState { /** * Last (lowest) seen op_id that is not a PUT. */ - lastNotPut: bigint | null; + lastNotPut: InternalOpId | null; /** * Number of REMOVE/MOVE operations seen since lastNotPut. @@ -51,7 +51,7 @@ export class PostgresCompactor { private moveBatchLimit: number; private moveBatchQueryLimit: number; private clearBatchLimit: number; - private maxOpId: bigint | undefined; + private maxOpId: InternalOpId | undefined; private buckets: string[] | undefined; constructor( @@ -264,7 +264,7 @@ export class PostgresCompactor { * @param bucket bucket name * @param op op_id of the last non-PUT operation, which will be converted to CLEAR. */ - private async clearBucket(bucket: string, op: bigint) { + private async clearBucket(bucket: string, op: InternalOpId) { /** * This entire method could be implemented as a Postgres function, but this might make debugging * a bit more challenging. @@ -280,8 +280,8 @@ export class PostgresCompactor { try { let checksum = 0; - let lastOpId: bigint | null = null; - let targetOp: bigint | null = null; + let lastOpId: InternalOpId | null = null; + let targetOp: InternalOpId | null = null; let gotAnOp = false; const codec = pick(models.BucketData, ['op', 'source_table', 'source_key', 'op_id', 'checksum', 'target_op']); diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 0a18c17bb..d4e94166a 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -5,6 +5,8 @@ import { CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, GetCheckpointChangesOptions, + InternalOpId, + internalToExternalOpId, LastValueSink, storage, utils, @@ -149,7 +151,7 @@ export class PostgresSyncRulesStorage .first(); return { - checkpoint: utils.timestampToOpId(checkpointRow?.last_checkpoint ?? 0n), + checkpoint: checkpointRow?.last_checkpoint ?? 0n, lsn: checkpointRow?.last_checkpoint_lsn ?? 
null }; } @@ -344,14 +346,14 @@ export class PostgresSyncRulesStorage await callback(batch); await batch.flush(); if (batch.last_flushed_op) { - return { flushed_op: String(batch.last_flushed_op) }; + return { flushed_op: batch.last_flushed_op }; } else { return null; } } async getParameterSets( - checkpoint: utils.OpId, + checkpoint: utils.InternalOpId, lookups: sync_rules.SqliteJsonValue[][] ): Promise { const rows = await this.db.sql` @@ -374,7 +376,7 @@ export class PostgresSyncRulesStorage value: lookups.map((l) => storage.serializeLookupBuffer(l).toString('hex')) }}) AS FILTER ) - AND id <= ${{ type: 'int8', value: BigInt(checkpoint) }} + AND id <= ${{ type: 'int8', value: checkpoint }} ORDER BY lookup, source_table, @@ -391,8 +393,8 @@ export class PostgresSyncRulesStorage } async *getBucketDataBatch( - checkpoint: utils.OpId, - dataBuckets: Map, + checkpoint: InternalOpId, + dataBuckets: Map, options?: storage.BucketDataBatchOptions ): AsyncIterable { if (dataBuckets.size == 0) { @@ -410,7 +412,7 @@ export class PostgresSyncRulesStorage let batchSize = 0; let currentBatch: utils.SyncBucketData | null = null; - let targetOp: bigint | null = null; + let targetOp: InternalOpId | null = null; let rowCount = 0; /** @@ -503,9 +505,12 @@ export class PostgresSyncRulesStorage } } - start ??= dataBuckets.get(bucket_name); if (start == null) { - throw new ReplicationAssertionError(`data for unexpected bucket: ${bucket_name}`); + const startOpId = dataBuckets.get(bucket_name); + if (startOpId == null) { + throw new framework.ServiceAssertionError(`data for unexpected bucket: ${bucket_name}`); + } + start = internalToExternalOpId(startOpId); } currentBatch = { bucket: bucket_name, @@ -549,7 +554,7 @@ export class PostgresSyncRulesStorage } } - async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise { + async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { return this.checksumCache.getChecksumMap(checkpoint, buckets); } @@ -672,8 +677,9 @@ export class PostgresSyncRulesStorage } const rangedBatch = batch.map((b) => ({ - ...b, - start: b.start ?? 0 + bucket: b.bucket, + start: String(b.start ?? 0n), + end: String(b.end) })); const results = await this.db.sql` @@ -745,7 +751,7 @@ export class PostgresSyncRulesStorage } async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { - let lastCheckpoint: utils.OpId | null = null; + let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; const { signal, user_id } = options; @@ -852,7 +858,7 @@ export class PostgresSyncRulesStorage private makeActiveCheckpoint(row: models.ActiveCheckpointDecoded | null) { return { - checkpoint: utils.timestampToOpId(row?.last_checkpoint ?? 0n), + checkpoint: row?.last_checkpoint ?? 0n, lsn: row?.last_checkpoint_lsn ?? 
null } satisfies storage.ReplicationCheckpoint; } diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index edbc5c76a..1bb964719 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -9,7 +9,7 @@ import { ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; -import { storage, utils } from '@powersync/service-core'; +import { InternalOpId, storage, utils } from '@powersync/service-core'; import * as sync_rules from '@powersync/service-sync-rules'; import * as timers from 'timers/promises'; import * as t from 'ts-codec'; @@ -29,7 +29,7 @@ export interface PostgresBucketBatchOptions { last_checkpoint_lsn: string | null; no_checkpoint_before_lsn: string; store_current_data: boolean; - keep_alive_op?: bigint | null; + keep_alive_op?: InternalOpId | null; /** * Set to true for initial replication. */ @@ -54,14 +54,14 @@ export class PostgresBucketBatch extends BaseObserver implements storage.BucketStorageBatch { - public last_flushed_op: bigint | null = null; + public last_flushed_op: InternalOpId | null = null; protected db: lib_postgres.DatabaseClient; protected group_id: number; protected last_checkpoint_lsn: string | null; protected no_checkpoint_before_lsn: string; - protected persisted_op: bigint | null; + protected persisted_op: InternalOpId | null; protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[]; protected readonly sync_rules: sync_rules.SqlSyncRules; @@ -132,18 +132,19 @@ export class PostgresBucketBatch async truncate(sourceTables: storage.SourceTable[]): Promise { await this.flush(); - let last_op: bigint | null = null; + let last_op: InternalOpId | null = null; for (let table of sourceTables) { last_op = await this.truncateSingle(table); } if (last_op) { this.persisted_op = last_op; + return { + flushed_op: last_op + }; + } else { + return null; } - - return { - flushed_op: String(last_op!) - }; } protected async truncateSingle(sourceTable: storage.SourceTable) { @@ -279,7 +280,7 @@ export class PostgresBucketBatch this.persisted_op = lastOp; this.last_flushed_op = lastOp; - return { flushed_op: String(lastOp) }; + return { flushed_op: lastOp }; } async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { diff --git a/modules/module-postgres-storage/src/utils/bucket-data.ts b/modules/module-postgres-storage/src/utils/bucket-data.ts index e4b7f504d..4d3e66dff 100644 --- a/modules/module-postgres-storage/src/utils/bucket-data.ts +++ b/modules/module-postgres-storage/src/utils/bucket-data.ts @@ -5,7 +5,7 @@ import { replicaIdToSubkey } from './bson.js'; export const mapOpEntry = (entry: models.BucketDataDecoded) => { if (entry.op == models.OpType.PUT || entry.op == models.OpType.REMOVE) { return { - op_id: utils.timestampToOpId(entry.op_id), + op_id: utils.internalToExternalOpId(entry.op_id), op: entry.op, object_type: entry.table_name ?? undefined, object_id: entry.row_id ?? 
undefined, @@ -17,7 +17,7 @@ export const mapOpEntry = (entry: models.BucketDataDecoded) => { // MOVE, CLEAR return { - op_id: utils.timestampToOpId(entry.op_id), + op_id: utils.internalToExternalOpId(entry.op_id), op: entry.op, checksum: Number(entry.checksum) }; diff --git a/modules/module-postgres-storage/test/src/storage.test.ts b/modules/module-postgres-storage/test/src/storage.test.ts index 2aa5a2605..42a2a935c 100644 --- a/modules/module-postgres-storage/test/src/storage.test.ts +++ b/modules/module-postgres-storage/test/src/storage.test.ts @@ -81,7 +81,7 @@ describe('Postgres Sync Bucket Storage', () => { const options: storage.BucketDataBatchOptions = {}; const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), options) ); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } @@ -93,7 +93,7 @@ describe('Postgres Sync Bucket Storage', () => { }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 } @@ -105,7 +105,7 @@ describe('Postgres Sync Bucket Storage', () => { }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -117,7 +117,7 @@ describe('Postgres Sync Bucket Storage', () => { }); const batch4 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch3[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch3[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch4)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-postgres/test/src/slow_tests.test.ts b/modules/module-postgres/test/src/slow_tests.test.ts index fae84be08..c6b83393c 100644 --- a/modules/module-postgres/test/src/slow_tests.test.ts +++ b/modules/module-postgres/test/src/slow_tests.test.ts @@ -178,7 +178,7 @@ bucket_definitions: break; } - const checkpoint = BigInt((await storage.getCheckpoint()).checkpoint); + const checkpoint = (await storage.getCheckpoint()).checkpoint; if (f instanceof mongo_storage.storage.MongoBucketStorage) { const opsBefore = (await f.db.bucket_data.find().sort({ _id: 1 }).toArray()) .filter((row) => row._id.o <= checkpoint) @@ -403,7 +403,7 @@ bucket_definitions: getClientCheckpoint(pool, storage.factory, { timeout: TIMEOUT_MARGIN_MS }), streamPromise ]); - if (typeof checkpoint == undefined) { + if (checkpoint == null) { // This indicates an issue with the test setup - streamingPromise completed instead // of getClientCheckpoint() throw new Error('Test failure - streamingPromise completed'); diff --git a/modules/module-postgres/test/src/util.ts b/modules/module-postgres/test/src/util.ts index 0a8c76696..4135f4b78 100644 --- 
a/modules/module-postgres/test/src/util.ts +++ b/modules/module-postgres/test/src/util.ts @@ -2,7 +2,7 @@ import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js' import * as types from '@module/types/types.js'; import * as lib_postgres from '@powersync/lib-service-postgres'; import { logger } from '@powersync/lib-services-framework'; -import { BucketStorageFactory, OpId } from '@powersync/service-core'; +import { BucketStorageFactory, InternalOpId, TestStorageOptions } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as mongo_storage from '@powersync/service-module-mongodb-storage'; import * as postgres_storage from '@powersync/service-module-postgres-storage'; @@ -64,7 +64,7 @@ export async function getClientCheckpoint( db: pgwire.PgClient, storageFactory: BucketStorageFactory, options?: { timeout?: number } -): Promise { +): Promise { const start = Date.now(); const api = new PostgresRouteAPIAdapter(db); diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 459e0cee0..25b8dd1d2 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -1,6 +1,12 @@ import { PgManager } from '@module/replication/PgManager.js'; import { PUBLICATION_NAME, WalStream, WalStreamOptions } from '@module/replication/WalStream.js'; -import { BucketStorageFactory, OplogEntry, storage, SyncRulesBucketStorage } from '@powersync/service-core'; +import { + BucketStorageFactory, + InternalOpId, + OplogEntry, + storage, + SyncRulesBucketStorage +} from '@powersync/service-core'; import { test_utils } from '@powersync/service-core-tests'; import * as pgwire from '@powersync/service-jpgwire'; import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js'; @@ -120,27 +126,30 @@ export class WalStreamTestContext implements AsyncDisposable { getClientCheckpoint(this.pool, this.factory, { timeout: options?.timeout ?? 15_000 }), this.streamPromise ]); - if (typeof checkpoint == undefined) { + if (checkpoint == null) { // This indicates an issue with the test setup - streamingPromise completed instead // of getClientCheckpoint() throw new Error('Test failure - streamingPromise completed'); } - return checkpoint as string; + return checkpoint; } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const map = new Map(Object.entries(buckets)); + const map = new Map(Object.entries(buckets)); return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } /** * This waits for a client checkpoint. 
*/ - async getBucketData(bucket: string, start?: string, options?: { timeout?: number }) { - start ??= '0'; + async getBucketData(bucket: string, start?: InternalOpId | string | undefined, options?: { timeout?: number }) { + start ??= 0n; + if (typeof start == 'string') { + start = BigInt(start); + } const checkpoint = await this.getCheckpoint(options); - const map = new Map([[bucket, start]]); + const map = new Map([[bucket, start]]); let data: OplogEntry[] = []; while (true) { const batch = this.storage!.getBucketDataBatch(checkpoint, map); @@ -150,7 +159,7 @@ export class WalStreamTestContext implements AsyncDisposable { if (batches.length == 0 || !batches[0]!.batch.has_more) { break; } - map.set(bucket, batches[0]!.batch.next_after); + map.set(bucket, BigInt(batches[0]!.batch.next_after)); } return data; } @@ -158,10 +167,13 @@ export class WalStreamTestContext implements AsyncDisposable { /** * This does not wait for a client checkpoint. */ - async getCurrentBucketData(bucket: string, start?: string) { - start ??= '0'; + async getCurrentBucketData(bucket: string, start?: InternalOpId | string | undefined) { + start ??= 0n; + if (typeof start == 'string') { + start = BigInt(start); + } const { checkpoint } = await this.storage!.getCheckpoint(); - const map = new Map([[bucket, start]]); + const map = new Map([[bucket, start]]); const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); return batches[0]?.batch.data ?? []; diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index fe5872c52..e5abe124b 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -58,7 +58,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; const batchBefore = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) ); const dataBefore = batchBefore.batch.data; const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']); @@ -91,7 +91,7 @@ bucket_definitions: }); const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) ); const dataAfter = batchAfter.batch.data; const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); @@ -173,7 +173,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; const batchBefore = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) ); const dataBefore = batchBefore.batch.data; const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']); @@ -212,7 +212,7 @@ bucket_definitions: }); const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) ); const dataAfter = batchAfter.batch.data; const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); @@ -297,7 +297,7 @@ bucket_definitions: }); const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint2, new 
Map([['global[]', '0']])) + bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', 0n]])) ); const dataAfter = batchAfter.batch.data; const checksumAfter = await bucketStorage.getChecksums(checkpoint2, ['global[]']); @@ -409,8 +409,8 @@ bucket_definitions: bucketStorage.getBucketDataBatch( checkpoint, new Map([ - ['grouped["b1"]', '0'], - ['grouped["b2"]', '0'] + ['grouped["b1"]', 0n], + ['grouped["b2"]', 0n] ]) ) ); diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 53743f1fa..a88551e56 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -182,7 +182,7 @@ bucket_definitions: // We specifically request the todo_ids for both lists. // There removal operation for the association of `list2`::`todo2` should not interfere with the new // association of `list1`::`todo2` - const parameters = await bucketStorage.getParameterSets(BigInt(result2!.flushed_op).toString(), [ + const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [ ['mybucket', '1', 'list1'], ['mybucket', '1', 'list2'] ]); @@ -323,9 +323,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const batch = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) - ); + const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].batch.data.map((d) => { return { op: d.op, @@ -622,9 +620,7 @@ bucket_definitions: }); }); const checkpoint = result!.flushed_op; - const batch = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) - ); + const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].batch.data.map((d) => { return { op: d.op, @@ -688,9 +684,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const batch = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) - ); + const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].batch.data.map((d) => { return { op: d.op, @@ -805,9 +799,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const batch = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']])) - ); + const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].batch.data.map((d) => { return { @@ -918,7 +910,7 @@ bucket_definitions: }); }); - const checkpoint1 = result1?.flushed_op ?? '0'; + const checkpoint1 = result1?.flushed_op ?? 0n; // Test batch const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { @@ -1066,7 +1058,7 @@ bucket_definitions: }); }); - const checkpoint1 = result1?.flushed_op ?? '0'; + const checkpoint1 = result1?.flushed_op ?? 0n; const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Unchanged, but has a before id @@ -1174,7 +1166,7 @@ bucket_definitions: }); }); - const checkpoint1 = result1?.flushed_op ?? '0'; + const checkpoint1 = result1?.flushed_op ?? 
0n; const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Unchanged, but has a before id @@ -1311,7 +1303,7 @@ bucket_definitions: }; const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), options) ); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }, @@ -1324,7 +1316,7 @@ bucket_definitions: }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1795508474 }, @@ -1337,7 +1329,7 @@ bucket_definitions: }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2[0].batch.next_after]]), options) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options) ); expect(test_utils.getBatchData(batch3)).toEqual([]); expect(test_utils.getBatchMeta(batch3)).toEqual(null); @@ -1375,7 +1367,7 @@ bucket_definitions: const checkpoint = result!.flushed_op; const batch1 = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), { limit: 4 }) + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), { limit: 4 }) ); expect(test_utils.getBatchData(batch1)).toEqual([ @@ -1392,7 +1384,7 @@ bucket_definitions: }); const batch2 = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1.batch.next_after]]), { + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1.batch.next_after)]]), { limit: 4 }) ); @@ -1408,7 +1400,7 @@ bucket_definitions: }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2.batch.next_after]]), { + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2.batch.next_after)]]), { limit: 4 }) ); diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index b43755ebf..e8dfc1c5f 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -4,6 +4,7 @@ import { BSON } from 'bson'; import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { SourceTable } from './SourceTable.js'; import { BatchedCustomWriteCheckpointOptions } from './storage-index.js'; +import { InternalOpId } from '../util/utils.js'; export const DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS: ResolvedBucketBatchCommitOptions = { createEmptyCheckpoints: true @@ -144,7 +145,7 @@ export interface BucketBatchStorageListener { } export interface FlushedResult { - flushed_op: string; + flushed_op: InternalOpId; } export interface BucketBatchCommitOptions { diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 671c33706..350d10039 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,12 +1,11 @@ -import { BucketChecksum, 
OpId } from '../util/protocol-types.js'; -import { ChecksumMap, addBucketChecksums } from '../util/utils.js'; -import { LRUCache } from 'lru-cache/min'; import { OrderedSet } from '@js-sdsl/ordered-set'; -import { BucketPriority } from '@powersync/service-sync-rules'; +import { LRUCache } from 'lru-cache/min'; +import { BucketChecksum } from '../util/protocol-types.js'; +import { addBucketChecksums, ChecksumMap, InternalOpId } from '../util/utils.js'; interface ChecksumFetchContext { fetch(bucket: string): Promise; - checkpoint: bigint; + checkpoint: InternalOpId; } export interface PartialChecksum { @@ -28,10 +27,11 @@ export interface PartialChecksum { */ isFullChecksum: boolean; } + export interface FetchPartialBucketChecksum { bucket: string; - start?: OpId; - end: OpId; + start?: InternalOpId; + end: InternalOpId; } export type PartialChecksumMap = Map; @@ -101,8 +101,7 @@ export class ChecksumCache { dispose: (value, key) => { // Remove from the set of cached checkpoints for the bucket - const { checkpointString } = parseCacheKey(key); - const checkpoint = BigInt(checkpointString); + const { checkpoint } = parseCacheKey(key); const checkpointSet = this.bucketCheckpoints.get(value.bucket); if (checkpointSet == null) { return; @@ -128,7 +127,7 @@ export class ChecksumCache { }); } - async getChecksums(checkpoint: OpId, buckets: string[]): Promise { + async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { const checksums = await this.getChecksumMap(checkpoint, buckets); // Return results in the same order as the request return buckets.map((bucket) => checksums.get(bucket)!); @@ -141,7 +140,7 @@ export class ChecksumCache { * * @returns a Map with exactly one entry for each bucket requested */ - async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise { + async getChecksumMap(checkpoint: InternalOpId, buckets: string[]): Promise { // Buckets that don't have a cached checksum for this checkpoint yet let toFetch = new Set(); @@ -235,7 +234,7 @@ export class ChecksumCache { // Partial checksum found - make a partial checksum request bucketRequest = { bucket, - start: cp.toString(), + start: cp, end: checkpoint }; add.set(bucket, cached); @@ -315,11 +314,11 @@ export class ChecksumCache { } } -function makeCacheKey(checkpoint: bigint | string, bucket: string) { +function makeCacheKey(checkpoint: InternalOpId | string, bucket: string) { return `${checkpoint}/${bucket}`; } function parseCacheKey(key: string) { const index = key.indexOf('/'); - return { checkpointString: key.substring(0, index), bucket: key.substring(index + 1) }; + return { checkpoint: BigInt(key.substring(0, index)), bucket: key.substring(index + 1) }; } diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index f80b511ad..bf350e7d7 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -71,7 +71,7 @@ export interface SyncRulesBucketStorage /** * Used to resolve "dynamic" parameter queries. 
*/ - getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise; + getParameterSets(checkpoint: util.InternalOpId, lookups: SqliteJsonValue[][]): Promise; getCheckpointChanges(options: GetCheckpointChangesOptions): Promise; @@ -94,8 +94,8 @@ export interface SyncRulesBucketStorage * @param options batch size options */ getBucketDataBatch( - checkpoint: util.OpId, - dataBuckets: Map, + checkpoint: util.InternalOpId, + dataBuckets: Map, options?: BucketDataBatchOptions ): AsyncIterable; @@ -104,7 +104,7 @@ export interface SyncRulesBucketStorage * * Returns zero checksums for any buckets not found. */ - getChecksums(checkpoint: util.OpId, buckets: string[]): Promise; + getChecksums(checkpoint: util.InternalOpId, buckets: string[]): Promise; } export interface SyncRulesBucketStorageListener { @@ -169,7 +169,7 @@ export interface CompactOptions { * This can also be used to create a "safe buffer" of recent operations that should * not be compacted, to avoid invalidating checkpoints in use. */ - maxOpId?: bigint; + maxOpId?: util.InternalOpId; /** * If specified, compact only the specific buckets. @@ -215,11 +215,11 @@ export interface BucketDataBatchOptions { export interface SyncBucketDataBatch { batch: util.SyncBucketData; - targetOp: bigint | null; + targetOp: util.InternalOpId | null; } export interface ReplicationCheckpoint { - readonly checkpoint: util.OpId; + readonly checkpoint: util.InternalOpId; readonly lsn: string | null; } @@ -238,7 +238,7 @@ export interface WatchFilterEvent { export interface WriteCheckpoint { base: ReplicationCheckpoint; - writeCheckpoint: bigint | null; + writeCheckpoint: util.InternalOpId | null; } export interface StorageCheckpointUpdate extends WriteCheckpoint { @@ -246,8 +246,8 @@ export interface StorageCheckpointUpdate extends WriteCheckpoint { } export interface GetCheckpointChangesOptions { - lastCheckpoint: util.OpId; - nextCheckpoint: util.OpId; + lastCheckpoint: util.InternalOpId; + nextCheckpoint: util.InternalOpId; } export interface CheckpointChanges { diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index 7115e9dd2..88e1cba92 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -13,7 +13,7 @@ export interface BucketChecksumStateOptions { bucketStorage: BucketChecksumStateStorage; syncRules: SqlSyncRules; syncParams: RequestParameters; - initialBucketPositions?: { name: string; after: string }[]; + initialBucketPositions?: { name: string; after: util.InternalOpId }[]; } /** @@ -78,7 +78,7 @@ export class BucketChecksumState { for (let bucket of allBuckets) { dataBucketsNew.set(bucket.bucket, { description: bucket, - start_op_id: this.bucketDataPositions.get(bucket.bucket)?.start_op_id ?? '0' + start_op_id: this.bucketDataPositions.get(bucket.bucket)?.start_op_id ?? 0n }); } this.bucketDataPositions = dataBucketsNew; @@ -180,7 +180,7 @@ export class BucketChecksumState { checkpointLine = { checkpoint_diff: { - last_op_id: base.checkpoint, + last_op_id: util.internalToExternalOpId(base.checkpoint), write_checkpoint: writeCheckpoint ? 
String(writeCheckpoint) : undefined, removed_buckets: diff.removedBuckets, updated_buckets: updatedBucketDescriptions @@ -193,7 +193,7 @@ export class BucketChecksumState { bucketsToFetch = allBuckets; checkpointLine = { checkpoint: { - last_op_id: base.checkpoint, + last_op_id: util.internalToExternalOpId(base.checkpoint), write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, buckets: [...checksumMap.values()].map((e) => ({ ...e, @@ -219,8 +219,8 @@ export class BucketChecksumState { * @param bucketsToFetch List of buckets to fetch, typically from buildNextCheckpointLine, or a subset of that * @returns */ - getFilteredBucketPositions(bucketsToFetch: BucketDescription[]): Map { - const filtered = new Map(); + getFilteredBucketPositions(bucketsToFetch: BucketDescription[]): Map { + const filtered = new Map(); for (let bucket of bucketsToFetch) { const state = this.bucketDataPositions.get(bucket.bucket); if (state) { @@ -236,7 +236,7 @@ export class BucketChecksumState { * @param bucket the bucket name * @param nextAfter sync operations >= this value in the next batch */ - updateBucketPosition(options: { bucket: string; nextAfter: string; hasMore: boolean }) { + updateBucketPosition(options: { bucket: string; nextAfter: util.InternalOpId; hasMore: boolean }) { const state = this.bucketDataPositions.get(options.bucket); if (state) { state.start_op_id = options.nextAfter; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 7b3aef7f0..64e55a41b 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -79,7 +79,7 @@ export async function* streamResponse( export type BucketSyncState = { description?: BucketDescription; // Undefined if the bucket has not yet been resolved by us. - start_op_id: string; + start_op_id: util.InternalOpId; }; async function* streamResponseInner( @@ -100,7 +100,10 @@ async function* streamResponseInner( bucketStorage, syncRules, syncParams, - initialBucketPositions: params.buckets + initialBucketPositions: params.buckets?.map((bucket) => ({ + name: bucket.name, + after: BigInt(bucket.after) + })) }); const stream = bucketStorage.watchWriteCheckpoint({ user_id: checkpointUserId, @@ -218,7 +221,7 @@ async function* streamResponseInner( interface BucketDataRequest { syncContext: SyncContext; bucketStorage: storage.SyncRulesBucketStorage; - checkpoint: string; + checkpoint: util.InternalOpId; bucketsToFetch: BucketDescription[]; /** Contains current bucket state. Modified by the request as data is sent. */ checksumState: BucketChecksumState; @@ -290,7 +293,6 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator checkpointOp) { + if (targetOp != null && targetOp > checkpoint) { checkpointInvalidated = true; } if (r.data.length == 0) { @@ -362,7 +364,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator; +/** + * op_id as used internally, for individual operations and checkpoints. + * + * This is just a type alias, but serves to document that we're working with an op_id. + */ +export type InternalOpId = bigint; + export const ID_NAMESPACE = 'a396dd91-09fc-4017-a28d-3df722f651e9'; export function escapeIdentifier(identifier: string) { @@ -31,7 +38,11 @@ export function hashDelete(sourceKey: string) { return buffer.readUInt32LE(0); } -export function timestampToOpId(ts: bigint): OpId { +/** + * Internally we always use bigint for op_ids. Externally (in JSON) we use strings. + * This converts between the two. 
+ */ +export function internalToExternalOpId(ts: InternalOpId): ProtocolOpId { // Dynamic values are passed in in some cases, so we make extra sure that the // number is a bigint and not number or Long. if (typeof ts != 'bigint') { diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 1b22dd79a..649610869 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,13 +1,12 @@ import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum, PartialChecksum } from '@/storage/ChecksumCache.js'; -import { OpId } from '@/util/protocol-types.js'; -import { addChecksums } from '@/util/util-index.js'; +import { addChecksums, InternalOpId } from '@/util/util-index.js'; import * as crypto from 'node:crypto'; import { describe, expect, it } from 'vitest'; /** * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. */ -function testHash(bucket: string, checkpoint: OpId) { +function testHash(bucket: string, checkpoint: InternalOpId) { const key = `${checkpoint}/${bucket}`; const hash = crypto.createHash('sha256').update(key).digest().readInt32LE(0); return hash; @@ -77,17 +76,17 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(1234n, ['test'])).toEqual([TEST_1234]); - expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + expect(await cache.getChecksums(123n, ['test2'])).toEqual([TEST2_123]); expect(lookups).toEqual([ - [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', end: 123n }], // This should use the previous lookup - [{ bucket: 'test', start: '123', end: '1234' }], - [{ bucket: 'test2', end: '123' }] + [{ bucket: 'test', start: 123n, end: 1234n }], + [{ bucket: 'test2', end: 123n }] ]); }); @@ -99,17 +98,17 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + expect(await cache.getChecksums(123n, ['test2'])).toEqual([TEST2_123]); - expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(1234n, ['test'])).toEqual([TEST_1234]); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); expect(lookups).toEqual([ // With this order, there is no option for a partial lookup - [{ bucket: 'test2', end: '123' }], - [{ bucket: 'test', end: '1234' }], - [{ bucket: 'test', end: '123' }] + [{ bucket: 'test2', end: 123n }], + [{ bucket: 'test', end: 1234n }], + [{ bucket: 'test', end: 123n }] ]); }); @@ -120,9 +119,9 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const p1 = cache.getChecksums('123', ['test']); - const p2 = cache.getChecksums('1234', ['test']); - const p3 = cache.getChecksums('123', ['test2']); + const p1 = cache.getChecksums(123n, ['test']); + const p2 = cache.getChecksums(1234n, ['test']); + const p3 = cache.getChecksums(123n, ['test2']); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST_1234]); @@ -130,9 +129,9 @@ describe('checksum cache', function () { // Concurrent requests, so we can't do a 
partial lookup for 123 -> 1234 expect(lookups).toEqual([ - [{ bucket: 'test', end: '123' }], - [{ bucket: 'test', end: '1234' }], - [{ bucket: 'test2', end: '123' }] + [{ bucket: 'test', end: 123n }], + [{ bucket: 'test', end: 1234n }], + [{ bucket: 'test2', end: 123n }] ]); }); @@ -143,15 +142,15 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const p1 = cache.getChecksums('123', ['test']); - const p2 = cache.getChecksums('123', ['test']); + const p1 = cache.getChecksums(123n, ['test']); + const p2 = cache.getChecksums(123n, ['test']); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST_123]); // The lookup should be deduplicated, even though it's in progress - expect(lookups).toEqual([[{ bucket: 'test', end: '123' }]]); + expect(lookups).toEqual([[{ bucket: 'test', end: 123n }]]); }); it('should handle serial + concurrent lookups', async function () { @@ -161,18 +160,18 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - const p2 = cache.getChecksums('1234', ['test']); - const p3 = cache.getChecksums('1234', ['test']); + const p2 = cache.getChecksums(1234n, ['test']); + const p3 = cache.getChecksums(1234n, ['test']); expect(await p2).toEqual([TEST_1234]); expect(await p3).toEqual([TEST_1234]); expect(lookups).toEqual([ - [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', end: 123n }], // This lookup is deduplicated - [{ bucket: 'test', start: '123', end: '1234' }] + [{ bucket: 'test', start: 123n, end: 1234n }] ]); }); @@ -183,13 +182,13 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); expect(lookups).toEqual([ [ // Both lookups in the same request - { bucket: 'test', end: '123' }, - { bucket: 'test2', end: '123' } + { bucket: 'test', end: 123n }, + { bucket: 'test2', end: 123n } ] ]); }); @@ -201,14 +200,14 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); expect(lookups).toEqual([ // Request 1 - [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', end: 123n }], // Request 2 - [{ bucket: 'test2', end: '123' }] + [{ bucket: 'test2', end: 123n }] ]); }); @@ -219,8 +218,8 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const a = cache.getChecksums('123', ['test', 'test2']); - const b = cache.getChecksums('123', ['test2', 'test3']); + const a = cache.getChecksums(123n, ['test', 'test2']); + const b = cache.getChecksums(123n, ['test2', 'test3']); expect(await a).toEqual([TEST_123, TEST2_123]); expect(await b).toEqual([TEST2_123, TEST3_123]); @@ -228,11 +227,11 @@ describe('checksum cache', function () { expect(lookups).toEqual([ // Request A [ - { bucket: 'test', end: '123' }, - { bucket: 'test2', end: '123' } + { bucket: 'test', end: 123n }, + { bucket: 'test2', end: 123n } ], // Request B (re-uses the checksum for test2 from request a) - [{ bucket: 
'test3', end: '123' }] + [{ bucket: 'test3', end: 123n }] ]); }); @@ -243,9 +242,9 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums('125', ['test'])).toEqual([ + expect(await cache.getChecksums(125n, ['test'])).toEqual([ { bucket: 'test', checksum: -1865121912, @@ -253,7 +252,7 @@ describe('checksum cache', function () { } ]); - expect(await cache.getChecksums('124', ['test'])).toEqual([ + expect(await cache.getChecksums(124n, ['test'])).toEqual([ { bucket: 'test', checksum: 1887460431, @@ -261,9 +260,9 @@ describe('checksum cache', function () { } ]); expect(lookups).toEqual([ - [{ bucket: 'test', end: '123' }], - [{ bucket: 'test', start: '123', end: '125' }], - [{ bucket: 'test', start: '123', end: '124' }] + [{ bucket: 'test', end: 123n }], + [{ bucket: 'test', start: 123n, end: 125n }], + [{ bucket: 'test', start: 123n, end: 124n }] ]); }); @@ -278,14 +277,14 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const a = cache.getChecksums('123', ['test', 'test2']); - const b = cache.getChecksums('123', ['test2', 'test3']); + const a = cache.getChecksums(123n, ['test', 'test2']); + const b = cache.getChecksums(123n, ['test2', 'test3']); await expect(a).rejects.toEqual(TEST_ERROR); await expect(b).rejects.toEqual(TEST_ERROR); - const a2 = cache.getChecksums('123', ['test', 'test2']); - const b2 = cache.getChecksums('123', ['test2', 'test3']); + const a2 = cache.getChecksums(123n, ['test', 'test2']); + const b2 = cache.getChecksums(123n, ['test2', 'test3']); expect(await a2).toEqual([TEST_123, TEST2_123]); expect(await b2).toEqual([TEST2_123, TEST3_123]); @@ -293,16 +292,16 @@ describe('checksum cache', function () { expect(lookups).toEqual([ // Request A (fails) [ - { bucket: 'test', end: '123' }, - { bucket: 'test2', end: '123' } + { bucket: 'test', end: 123n }, + { bucket: 'test2', end: 123n } ], // Request B (re-uses the checksum for test2 from request a) // Even thought the full request fails, this batch succeeds - [{ bucket: 'test3', end: '123' }], + [{ bucket: 'test3', end: 123n }], // Retry request A [ - { bucket: 'test', end: '123' }, - { bucket: 'test2', end: '123' } + { bucket: 'test', end: 123n }, + { bucket: 'test2', end: 123n } ] ]); }); @@ -314,8 +313,8 @@ describe('checksum cache', function () { return fetchTestChecksums(batch.filter((b) => b.bucket != 'test')); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); - expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([ + expect(await cache.getChecksums(123n, ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); + expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([ { bucket: 'test', checksum: 0, count: 0 }, TEST2_123 ]); @@ -325,11 +324,11 @@ describe('checksum cache', function () { let lookups: FetchPartialBucketChecksum[][] = []; const cache = factory(async (batch) => { lookups.push(batch); - return fetchTestChecksums(batch.filter((b) => b.bucket != 'test' || b.end != '123')); + return fetchTestChecksums(batch.filter((b) => b.bucket != 'test' || b.end != 123n)); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); - expect(await cache.getChecksums('1234', ['test'])).toEqual([ + expect(await cache.getChecksums(123n, 
['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); + expect(await cache.getChecksums(1234n, ['test'])).toEqual([ { bucket: 'test', checksum: 1597020602, @@ -337,7 +336,7 @@ describe('checksum cache', function () { } ]); - expect(lookups).toEqual([[{ bucket: 'test', end: '123' }], [{ bucket: 'test', start: '123', end: '1234' }]]); + expect(lookups).toEqual([[{ bucket: 'test', end: 123n }], [{ bucket: 'test', start: 123n, end: 1234n }]]); }); it('should use maxSize', async function () { @@ -350,8 +349,8 @@ describe('checksum cache', function () { maxSize: 2 }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums('124', ['test'])).toEqual([ + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(124n, ['test'])).toEqual([ { bucket: 'test', checksum: 1887460431, @@ -359,36 +358,36 @@ describe('checksum cache', function () { } ]); - expect(await cache.getChecksums('125', ['test'])).toEqual([ + expect(await cache.getChecksums(125n, ['test'])).toEqual([ { bucket: 'test', checksum: -1865121912, count: 125 } ]); - expect(await cache.getChecksums('126', ['test'])).toEqual([ + expect(await cache.getChecksums(126n, ['test'])).toEqual([ { bucket: 'test', checksum: -1720007310, count: 126 } ]); - expect(await cache.getChecksums('124', ['test'])).toEqual([ + expect(await cache.getChecksums(124n, ['test'])).toEqual([ { bucket: 'test', checksum: 1887460431, count: 124 } ]); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); expect(lookups).toEqual([ - [{ bucket: 'test', end: '123' }], - [{ bucket: 'test', start: '123', end: '124' }], - [{ bucket: 'test', start: '124', end: '125' }], - [{ bucket: 'test', start: '125', end: '126' }], - [{ bucket: 'test', end: '124' }], - [{ bucket: 'test', end: '123' }] + [{ bucket: 'test', end: 123n }], + [{ bucket: 'test', start: 123n, end: 124n }], + [{ bucket: 'test', start: 124n, end: 125n }], + [{ bucket: 'test', start: 125n, end: 126n }], + [{ bucket: 'test', end: 124n }], + [{ bucket: 'test', end: 123n }] ]); }); @@ -403,10 +402,10 @@ describe('checksum cache', function () { maxSize: 2 }); - const p3 = cache.getChecksums('123', ['test3']); - const p4 = cache.getChecksums('123', ['test4']); - const p1 = cache.getChecksums('123', ['test']); - const p2 = cache.getChecksums('123', ['test2']); + const p3 = cache.getChecksums(123n, ['test3']); + const p4 = cache.getChecksums(123n, ['test4']); + const p1 = cache.getChecksums(123n, ['test']); + const p2 = cache.getChecksums(123n, ['test2']); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST2_123]); @@ -421,10 +420,10 @@ describe('checksum cache', function () { // The lookup should be deduplicated, even though it's in progress expect(lookups).toEqual([ - [{ bucket: 'test3', end: '123' }], - [{ bucket: 'test4', end: '123' }], - [{ bucket: 'test', end: '123' }], - [{ bucket: 'test2', end: '123' }] + [{ bucket: 'test3', end: 123n }], + [{ bucket: 'test4', end: 123n }], + [{ bucket: 'test', end: 123n }], + [{ bucket: 'test2', end: 123n }] ]); }); @@ -437,7 +436,7 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(1234n, 
['test'])).toEqual([TEST_1234]); }); }); diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index 3266573f4..77c7b110e 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -4,7 +4,7 @@ import { BucketChecksumStateStorage, CHECKPOINT_INVALIDATE_ALL, ChecksumMap, - OpId, + InternalOpId, SyncContext, WatchFilterEvent } from '@/index.js'; @@ -66,7 +66,7 @@ bucket_definitions: }); const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -84,17 +84,17 @@ bucket_definitions: } ]); // This is the bucket data to be fetched - expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', '0']])); + expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', 0n]])); // This similuates the bucket data being sent - state.updateBucketPosition({ bucket: 'global[]', nextAfter: '1', hasMore: false }); + state.updateBucketPosition({ bucket: 'global[]', nextAfter: 1n, hasMore: false }); // Update bucket storage state storage.updateTestChecksum({ bucket: 'global[]', checksum: 2, count: 2 }); // Now we get a new line const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '2', lsn: '2' }, + base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, update: { updatedDataBuckets: ['global[]'], @@ -111,7 +111,7 @@ bucket_definitions: write_checkpoint: undefined } }); - expect(state.getFilteredBucketPositions(line2.bucketsToFetch)).toEqual(new Map([['global[]', '1']])); + expect(state.getFilteredBucketPositions(line2.bucketsToFetch)).toEqual(new Map([['global[]', 1n]])); }); test('global bucket with initial state', async () => { @@ -125,14 +125,14 @@ bucket_definitions: const state = new BucketChecksumState({ syncContext, // Client sets the initial state here - initialBucketPositions: [{ name: 'global[]', after: '1' }], + initialBucketPositions: [{ name: 'global[]', after: 1n }], syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL, bucketStorage: storage }); const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -150,7 +150,7 @@ bucket_definitions: } ]); // This is the main difference between this and the previous test - expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', '1']])); + expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', 1n]])); }); test('multiple static buckets', async () => { @@ -167,7 +167,7 @@ bucket_definitions: }); const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -196,7 +196,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 }); const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '2', lsn: '2' }, + base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, update: { ...CHECKPOINT_INVALIDATE_ALL, @@ -226,7 +226,7 @@ bucket_definitions: const state = new BucketChecksumState({ syncContext, // Client sets the initial state here - 
initialBucketPositions: [{ name: 'something_here[]', after: '1' }], + initialBucketPositions: [{ name: 'something_here[]', after: 1n }], syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL, bucketStorage: storage @@ -235,7 +235,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[]', checksum: 1, count: 1 }); const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -252,7 +252,7 @@ bucket_definitions: priority: 3 } ]); - expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', '0']])); + expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual(new Map([['global[]', 0n]])); }); test('invalidating individual bucket', async () => { @@ -274,19 +274,19 @@ bucket_definitions: // storage.filter = state.checkpointFilter; await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }); - state.updateBucketPosition({ bucket: 'global[1]', nextAfter: '1', hasMore: false }); - state.updateBucketPosition({ bucket: 'global[2]', nextAfter: '1', hasMore: false }); + state.updateBucketPosition({ bucket: 'global[1]', nextAfter: 1n, hasMore: false }); + state.updateBucketPosition({ bucket: 'global[2]', nextAfter: 1n, hasMore: false }); storage.updateTestChecksum({ bucket: 'global[1]', checksum: 2, count: 2 }); storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 }); const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '2', lsn: '2' }, + base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, update: { ...CHECKPOINT_INVALIDATE_ALL, @@ -330,7 +330,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 1, count: 1 }); await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }); @@ -339,7 +339,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 }); const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '2', lsn: '2' }, + base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, // Invalidate the state - will re-check all buckets update: CHECKPOINT_INVALIDATE_ALL @@ -375,7 +375,7 @@ bucket_definitions: }); const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '3', lsn: '3' }, + base: { checkpoint: 3n, lsn: '3' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -403,19 +403,19 @@ bucket_definitions: // This is the bucket data to be fetched expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual( new Map([ - ['global[1]', '0'], - ['global[2]', '0'] + ['global[1]', 0n], + ['global[2]', 0n] ]) ); // No data changes here. // We simulate partial data sent, before a checkpoint is interrupted. 
- state.updateBucketPosition({ bucket: 'global[1]', nextAfter: '3', hasMore: false }); - state.updateBucketPosition({ bucket: 'global[2]', nextAfter: '1', hasMore: true }); + state.updateBucketPosition({ bucket: 'global[1]', nextAfter: 3n, hasMore: false }); + state.updateBucketPosition({ bucket: 'global[2]', nextAfter: 1n, hasMore: true }); storage.updateTestChecksum({ bucket: 'global[1]', checksum: 4, count: 4 }); const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '4', lsn: '4' }, + base: { checkpoint: 4n, lsn: '4' }, writeCheckpoint: null, update: { ...CHECKPOINT_INVALIDATE_ALL, @@ -452,8 +452,8 @@ bucket_definitions: expect(state.getFilteredBucketPositions(line2.bucketsToFetch)).toEqual( new Map([ - ['global[1]', '3'], - ['global[2]', '1'] + ['global[1]', 3n], + ['global[2]', 1n] ]) ); }); @@ -472,14 +472,17 @@ bucket_definitions: bucketStorage: storage }); - storage.getParameterSets = async (checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise => { - expect(checkpoint).toEqual('1'); + storage.getParameterSets = async ( + checkpoint: InternalOpId, + lookups: SqliteJsonValue[][] + ): Promise => { + expect(checkpoint).toEqual(1n); expect(lookups).toEqual([['by_project', '1', 'u1']]); return [{ id: 1 }, { id: 2 }]; }; const line = (await state.buildNextCheckpointLine({ - base: { checkpoint: '1', lsn: '1' }, + base: { checkpoint: 1n, lsn: '1' }, writeCheckpoint: null, update: CHECKPOINT_INVALIDATE_ALL }))!; @@ -506,23 +509,26 @@ bucket_definitions: // This is the bucket data to be fetched expect(state.getFilteredBucketPositions(line.bucketsToFetch)).toEqual( new Map([ - ['by_project[1]', '0'], - ['by_project[2]', '0'] + ['by_project[1]', 0n], + ['by_project[2]', 0n] ]) ); - state.updateBucketPosition({ bucket: 'by_project[1]', nextAfter: '1', hasMore: false }); - state.updateBucketPosition({ bucket: 'by_project[2]', nextAfter: '1', hasMore: false }); + state.updateBucketPosition({ bucket: 'by_project[1]', nextAfter: 1n, hasMore: false }); + state.updateBucketPosition({ bucket: 'by_project[2]', nextAfter: 1n, hasMore: false }); - storage.getParameterSets = async (checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise => { - expect(checkpoint).toEqual('2'); + storage.getParameterSets = async ( + checkpoint: InternalOpId, + lookups: SqliteJsonValue[][] + ): Promise => { + expect(checkpoint).toEqual(2n); expect(lookups).toEqual([['by_project', '1', 'u1']]); return [{ id: 1 }, { id: 2 }, { id: 3 }]; }; // Now we get a new line const line2 = (await state.buildNextCheckpointLine({ - base: { checkpoint: '2', lsn: '2' }, + base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, update: { invalidateDataBuckets: false, @@ -539,7 +545,7 @@ bucket_definitions: write_checkpoint: undefined } }); - expect(state.getFilteredBucketPositions(line2.bucketsToFetch)).toEqual(new Map([['by_project[3]', '0']])); + expect(state.getFilteredBucketPositions(line2.bucketsToFetch)).toEqual(new Map([['by_project[3]', 0n]])); }); }); @@ -558,7 +564,7 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage { this.filter?.({ invalidate: true }); } - async getChecksums(checkpoint: OpId, buckets: string[]): Promise { + async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { return new Map( buckets.map((bucket) => { const checksum = this.state.get(bucket); @@ -574,7 +580,7 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage { ); } - async getParameterSets(checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise { + async 
getParameterSets(checkpoint: InternalOpId, lookups: SqliteJsonValue[][]): Promise { throw new Error('Method not implemented.'); } }
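
For reference, a minimal TypeScript sketch of the convention this changeset adopts: op_ids are bigint (InternalOpId) everywhere inside the service, and are converted to and from protocol strings only at the JSON boundary. InternalOpId, ProtocolOpId and internalToExternalOpId mirror names from the diff above; externalToInternalOpId is a hypothetical helper shown here for symmetry, and the bodies are illustrative only, not the actual service-core implementation.

// Minimal, self-contained sketch (not the actual service-core code).
type InternalOpId = bigint; // used internally for checkpoints and individual operations
type ProtocolOpId = string; // JSON-safe representation sent to clients

function internalToExternalOpId(id: InternalOpId): ProtocolOpId {
  if (typeof id != 'bigint') {
    // Dynamic values can be passed in, so guard against number/Long at runtime.
    throw new Error(`Expected bigint op_id, got ${typeof id}`);
  }
  return String(id);
}

function externalToInternalOpId(id: ProtocolOpId): InternalOpId {
  // BigInt() throws on non-numeric input, which is the desired behaviour for protocol data.
  return BigInt(id);
}

// Client-supplied resume positions arrive as strings and are parsed once at the edge,
// as in the streamResponseInner change above.
const clientBuckets = [{ name: 'global[]', after: '0' }];
const initialBucketPositions = clientBuckets.map((bucket) => ({
  name: bucket.name,
  after: externalToInternalOpId(bucket.after)
}));

// All further comparisons stay in bigint space, e.g. the checkpoint-invalidation guard:
const targetOp: InternalOpId | null = 4n;
const checkpoint: InternalOpId = 3n;
const checkpointInvalidated = targetOp != null && targetOp > checkpoint;

console.log(initialBucketPositions, checkpointInvalidated, internalToExternalOpId(checkpoint));

Keeping a single internal representation avoids repeated String()/BigInt() round-trips and makes numeric comparisons such as targetOp > checkpoint safe; comparing op_ids as strings would order '9' after '10'.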
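
Similarly, the per-bucket position bookkeeping in BucketChecksumState now works entirely on bigint positions. Below is a rough sketch of that flow; the BucketPositions class is a simplification for illustration only, with advance and filtered standing in for the diff's updateBucketPosition and getFilteredBucketPositions.

// Illustrative-only sketch of per-bucket position tracking with bigint op_ids.
class BucketPositions {
  private positions = new Map<string, bigint>();

  // Start a bucket from op_id 0n unless the client resumed from a later position.
  resolve(bucket: string, resumeAfter?: bigint): void {
    if (!this.positions.has(bucket)) {
      this.positions.set(bucket, resumeAfter ?? 0n);
    }
  }

  // Record how far data has been sent, mirroring updateBucketPosition({ bucket, nextAfter }).
  advance(bucket: string, nextAfter: bigint): void {
    this.positions.set(bucket, nextAfter);
  }

  // Positions for the buckets still to fetch, mirroring getFilteredBucketPositions.
  filtered(buckets: string[]): Map<string, bigint> {
    return new Map(
      buckets
        .filter((b) => this.positions.has(b))
        .map((b) => [b, this.positions.get(b)!] as [string, bigint])
    );
  }
}

const state = new BucketPositions();
state.resolve('global[]');
state.advance('global[]', 1n);
console.log(state.filtered(['global[]'])); // Map(1) { 'global[]' => 1n }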