diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 01e7b6497..6ad06a327 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -133,7 +133,7 @@ export class MongoBucketBatch let result: storage.FlushedResult | null = null; // One flush may be split over multiple transactions. // Each flushInner() is one transaction. - while (this.batch != null) { + while (this.batch != null || this.write_checkpoint_batch.length > 0) { let r = await this.flushInner(options); if (r) { result = r; @@ -144,17 +144,16 @@ export class MongoBucketBatch private async flushInner(options?: storage.BatchBucketFlushOptions): Promise { const batch = this.batch; - if (batch == null) { - return null; - } - let last_op: InternalOpId | null = null; let resumeBatch: OperationBatch | null = null; - await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => { - resumeBatch = await this.replicateBatch(session, batch, opSeq, options); + await this.withReplicationTransaction(`Flushing ${batch?.length ?? 0} ops`, async (session, opSeq) => { + if (batch != null) { + resumeBatch = await this.replicateBatch(session, batch, opSeq, options); + } if (this.write_checkpoint_batch.length > 0) { + this.logger.info(`Writing ${this.write_checkpoint_batch.length} custom write checkpoints`); await batchCreateCustomWriteCheckpoints(this.db, session, this.write_checkpoint_batch, opSeq.next()); this.write_checkpoint_batch = []; } diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index e8ff90fdb..a766fef47 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1790,6 +1790,50 @@ bucket_definitions: }); }); + test('custom write checkpoints - standalone checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + await bucketStorage.autoActivate(); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Flush to clear state + await batch.flush(); + + await batch.addCustomWriteCheckpoint({ + checkpoint: 5n, + user_id: 'user1' + }); + await batch.flush(); + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + lsn: '5/0' + }, + writeCheckpoint: 5n + } + }); + }); + test('custom write checkpoints - write after checkpoint', async (context) => { await using factory = await generateStorageFactory(); const r = await factory.configureSyncRules({