Skip to content

Commit d56eeb9

Browse files
authored
Fix replication switchover (#308)
* Refactor autoActivate logic to execute on the first consistent commit. * Refactor resuming of replication. * Fix some postgres tests. * Add missing file. * Postgres storage: activate on keepalive. * We do need a commit after snapshots. * Only trigger resnapshot if needed. * Improve test stability. * Fix metrics test. * Improve GA labels. * Add changeset. * Further test stability improvements. * And more test fixes. * Periodically persist replication progress in absense of commits. * Avoid waiting for probes.touch(). * Another attempt at making tests more stable.
1 parent 4ebc3bf commit d56eeb9

File tree

25 files changed

+333
-187
lines changed

25 files changed

+333
-187
lines changed

.changeset/shiny-pugs-train.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mysql': minor
9+
---
10+
11+
Delay switching over to new sync rules until we have a consistent checkpoint.

.github/workflows/test.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ jobs:
168168
shell: bash
169169
run: pnpm build
170170

171-
- name: Test
171+
- name: Test Replication
172172
run: pnpm test --filter='./modules/module-postgres'
173173

174-
- name: Test
174+
- name: Test Storage
175175
run: pnpm test --filter='./modules/module-postgres-storage'
176176

177177
run-mysql-tests:
@@ -252,7 +252,7 @@ jobs:
252252
shell: bash
253253
run: pnpm build
254254

255-
- name: Test
255+
- name: Test Replication
256256
run: pnpm test --filter='./modules/module-mysql'
257257

258258
run-mongodb-tests:
@@ -320,7 +320,7 @@ jobs:
320320
shell: bash
321321
run: pnpm build
322322

323-
- name: Test
323+
- name: Test Replication
324324
run: pnpm test --filter='./modules/module-mongodb'
325325

326326
- name: Test Storage

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
BucketStorageMarkRecordUnavailable,
1717
deserializeBson,
1818
InternalOpId,
19+
isCompleteRow,
1920
SaveOperationTag,
2021
storage,
2122
utils
@@ -49,6 +50,7 @@ export interface MongoBucketBatchOptions {
4950
lastCheckpointLsn: string | null;
5051
keepaliveOp: InternalOpId | null;
5152
noCheckpointBeforeLsn: string;
53+
resumeFromLsn: string | null;
5254
storeCurrentData: boolean;
5355
/**
5456
* Set to true for initial replication.
@@ -99,6 +101,20 @@ export class MongoBucketBatch
99101
*/
100102
public last_flushed_op: InternalOpId | null = null;
101103

104+
/**
105+
* lastCheckpointLsn is the last consistent commit.
106+
*
107+
* While that is generally a "safe" point to resume from, there are cases where we may want to resume from a different point:
108+
* 1. After an initial snapshot, we don't have a consistent commit yet, but need to resume from the snapshot LSN.
109+
* 2. If "no_checkpoint_before_lsn" is set far in advance, it may take a while to reach that point. We
110+
* may want to resume at incremental points before that.
111+
*
112+
* This is set when creating the batch, but may not be updated afterwards.
113+
*/
114+
public resumeFromLsn: string | null = null;
115+
116+
private needsActivation = true;
117+
102118
constructor(options: MongoBucketBatchOptions) {
103119
super();
104120
this.logger = options.logger ?? defaultLogger;
@@ -107,6 +123,7 @@ export class MongoBucketBatch
107123
this.group_id = options.groupId;
108124
this.last_checkpoint_lsn = options.lastCheckpointLsn;
109125
this.no_checkpoint_before_lsn = options.noCheckpointBeforeLsn;
126+
this.resumeFromLsn = options.resumeFromLsn;
110127
this.session = this.client.startSession();
111128
this.slot_name = options.slotName;
112129
this.sync_rules = options.syncRules;
@@ -332,7 +349,7 @@ export class MongoBucketBatch
332349
// Not an error if we re-apply a transaction
333350
existing_buckets = [];
334351
existing_lookups = [];
335-
if (this.storeCurrentData) {
352+
if (!isCompleteRow(this.storeCurrentData, after!)) {
336353
if (this.markRecordUnavailable != null) {
337354
// This will trigger a "resnapshot" of the record.
338355
// This is not relevant if storeCurrentData is false, since we'll get the full row
@@ -685,6 +702,7 @@ export class MongoBucketBatch
685702

686703
if (!createEmptyCheckpoints && this.persisted_op == null) {
687704
// Nothing to commit - also return true
705+
await this.autoActivate(lsn);
688706
return true;
689707
}
690708

@@ -729,12 +747,65 @@ export class MongoBucketBatch
729747
},
730748
{ session: this.session }
731749
);
750+
await this.autoActivate(lsn);
732751
await this.db.notifyCheckpoint();
733752
this.persisted_op = null;
734753
this.last_checkpoint_lsn = lsn;
735754
return true;
736755
}
737756

757+
/**
758+
* Switch from processing -> active if relevant.
759+
*
760+
* Called on new commits.
761+
*/
762+
private async autoActivate(lsn: string) {
763+
if (!this.needsActivation) {
764+
return;
765+
}
766+
767+
// Activate the batch, so it can start processing.
768+
// This is done automatically when the first save() is called.
769+
770+
const session = this.session;
771+
let activated = false;
772+
await session.withTransaction(async () => {
773+
const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session });
774+
if (doc && doc.state == 'PROCESSING') {
775+
await this.db.sync_rules.updateOne(
776+
{
777+
_id: this.group_id
778+
},
779+
{
780+
$set: {
781+
state: storage.SyncRuleState.ACTIVE
782+
}
783+
},
784+
{ session }
785+
);
786+
787+
await this.db.sync_rules.updateMany(
788+
{
789+
_id: { $ne: this.group_id },
790+
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
791+
},
792+
{
793+
$set: {
794+
state: storage.SyncRuleState.STOP
795+
}
796+
},
797+
{ session }
798+
);
799+
activated = true;
800+
}
801+
});
802+
if (activated) {
803+
this.logger.info(`Activated new sync rules at ${lsn}`);
804+
await this.db.notifyCheckpoint();
805+
}
806+
this.needsActivation = false;
807+
}
808+
738809
async keepalive(lsn: string): Promise<boolean> {
739810
if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
740811
// No-op
@@ -782,13 +853,14 @@ export class MongoBucketBatch
782853
},
783854
{ session: this.session }
784855
);
856+
await this.autoActivate(lsn);
785857
await this.db.notifyCheckpoint();
786858
this.last_checkpoint_lsn = lsn;
787859

