Skip to content

Commit 88ab679

Browse files
authored
Keep current sync rule storage active when restarting replication due to errors (#216)
* Keep current sync rules active when restarting replication. * Add ERRORED state. * Add ERRORED state for postgres storage. * Don't replicate errored sync rules. * Fixes. * Skip storageCreated events when terminating sync rules. * Fix postgres handling of missing logical replication slot. * Add changeset.
1 parent 2f75fd7 commit 88ab679

File tree

11 files changed

+118
-49
lines changed

11 files changed

+118
-49
lines changed

.changeset/proud-geckos-draw.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-module-postgres': patch
5+
'@powersync/service-module-mongodb': patch
6+
'@powersync/service-core': patch
7+
---
8+
9+
Keep serving current data when restarting replication due to errors.

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { SqlSyncRules } from '@powersync/service-sync-rules';
22

3-
import { storage } from '@powersync/service-core';
3+
import { GetIntanceOptions, storage } from '@powersync/service-core';
44

55
import { BaseObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
66
import { v4 as uuid } from 'uuid';
@@ -44,13 +44,15 @@ export class MongoBucketStorage
4444
// No-op
4545
}
4646

47-
getInstance(options: storage.PersistedSyncRulesContent): MongoSyncBucketStorage {
48-
let { id, slot_name } = options;
47+
getInstance(syncRules: storage.PersistedSyncRulesContent, options?: GetIntanceOptions): MongoSyncBucketStorage {
48+
let { id, slot_name } = syncRules;
4949
if ((typeof id as any) == 'bigint') {
5050
id = Number(id);
5151
}
52-
const storage = new MongoSyncBucketStorage(this, id, options, slot_name);
53-
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
52+
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name);
53+
if (!options?.skipLifecycleHooks) {
54+
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
55+
}
5456
storage.registerListener({
5557
batchStarted: (batch) => {
5658
batch.registerListener({
@@ -95,13 +97,11 @@ export class MongoBucketStorage
9597
}
9698
}
9799

98-
async slotRemoved(slot_name: string) {
100+
async restartReplication(sync_rules_group_id: number) {
99101
const next = await this.getNextSyncRulesContent();
100102
const active = await this.getActiveSyncRulesContent();
101103

102-
// In both the below cases, we create a new sync rules instance.
103-
// The current one will continue erroring until the next one has finished processing.
104-
if (next != null && next.slot_name == slot_name) {
104+
if (next != null && next.id == sync_rules_group_id) {
105105
// We need to redo the "next" sync rules
106106
await this.updateSyncRules({
107107
content: next.sync_rules_content,
@@ -119,22 +119,39 @@ export class MongoBucketStorage
119119
}
120120
}
121121
);
122-
} else if (next == null && active?.slot_name == slot_name) {
122+
} else if (next == null && active?.id == sync_rules_group_id) {
123123
// Slot removed for "active" sync rules, while there is no "next" one.
124124
await this.updateSyncRules({
125125
content: active.sync_rules_content,
126126
validate: false
127127
});
128128

129-
// Pro-actively stop replicating
129+
// In this case we keep the old one as active for clients, so that existing clients
130+
// can still get the latest data while we replicate the new ones.
131+
// It will however not replicate anymore.
132+
130133
await this.db.sync_rules.updateOne(
131134
{
132135
_id: active.id,
133136
state: storage.SyncRuleState.ACTIVE
134137
},
135138
{
136139
$set: {
137-
state: storage.SyncRuleState.STOP
140+
state: storage.SyncRuleState.ERRORED
141+
}
142+
}
143+
);
144+
} else if (next != null && active?.id == sync_rules_group_id) {
145+
// Already have next sync rules, but need to stop replicating the active one.
146+
147+
await this.db.sync_rules.updateOne(
148+
{
149+
_id: active.id,
150+
state: storage.SyncRuleState.ACTIVE
151+
},
152+
{
153+
$set: {
154+
state: storage.SyncRuleState.ERRORED
138155
}
139156
}
140157
);
@@ -211,7 +228,7 @@ export class MongoBucketStorage
211228
async getActiveSyncRulesContent(): Promise<MongoPersistedSyncRulesContent | null> {
212229
const doc = await this.db.sync_rules.findOne(
213230
{
214-
state: storage.SyncRuleState.ACTIVE
231+
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
215232
},
216233
{ sort: { _id: -1 }, limit: 1 }
217234
);
@@ -249,7 +266,7 @@ export class MongoBucketStorage
249266
async getReplicatingSyncRules(): Promise<storage.PersistedSyncRulesContent[]> {
250267
const docs = await this.db.sync_rules
251268
.find({
252-
$or: [{ state: storage.SyncRuleState.ACTIVE }, { state: storage.SyncRuleState.PROCESSING }]
269+
state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] }
253270
})
254271
.toArray();
255272

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ export class MongoSyncBucketStorage
604604
await this.db.sync_rules.updateMany(
605605
{
606606
_id: { $ne: this.group_id },
607-
state: storage.SyncRuleState.ACTIVE
607+
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
608608
},
609609
{
610610
$set: {
@@ -657,7 +657,7 @@ export class MongoSyncBucketStorage
657657
doc = await this.db.sync_rules.findOne(
658658
{
659659
_id: syncRulesId,
660-
state: storage.SyncRuleState.ACTIVE
660+
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
661661
},
662662
{
663663
session,
@@ -728,7 +728,7 @@ export class MongoSyncBucketStorage
728728
// Irrelevant update
729729
continue;
730730
}
731-
if (doc.state != storage.SyncRuleState.ACTIVE) {
731+
if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
732732
// Sync rules have changed - abort and restart.
733733
// Should this error instead?
734734
break;

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
4040
this.logger.error(`Replication failed`, e);
4141

4242
if (e instanceof ChangeStreamInvalidatedError) {
43-
// This stops replication on this slot, and creates a new slot
44-
await this.options.storage.factory.slotRemoved(this.slotName);
43+
// This stops replication and restarts with a new instance
44+
await this.options.storage.factory.restartReplication(this.storage.group_id);
4545
}
4646
} finally {
4747
this.abortController.abort();

modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as framework from '@powersync/lib-services-framework';
2-
import { storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
2+
import { GetIntanceOptions, storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
33
import * as pg_wire from '@powersync/service-jpgwire';
44
import * as sync_rules from '@powersync/service-sync-rules';
55
import crypto from 'crypto';
@@ -50,14 +50,19 @@ export class PostgresBucketStorageFactory
5050
// This has not been implemented yet.
5151
}
5252

53-
getInstance(syncRules: storage.PersistedSyncRulesContent): storage.SyncRulesBucketStorage {
53+
getInstance(
54+
syncRules: storage.PersistedSyncRulesContent,
55+
options?: GetIntanceOptions
56+
): storage.SyncRulesBucketStorage {
5457
const storage = new PostgresSyncRulesStorage({
5558
factory: this,
5659
db: this.db,
5760
sync_rules: syncRules,
5861
batchLimits: this.options.config.batch_limits
5962
});
60-
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
63+
if (!options?.skipLifecycleHooks) {
64+
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
65+
}
6166
storage.registerListener({
6267
batchStarted: (batch) => {
6368
batch.registerListener({
@@ -225,13 +230,13 @@ export class PostgresBucketStorageFactory
225230
});
226231
}
227232

228-
async slotRemoved(slot_name: string): Promise<void> {
233+
async restartReplication(sync_rules_group_id: number): Promise<void> {
229234
const next = await this.getNextSyncRulesContent();
230235
const active = await this.getActiveSyncRulesContent();
231236

232237
// In the first two cases below, we create a new sync rules instance.
233-
// The current one will continue erroring until the next one has finished processing.
234-
if (next != null && next.slot_name == slot_name) {
238+
// The current one will continue serving sync requests until the next one has finished processing.
239+
if (next != null && next.id == sync_rules_group_id) {
235240
// We need to redo the "next" sync rules
236241
await this.updateSyncRules({
237242
content: next.sync_rules_content,
@@ -246,18 +251,30 @@ export class PostgresBucketStorageFactory
246251
id = ${{ value: next.id, type: 'int4' }}
247252
AND state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }}
248253
`.execute();
249-
} else if (next == null && active?.slot_name == slot_name) {
254+
} else if (next == null && active?.id == sync_rules_group_id) {
250255
// Slot removed for "active" sync rules, while there is no "next" one.
251256
await this.updateSyncRules({
252257
content: active.sync_rules_content,
253258
validate: false
254259
});
255260

256-
// Pro-actively stop replicating
261+
// Pro-actively stop replicating, but still serve clients with existing data
257262
await this.db.sql`
258263
UPDATE sync_rules
259264
SET
260-
state = ${{ value: storage.SyncRuleState.STOP, type: 'varchar' }}
265+
state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
266+
WHERE
267+
id = ${{ value: active.id, type: 'int4' }}
268+
AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
269+
`.execute();
270+
} else if (next != null && active?.id == sync_rules_group_id) {
271+
// Already have "next" sync rules - don't update any.
272+
273+
// Pro-actively stop replicating, but still serve clients with existing data
274+
await this.db.sql`
275+
UPDATE sync_rules
276+
SET
277+
state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
261278
WHERE
262279
id = ${{ value: active.id, type: 'int4' }}
263280
AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
@@ -279,6 +296,7 @@ export class PostgresBucketStorageFactory
279296
sync_rules
280297
WHERE
281298
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
299+
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
282300
ORDER BY
283301
id DESC
284302
LIMIT

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,10 @@ export class PostgresSyncRulesStorage
657657
SET
658658
state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }}
659659
WHERE
660-
state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
660+
(
661+
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
662+
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
663+
)
661664
AND id != ${{ type: 'int4', value: this.group_id }}
662665
`.execute();
663666
});
@@ -729,6 +732,7 @@ export class PostgresSyncRulesStorage
729732
sync_rules
730733
WHERE
731734
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
735+
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
732736
ORDER BY
733737
id DESC
734738
LIMIT
@@ -791,7 +795,8 @@ export class PostgresSyncRulesStorage
791795
FROM
792796
sync_rules
793797
WHERE
794-
state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
798+
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
799+
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
795800
LIMIT
796801
1
797802
`

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ export class WalStream {
239239
needsNewSlot: r.needsNewSlot
240240
};
241241
} else {
242+
if (snapshotDone) {
243+
// This will create a new slot, while keeping the current sync rules active
244+
throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
245+
}
246+
// This will clear data and re-create the same slot
242247
return { needsInitialSync: true, needsNewSlot: true };
243248
}
244249
}

modules/module-postgres/src/replication/WalStreamReplicationJob.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
6060
this.logger.error(`Replication failed on ${this.slotName}`, e);
6161

6262
if (e instanceof MissingReplicationSlotError) {
63-
// This stops replication on this slot, and creates a new slot
64-
await this.options.storage.factory.slotRemoved(this.slotName);
63+
// This stops replication on this slot and restarts with a new slot
64+
await this.options.storage.factory.restartReplication(this.storage.group_id);
6565
}
6666
} finally {
6767
this.abortController.abort();

packages/service-core/src/replication/AbstractReplicator.ts

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,23 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
193193

194194
this.replicationJobs = newJobs;
195195

196-
// Terminate any orphaned jobs that no longer have sync rules
196+
// Stop any orphaned jobs that no longer have sync rules.
197+
// Termination happens below
197198
for (let job of existingJobs.values()) {
198199
// Old - stop and clean up
199200
try {
200201
await job.stop();
201-
await this.terminateSyncRules(job.storage);
202202
} catch (e) {
203203
// This will be retried
204-
this.logger.warn('Failed to terminate old replication job}', e);
204+
this.logger.warn('Failed to stop old replication job}', e);
205205
}
206206
}
207207

208208
// Sync rules stopped previously or by a different process.
209209
const stopped = await this.storage.getStoppedSyncRules();
210210
for (let syncRules of stopped) {
211211
try {
212-
const syncRuleStorage = this.storage.getInstance(syncRules);
212+
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
213213
await this.terminateSyncRules(syncRuleStorage);
214214
} catch (e) {
215215
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
@@ -223,13 +223,9 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
223223

224224
protected async terminateSyncRules(syncRuleStorage: storage.SyncRulesBucketStorage) {
225225
this.logger.info(`Terminating sync rules: ${syncRuleStorage.group_id}...`);
226-
try {
227-
await this.cleanUp(syncRuleStorage);
228-
await syncRuleStorage.terminate();
229-
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
230-
} catch (e) {
231-
this.logger.warn(`Failed clean up replication config for sync rules: ${syncRuleStorage.group_id}`, e);
232-
}
226+
await this.cleanUp(syncRuleStorage);
227+
await syncRuleStorage.terminate();
228+
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
233229
}
234230

235231
abstract testConnection(): Promise<ConnectionTestResult>;

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export enum SyncRuleState {
1212
/**
1313
* Sync rule processing is done, and can be used for sync.
1414
*
15-
* Only one set of sync rules should be in ACTIVE state.
15+
* Only one set of sync rules should be in ACTIVE or ERRORED state.
1616
*/
1717
ACTIVE = 'ACTIVE',
1818
/**
@@ -24,7 +24,16 @@ export enum SyncRuleState {
2424
* After sync rules have been stopped, the data needs to be
2525
* deleted. Once deleted, the state is TERMINATED.
2626
*/
27-
TERMINATED = 'TERMINATED'
27+
TERMINATED = 'TERMINATED',
28+
29+
/**
30+
* This set of sync rules has run into a permanent replication error. It
31+
* is still the "active" sync rules for syncing to users,
32+
* but should not replicate anymore.
33+
*
34+
* It will transition to STOP when a new set of sync rules is activated.
35+
*/
36+
ERRORED = 'ERRORED'
2837
}
2938

3039
export const DEFAULT_DOCUMENT_BATCH_LIMIT = 1000;

0 commit comments

Comments
 (0)