Skip to content

Commit c588665

Browse files
committed
Refactor internal MongoSyncBucketStorage checkpoint lookup.
1 parent 19e6dd5 commit c588665

File tree

1 file changed

+43
-47
lines changed

1 file changed

+43
-47
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ import * as lib_mongo from '@powersync/lib-service-mongodb';
22
import { mongo } from '@powersync/lib-service-mongodb';
33
import {
44
BaseObserver,
5-
ErrorCode,
65
logger,
76
ReplicationAbortedError,
8-
ServiceAssertionError,
9-
ServiceError
7+
ServiceAssertionError
108
} from '@powersync/lib-services-framework';
119
import {
1210
BroadcastIterable,
@@ -30,14 +28,7 @@ import { LRUCache } from 'lru-cache';
3028
import * as timers from 'timers/promises';
3129
import { MongoBucketStorage } from '../MongoBucketStorage.js';
3230
import { PowerSyncMongo } from './db.js';
33-
import {
34-
BucketDataDocument,
35-
BucketDataKey,
36-
BucketStateDocument,
37-
SourceKey,
38-
SourceTableDocument,
39-
SyncRuleCheckpointState
40-
} from './models.js';
31+
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
4132
import { MongoBucketBatch } from './MongoBucketBatch.js';
4233
import { MongoCompactor } from './MongoCompactor.js';
4334
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
@@ -106,16 +97,35 @@ export class MongoSyncBucketStorage
10697
}
10798

10899
async getCheckpoint(): Promise<storage.ReplicationCheckpoint> {
109-
const doc = await this.db.sync_rules.findOne(
110-
{ _id: this.group_id },
111-
{
112-
projection: { last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1 }
100+
return (await this.getCheckpointInternal()) ?? new EmptyReplicationCheckpoint();
101+
}
102+
103+
async getCheckpointInternal(): Promise<storage.ReplicationCheckpoint | null> {
104+
return await this.db.client.withSession(async (session) => {
105+
const doc = await this.db.sync_rules.findOne(
106+
{ _id: this.group_id },
107+
{
108+
session,
109+
projection: { _id: 1, state: 1, last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1 }
110+
}
111+
);
112+
if (!doc?.snapshot_done || !['ACTIVE', 'ERRORED'].includes(doc.state)) {
113+
// Sync rules not active - return null
114+
return null;
113115
}
114-
);
115-
if (!doc?.snapshot_done) {
116-
return new MongoReplicationCheckpoint(this, 0n, null);
117-
}
118-
return new MongoReplicationCheckpoint(this, doc.last_checkpoint ?? 0n, doc.last_checkpoint_lsn ?? null);
116+
117+
const clusterTime = session.clusterTime;
118+
if (clusterTime == null) {
119+
throw new ServiceAssertionError('Missing clusterTime in getCheckpoint()');
120+
}
121+
return new MongoReplicationCheckpoint(
122+
this,
123+
// null/0n is a valid checkpoint in some cases, for example if the initial snapshot was empty
124+
doc.last_checkpoint ?? 0n,
125+
doc.last_checkpoint_lsn ?? null,
126+
clusterTime.clusterTime
127+
);
128+
});
119129
}
120130

121131
async startBatch(
@@ -655,10 +665,6 @@ export class MongoSyncBucketStorage
655665
return new MongoCompactor(this.db, this.group_id, options).compact();
656666
}
657667

658-
private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null): ReplicationCheckpoint {
659-
return new MongoReplicationCheckpoint(this, doc?.last_checkpoint ?? 0n, doc?.last_checkpoint_lsn ?? null);
660-
}
661-
662668
/**
663669
* Instance-wide watch on the latest available checkpoint (op_id + lsn).
664670
*/
@@ -678,33 +684,13 @@ export class MongoSyncBucketStorage
678684
break;
679685
}
680686

681-
const doc = await this.db.sync_rules.findOne(
682-
{
683-
_id: this.group_id,
684-
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
685-
},
686-
{
687-
limit: 1,
688-
projection: {
689-
_id: 1,
690-
state: 1,
691-
last_checkpoint: 1,
692-
last_checkpoint_lsn: 1
693-
}
694-
}
695-
);
696-
697-
if (doc == null) {
698-
// Sync rules not present or not active.
699-
// Abort the connections - clients will have to retry later.
700-
throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available');
701-
} else if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
687+
const op = await this.getCheckpointInternal();
688+
if (op == null) {
702689
// Sync rules have changed - abort and restart.
703690
// We do a soft close of the stream here - no error
704691
break;
705692
}
706693

707-
const op = this.makeActiveCheckpoint(doc);
708694
// Check for LSN / checkpoint changes - ignore other metadata changes
709695
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
710696
lastOp = op;
@@ -976,10 +962,20 @@ class MongoReplicationCheckpoint implements ReplicationCheckpoint {
976962
constructor(
977963
private storage: MongoSyncBucketStorage,
978964
public readonly checkpoint: InternalOpId,
979-
public readonly lsn: string | null
965+
public readonly lsn: string | null,
966+
private clusterTime: mongo.Timestamp
980967
) {}
981968

982969
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
983970
return this.storage.getParameterSets(this, lookups);
984971
}
985972
}
973+
974+
class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
975+
readonly checkpoint: InternalOpId = 0n;
976+
readonly lsn: string | null = null;
977+
978+
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
979+
return [];
980+
}
981+
}

0 commit comments

Comments
 (0)