788860
return true;
789861
}
790862

791-
async setSnapshotLsn(lsn: string): Promise<void> {
863+
async setResumeLsn(lsn: string): Promise<void> {
792864
const update: Partial<SyncRuleDocument> = {
793865
snapshot_lsn: lsn
794866
};

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
GetCheckpointChangesOptions,
1717
InternalOpId,
1818
internalToExternalOpId,
19+
maxLsn,
1920
ProtocolOpId,
2021
ReplicationCheckpoint,
2122
storage,
@@ -131,7 +132,7 @@ export class MongoSyncBucketStorage
131132
{
132133
_id: this.group_id
133134
},
134-
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1 } }
135+
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1, snapshot_lsn: 1 } }
135136
);
136137
const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null;
137138

@@ -142,6 +143,7 @@ export class MongoSyncBucketStorage
142143
groupId: this.group_id,
143144
slotName: this.slot_name,
144145
lastCheckpointLsn: checkpoint_lsn,
146+
resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn),
145147
noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN,
146148
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null,
147149
storeCurrentData: options.storeCurrentData,
@@ -640,41 +642,6 @@ export class MongoSyncBucketStorage
640642
);
641643
}
642644

643-
async autoActivate(): Promise<void> {
644-
await this.db.client.withSession(async (session) => {
645-
await session.withTransaction(async () => {
646-
const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session });
647-
if (doc && doc.state == 'PROCESSING') {
648-
await this.db.sync_rules.updateOne(
649-
{
650-
_id: this.group_id
651-
},
652-
{
653-
$set: {
654-
state: storage.SyncRuleState.ACTIVE
655-
}
656-
},
657-
{ session }
658-
);
659-
660-
await this.db.sync_rules.updateMany(
661-
{
662-
_id: { $ne: this.group_id },
663-
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
664-
},
665-
{
666-
$set: {
667-
state: storage.SyncRuleState.STOP
668-
}
669-
},
670-
{ session }
671-
);
672-
await this.db.notifyCheckpoint();
673-
}
674-
});
675-
});
676-
}
677-
678645
async reportError(e: any): Promise<void> {
679646
const message = String(e.message ?? 'Replication failure');
680647
await this.db.sync_rules.updateOne(

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,15 @@ export interface SyncRuleDocument {
118118
snapshot_done: boolean;
119119

120120
/**
121+
* This is now used for "resumeLsn".
122+
*
121123
* If snapshot_done = false, this may be the lsn at which we started the snapshot.
122124
*
123125
* This can be used for resuming the snapshot after a restart.
126+
*
127+
* If snapshot_done is true, this is treated as the point to restart replication from.
128+
*
129+
* More specifically, we resume replication from max(snapshot_lsn, last_checkpoint_lsn).
124130
*/
125131
snapshot_lsn: string | undefined;
126132

0 commit comments

Comments
 (0)