diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index bbcebb0de..966eb77be 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -111,9 +111,16 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } protected async lastManagedWriteCheckpoint(filters: ManagedWriteCheckpointFilters) { - const { user_id } = filters; + const { user_id, heads } = filters; + // TODO: support multiple heads when we need to support multiple connections + const lsn = heads['1']; + if (lsn == null) { + // Can happen if we haven't replicated anything yet. + return null; + } const lastWriteCheckpoint = await this.db.write_checkpoints.findOne({ - user_id: user_id + user_id: user_id, + 'lsns.1': { $lte: lsn } }); return lastWriteCheckpoint?.client_id ?? null; } diff --git a/packages/service-core/test/src/sync.test.ts b/packages/service-core/test/src/sync.test.ts index 64e463274..75e5f550f 100644 --- a/packages/service-core/test/src/sync.test.ts +++ b/packages/service-core/test/src/sync.test.ts @@ -1,6 +1,6 @@ import { SaveOperationTag } from '@/storage/storage-index.js'; import { RequestTracker } from '@/sync/RequestTracker.js'; -import { streamResponse } from '@/sync/sync.js'; +import { streamResponse, SyncStreamParameters } from '@/sync/sync.js'; import { StreamingSyncLine } from '@/util/protocol-types.js'; import { JSONBig } from '@powersync/service-jsonbig'; import { RequestParameters } from '@powersync/service-sync-rules'; @@ -381,6 +381,67 @@ function defineTests(factory: StorageFactory) { }) }); }); + + test('write checkpoint', async () => { + const f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: BASIC_SYNC_RULES + }); + + const storage = f.getInstance(syncRules); + await storage.autoActivate(); + + await storage.startBatch(BATCH_OPTIONS, async (batch) => { + // <= the managed write checkpoint LSN below + await batch.commit('0/1'); + }); + + const checkpoint = await storage.createManagedWriteCheckpoint({ + user_id: 'test', + heads: { '1': '1/0' } + }); + + const params: SyncStreamParameters = { + storage: f, + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + parseOptions: PARSE_OPTIONS, + tracker, + syncParams: new RequestParameters({ sub: 'test' }, {}), + token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any + }; + const stream1 = streamResponse(params); + const lines1 = await consumeCheckpointLines(stream1); + + // If write checkpoints are not correctly filtered, this may already + // contain the write checkpoint. + expect(lines1[0]).toMatchObject({ + checkpoint: expect.objectContaining({ + last_op_id: '0', + write_checkpoint: undefined + }) + }); + + await storage.startBatch(BATCH_OPTIONS, async (batch) => { + // must be >= the managed write checkpoint LSN + await batch.commit('1/0'); + }); + + // At this point the LSN has advanced, so the write checkpoint should be + // included in the next checkpoint message. + const stream2 = streamResponse(params); + const lines2 = await consumeCheckpointLines(stream2); + expect(lines2[0]).toMatchObject({ + checkpoint: expect.objectContaining({ + last_op_id: '0', + write_checkpoint: `${checkpoint}` + }) + }); + }); } /**