Skip to content

Commit 0a8acc2

Browse files
authored
Fix custom write checkpoints (#280)
* Fix for flushing custom write checkpoints. * Add test.
1 parent 1907356 commit 0a8acc2

File tree

2 files changed

+50
-7
lines changed

2 files changed

+50
-7
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ export class MongoBucketBatch
133133
let result: storage.FlushedResult | null = null;
134134
// One flush may be split over multiple transactions.
135135
// Each flushInner() is one transaction.
136-
while (this.batch != null) {
136+
while (this.batch != null || this.write_checkpoint_batch.length > 0) {
137137
let r = await this.flushInner(options);
138138
if (r) {
139139
result = r;
@@ -144,17 +144,16 @@ export class MongoBucketBatch
144144

145145
private async flushInner(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
146146
const batch = this.batch;
147-
if (batch == null) {
148-
return null;
149-
}
150-
151147
let last_op: InternalOpId | null = null;
152148
let resumeBatch: OperationBatch | null = null;
153149

154-
await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
155-
resumeBatch = await this.replicateBatch(session, batch, opSeq, options);
150+
await this.withReplicationTransaction(`Flushing ${batch?.length ?? 0} ops`, async (session, opSeq) => {
151+
if (batch != null) {
152+
resumeBatch = await this.replicateBatch(session, batch, opSeq, options);
153+
}
156154

157155
if (this.write_checkpoint_batch.length > 0) {
156+
this.logger.info(`Writing ${this.write_checkpoint_batch.length} custom write checkpoints`);
158157
await batchCreateCustomWriteCheckpoints(this.db, session, this.write_checkpoint_batch, opSeq.next());
159158
this.write_checkpoint_batch = [];
160159
}

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,6 +1790,50 @@ bucket_definitions:
17901790
});
17911791
});
17921792

1793+
test('custom write checkpoints - standalone checkpoint', async (context) => {
1794+
await using factory = await generateStorageFactory();
1795+
const r = await factory.configureSyncRules({
1796+
content: `
1797+
bucket_definitions:
1798+
mybucket:
1799+
data: []
1800+
`,
1801+
validate: false
1802+
});
1803+
const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
1804+
await bucketStorage.autoActivate();
1805+
bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM);
1806+
1807+
const abortController = new AbortController();
1808+
context.onTestFinished(() => abortController.abort());
1809+
const iter = bucketStorage
1810+
.watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
1811+
[Symbol.asyncIterator]();
1812+
1813+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1814+
// Flush to clear state
1815+
await batch.flush();
1816+
1817+
await batch.addCustomWriteCheckpoint({
1818+
checkpoint: 5n,
1819+
user_id: 'user1'
1820+
});
1821+
await batch.flush();
1822+
await batch.keepalive('5/0');
1823+
});
1824+
1825+
const result = await iter.next();
1826+
expect(result).toMatchObject({
1827+
done: false,
1828+
value: {
1829+
base: {
1830+
lsn: '5/0'
1831+
},
1832+
writeCheckpoint: 5n
1833+
}
1834+
});
1835+
});
1836+
17931837
test('custom write checkpoints - write after checkpoint', async (context) => {
17941838
await using factory = await generateStorageFactory();
17951839
const r = await factory.configureSyncRules({

0 commit comments

Comments
 (0)