Skip to content

Commit 8defa6e

Browse files
committed
Query parameters at the checkpoint's clusterTime.
1 parent c588665 commit 8defa6e

File tree

3 files changed

+61
-35
lines changed

3 files changed

+61
-35
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, Sour
3232
import { MongoBucketBatch } from './MongoBucketBatch.js';
3333
import { MongoCompactor } from './MongoCompactor.js';
3434
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
35-
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
35+
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js';
3636

3737
export class MongoSyncBucketStorage
3838
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -123,7 +123,7 @@ export class MongoSyncBucketStorage
123123
// null/0n is a valid checkpoint in some cases, for example if the initial snapshot was empty
124124
doc.last_checkpoint ?? 0n,
125125
doc.last_checkpoint_lsn ?? null,
126-
clusterTime.clusterTime
126+
clusterTime
127127
);
128128
});
129129
}
@@ -265,38 +265,53 @@ export class MongoSyncBucketStorage
265265
return result!;
266266
}
267267

268-
async getParameterSets(checkpoint: ReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
269-
const lookupFilter = lookups.map((lookup) => {
270-
return storage.serializeLookup(lookup);
271-
});
272-
const rows = await this.db.bucket_parameters
273-
.aggregate([
274-
{
275-
$match: {
276-
'key.g': this.group_id,
277-
lookup: { $in: lookupFilter },
278-
_id: { $lte: checkpoint.checkpoint }
279-
}
280-
},
281-
{
282-
$sort: {
283-
_id: -1
284-
}
285-
},
286-
{
287-
$group: {
288-
_id: { key: '$key', lookup: '$lookup' },
289-
bucket_parameters: {
290-
$first: '$bucket_parameters'
268+
async getParameterSets(checkpoint: MongoReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
269+
return this.db.client.withSession({ snapshot: true }, async (session) => {
270+
// Set the session's snapshot time to the checkpoint's cluster time.
271+
// An alternative would be to create the session when the checkpoint is created, but managing
272+
// the session lifetime would become more complex.
273+
// Starting and ending sessions are cheap (synchronous when no transactions are used),
274+
// so this should be fine.
275+
// This is a roundabout way of setting {readConcern: {atClusterTime: clusterTime}}, since
276+
// that is not exposed directly by the driver.
277+
// Future versions of the driver may change the snapshotTime behavior, so we need tests to
278+
// validate that this works as expected.
279+
setSessionSnapshotTime(session, checkpoint.clusterTime.clusterTime);
280+
const lookupFilter = lookups.map((lookup) => {
281+
return storage.serializeLookup(lookup);
282+
});
283+
const rows = await this.db.bucket_parameters
284+
.aggregate(
285+
[
286+
{
287+
$match: {
288+
'key.g': this.group_id,
289+
lookup: { $in: lookupFilter },
290+
_id: { $lte: checkpoint.checkpoint }
291+
}
292+
},
293+
{
294+
$sort: {
295+
_id: -1
296+
}
297+
},
298+
{
299+
$group: {
300+
_id: { key: '$key', lookup: '$lookup' },
301+
bucket_parameters: {
302+
$first: '$bucket_parameters'
303+
}
304+
}
291305
}
292-
}
293-
}
294-
])
295-
.toArray();
296-
const groupedParameters = rows.map((row) => {
297-
return row.bucket_parameters;
306+
],
307+
{ session, readConcern: 'snapshot' }
308+
)
309+
.toArray();
310+
const groupedParameters = rows.map((row) => {
311+
return row.bucket_parameters;
312+
});
313+
return groupedParameters.flat();
298314
});
299-
return groupedParameters.flat();
300315
}
301316

302317
async *getBucketDataBatch(
@@ -963,7 +978,7 @@ class MongoReplicationCheckpoint implements ReplicationCheckpoint {
963978
private storage: MongoSyncBucketStorage,
964979
public readonly checkpoint: InternalOpId,
965980
public readonly lsn: string | null,
966-
private clusterTime: mongo.Timestamp
981+
public clusterTime: mongo.ClusterTime
967982
) {}
968983

969984
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {

modules/module-mongodb-storage/src/storage/implementation/util.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { storage, utils } from '@powersync/service-core';
77

88
import { PowerSyncMongo } from './db.js';
99
import { BucketDataDocument } from './models.js';
10+
import { ServiceAssertionError } from '@powersync/lib-services-framework';
1011

1112
export function idPrefixFilter<T>(prefix: Partial<T>, rest: (keyof T)[]): mongo.Condition<T> {
1213
let filter = {
@@ -117,3 +118,15 @@ export const connectMongoForTests = (url: string, isCI: boolean) => {
117118
});
118119
return new PowerSyncMongo(client);
119120
};
121+
122+
export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson.Timestamp) {
123+
// This is a workaround for the lack of direct support for snapshot reads in the MongoDB driver.
124+
if (!session.snapshotEnabled) {
125+
throw new ServiceAssertionError(`Session must be a snapshot session`);
126+
}
127+
if ((session as any).snapshotTime == null) {
128+
(session as any).snapshotTime = time;
129+
} else {
130+
throw new ServiceAssertionError(`Session snapshotTime is already set`);
131+
}
132+
}

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ bucket_definitions:
7575
});
7676

7777
const checkpoint = await bucketStorage.getCheckpoint();
78-
console.log('Checkpoint:', checkpoint);
79-
console.log(await bucketStorage.getStatus());
8078
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
8179
expect(parameters).toEqual([
8280
{

0 commit comments

Comments
 (0)