From 545050cd772301fcfd9775efda73319798c3133e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 24 Feb 2025 16:47:57 +0200 Subject: [PATCH 1/8] Keep current sync rules active when restarting replication. --- .../src/storage/MongoBucketStorage.ts | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 22f71dd7f..dd557e930 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -100,7 +100,8 @@ export class MongoBucketStorage const active = await this.getActiveSyncRulesContent(); // In both the below cases, we create a new sync rules instance. - // The current one will continue erroring until the next one has finished processing. + // In the case that next != null && active.slot_name == slot_name, we ignore this. + // That will happen when this is called again after the new sync rules have been created. if (next != null && next.slot_name == slot_name) { // We need to redo the "next" sync rules await this.updateSyncRules({ @@ -126,18 +127,9 @@ export class MongoBucketStorage validate: false }); - // Pro-actively stop replicating - await this.db.sync_rules.updateOne( - { - _id: active.id, - state: storage.SyncRuleState.ACTIVE - }, - { - $set: { - state: storage.SyncRuleState.STOP - } - } - ); + // In this case we keep the old one as active, so that that existing clients can still get the latest + // data while we replicate the new ones. + // The current one will continue erroring until the next one has finished processing. } } From 394b8e47f070974ee797342ad853979fcd4efb4e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 10:17:40 +0200 Subject: [PATCH 2/8] Add ERRORED state. --- .../src/storage/MongoBucketStorage.ts | 22 ++++++++++++++----- .../implementation/MongoSyncBucketStorage.ts | 6 ++--- .../service-core/src/storage/BucketStorage.ts | 11 +++++++++- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index dd557e930..dd13526d2 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -127,9 +127,21 @@ export class MongoBucketStorage validate: false }); - // In this case we keep the old one as active, so that that existing clients can still get the latest - // data while we replicate the new ones. - // The current one will continue erroring until the next one has finished processing. + // In this case we keep the old one as active for clients, so that that existing clients + // can still get the latest data while we replicate the new ones. + // It will however not replicate anymore. + + await this.db.sync_rules.updateOne( + { + _id: active.id, + state: storage.SyncRuleState.ACTIVE + }, + { + $set: { + state: storage.SyncRuleState.ERRORED + } + } + ); } } @@ -203,7 +215,7 @@ export class MongoBucketStorage async getActiveSyncRulesContent(): Promise { const doc = await this.db.sync_rules.findOne( { - state: storage.SyncRuleState.ACTIVE + state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }, { sort: { _id: -1 }, limit: 1 } ); @@ -241,7 +253,7 @@ export class MongoBucketStorage async getReplicatingSyncRules(): Promise { const docs = await this.db.sync_rules .find({ - $or: [{ state: storage.SyncRuleState.ACTIVE }, { state: storage.SyncRuleState.PROCESSING }] + state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }) .toArray(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index ce8ab8677..efe665424 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -587,7 +587,7 @@ export class MongoSyncBucketStorage await this.db.sync_rules.updateMany( { _id: { $ne: this.group_id }, - state: storage.SyncRuleState.ACTIVE + state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }, { $set: { @@ -640,7 +640,7 @@ export class MongoSyncBucketStorage doc = await this.db.sync_rules.findOne( { _id: syncRulesId, - state: storage.SyncRuleState.ACTIVE + state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }, { session, @@ -711,7 +711,7 @@ export class MongoSyncBucketStorage // Irrelevant update continue; } - if (doc.state != storage.SyncRuleState.ACTIVE) { + if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { // Sync rules have changed - abort and restart. // Should this error instead? break; diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 2a2787fa0..11dc7ab83 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -24,7 +24,16 @@ export enum SyncRuleState { * After sync rules have been stopped, the data needs to be * deleted. Once deleted, the state is TERMINATED. */ - TERMINATED = 'TERMINATED' + TERMINATED = 'TERMINATED', + + /** + * Sync rules has run into a permanent replication error. It + * is still the "active" sync rules for syncing to users, + * but should not replicate anymore. + * + * It will transition to STOP when a new sync rules is activated. + */ + ERRORED = 'ERRORED' } export const DEFAULT_DOCUMENT_BATCH_LIMIT = 1000; From c7efaf19e991124f4c9550ae24cd7a7d48b953fe Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 10:21:43 +0200 Subject: [PATCH 3/8] Add ERRORED state for postgres storage. --- .../src/storage/PostgresBucketStorageFactory.ts | 6 ++++-- .../src/storage/PostgresSyncRulesStorage.ts | 9 +++++++-- packages/service-core/src/storage/BucketStorage.ts | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index 2ef50a697..79f270d63 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -253,11 +253,11 @@ export class PostgresBucketStorageFactory validate: false }); - // Pro-actively stop replicating + // Pro-actively stop replicating, but still serve clients with existing data await this.db.sql` UPDATE sync_rules SET - state = ${{ value: storage.SyncRuleState.STOP, type: 'varchar' }} + state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} WHERE id = ${{ value: active.id, type: 'int4' }} AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} @@ -279,6 +279,7 @@ export class PostgresBucketStorageFactory sync_rules WHERE state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} ORDER BY id DESC LIMIT @@ -329,6 +330,7 @@ export class PostgresBucketStorageFactory sync_rules WHERE state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} OR state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }} ` .decoded(models.SyncRules) diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 3edcb6825..71eda806a 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -616,7 +616,10 @@ export class PostgresSyncRulesStorage SET state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} WHERE - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + ( + state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} + ) AND id != ${{ type: 'int4', value: this.group_id }} `.execute(); }); @@ -688,6 +691,7 @@ export class PostgresSyncRulesStorage sync_rules WHERE state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} ORDER BY id DESC LIMIT @@ -750,7 +754,8 @@ export class PostgresSyncRulesStorage FROM sync_rules WHERE - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} LIMIT 1 ` diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 11dc7ab83..7f749c8f9 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -12,7 +12,7 @@ export enum SyncRuleState { /** * Sync rule processing is done, and can be used for sync. * - * Only one set of sync rules should be in ACTIVE state. + * Only one set of sync rules should be in ACTIVE or ERRORED state. */ ACTIVE = 'ACTIVE', /** From 047a2e2a4f1975af090753979aa4f467463dbf40 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 10:24:51 +0200 Subject: [PATCH 4/8] Don't replicate errored sync rules. --- .../module-mongodb-storage/src/storage/MongoBucketStorage.ts | 2 +- .../src/storage/PostgresBucketStorageFactory.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index dd13526d2..4aadbc7f1 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -253,7 +253,7 @@ export class MongoBucketStorage async getReplicatingSyncRules(): Promise { const docs = await this.db.sync_rules .find({ - state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } + state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] } }) .toArray(); diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index 79f270d63..57f376c55 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -330,7 +330,6 @@ export class PostgresBucketStorageFactory sync_rules WHERE state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} - OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} OR state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }} ` .decoded(models.SyncRules) From e2675af7917544a22dc03377e248351e210a6dfe Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 10:42:04 +0200 Subject: [PATCH 5/8] Fixes. --- .../src/storage/MongoBucketStorage.ts | 23 ++++++++++++++----- .../replication/ChangeStreamReplicationJob.ts | 4 ++-- .../storage/PostgresBucketStorageFactory.ts | 20 ++++++++++++---- .../replication/WalStreamReplicationJob.ts | 4 ++-- .../src/replication/AbstractReplicator.ts | 16 +++++-------- .../src/storage/BucketStorageFactory.ts | 4 +--- 6 files changed, 44 insertions(+), 27 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 4aadbc7f1..3b05c5cdb 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -95,14 +95,11 @@ export class MongoBucketStorage } } - async slotRemoved(slot_name: string) { + async restartReplication(sync_rules_group_id: number) { const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); - // In both the below cases, we create a new sync rules instance. - // In the case that next != null && active.slot_name == slot_name, we ignore this. - // That will happen when this is called again after the new sync rules have been created. - if (next != null && next.slot_name == slot_name) { + if (next != null && next.id == sync_rules_group_id) { // We need to redo the "next" sync rules await this.updateSyncRules({ content: next.sync_rules_content, @@ -120,7 +117,7 @@ export class MongoBucketStorage } } ); - } else if (next == null && active?.slot_name == slot_name) { + } else if (next == null && active?.id == sync_rules_group_id) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ content: active.sync_rules_content, @@ -131,6 +128,20 @@ export class MongoBucketStorage // can still get the latest data while we replicate the new ones. // It will however not replicate anymore. + await this.db.sync_rules.updateOne( + { + _id: active.id, + state: storage.SyncRuleState.ACTIVE + }, + { + $set: { + state: storage.SyncRuleState.ERRORED + } + } + ); + } else if (next != null && active?.id == sync_rules_group_id) { + // Already have next sync rules, but need to stop replicating the active one. + await this.db.sync_rules.updateOne( { _id: active.id, diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index e2ba6a24a..a9b2fb6dd 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -40,8 +40,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ this.logger.error(`Replication failed`, e); if (e instanceof ChangeStreamInvalidatedError) { - // This stops replication on this slot, and creates a new slot - await this.options.storage.factory.slotRemoved(this.slotName); + // This stops replication and restarts with a new instance + await this.options.storage.factory.restartReplication(this.storage.group_id); } } finally { this.abortController.abort(); diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index 57f376c55..a2db0f180 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -225,13 +225,13 @@ export class PostgresBucketStorageFactory }); } - async slotRemoved(slot_name: string): Promise { + async restartReplication(sync_rules_group_id: number): Promise { const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); // In both the below cases, we create a new sync rules instance. - // The current one will continue erroring until the next one has finished processing. - if (next != null && next.slot_name == slot_name) { + // The current one will continue serving sync requests until the next one has finished processing. + if (next != null && next.id == sync_rules_group_id) { // We need to redo the "next" sync rules await this.updateSyncRules({ content: next.sync_rules_content, @@ -246,13 +246,25 @@ export class PostgresBucketStorageFactory id = ${{ value: next.id, type: 'int4' }} AND state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }} `.execute(); - } else if (next == null && active?.slot_name == slot_name) { + } else if (next == null && active?.id == sync_rules_group_id) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ content: active.sync_rules_content, validate: false }); + // Pro-actively stop replicating, but still serve clients with existing data + await this.db.sql` + UPDATE sync_rules + SET + state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} + WHERE + id = ${{ value: active.id, type: 'int4' }} + AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + `.execute(); + } else if (next != null && active?.id == sync_rules_group_id) { + // Already have "next" sync rules - don't update any. + // Pro-actively stop replicating, but still serve clients with existing data await this.db.sql` UPDATE sync_rules diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index c487c5757..c7d54785e 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -60,8 +60,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob this.logger.error(`Replication failed on ${this.slotName}`, e); if (e instanceof MissingReplicationSlotError) { - // This stops replication on this slot, and creates a new slot - await this.options.storage.factory.slotRemoved(this.slotName); + // This stops replication on this slot and restarts with a new slot + await this.options.storage.factory.restartReplication(this.storage.group_id); } } finally { this.abortController.abort(); diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index b219a3265..0bdc62d22 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -193,15 +193,15 @@ export abstract class AbstractReplicator; diff --git a/packages/service-core/src/storage/BucketStorageFactory.ts b/packages/service-core/src/storage/BucketStorageFactory.ts index 3512b1cb3..d63252c39 100644 --- a/packages/service-core/src/storage/BucketStorageFactory.ts +++ b/packages/service-core/src/storage/BucketStorageFactory.ts @@ -39,10 +39,8 @@ export interface BucketStorageFactory extends ObserverClient; + restartReplication(sync_rules_group_id: number): Promise; /** * Get the sync rules used for querying. From 83a441fea02ee455e60c70248949260f74dcdd97 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 11:47:33 +0200 Subject: [PATCH 6/8] Skip storageCreated events when terminating sync rules. --- .../src/storage/MongoBucketStorage.ts | 12 +++++++----- .../src/storage/PostgresBucketStorageFactory.ts | 11 ++++++++--- .../src/replication/AbstractReplicator.ts | 2 +- .../src/storage/BucketStorageFactory.ts | 14 +++++++++++++- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 3b05c5cdb..23fd90b0b 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -1,6 +1,6 @@ import { SqlSyncRules } from '@powersync/service-sync-rules'; -import { storage } from '@powersync/service-core'; +import { GetIntanceOptions, storage } from '@powersync/service-core'; import { BaseObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework'; import { v4 as uuid } from 'uuid'; @@ -44,13 +44,15 @@ export class MongoBucketStorage // No-op } - getInstance(options: storage.PersistedSyncRulesContent): MongoSyncBucketStorage { - let { id, slot_name } = options; + getInstance(syncRules: storage.PersistedSyncRulesContent, options?: GetIntanceOptions): MongoSyncBucketStorage { + let { id, slot_name } = syncRules; if ((typeof id as any) == 'bigint') { id = Number(id); } - const storage = new MongoSyncBucketStorage(this, id, options, slot_name); - this.iterateListeners((cb) => cb.syncStorageCreated?.(storage)); + const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name); + if (!options?.skipLifecycleHooks) { + this.iterateListeners((cb) => cb.syncStorageCreated?.(storage)); + } storage.registerListener({ batchStarted: (batch) => { batch.registerListener({ diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index a2db0f180..cec373475 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -1,5 +1,5 @@ import * as framework from '@powersync/lib-services-framework'; -import { storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core'; +import { GetIntanceOptions, storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core'; import * as pg_wire from '@powersync/service-jpgwire'; import * as sync_rules from '@powersync/service-sync-rules'; import crypto from 'crypto'; @@ -50,14 +50,19 @@ export class PostgresBucketStorageFactory // This has not been implemented yet. } - getInstance(syncRules: storage.PersistedSyncRulesContent): storage.SyncRulesBucketStorage { + getInstance( + syncRules: storage.PersistedSyncRulesContent, + options?: GetIntanceOptions + ): storage.SyncRulesBucketStorage { const storage = new PostgresSyncRulesStorage({ factory: this, db: this.db, sync_rules: syncRules, batchLimits: this.options.config.batch_limits }); - this.iterateListeners((cb) => cb.syncStorageCreated?.(storage)); + if (!options?.skipLifecycleHooks) { + this.iterateListeners((cb) => cb.syncStorageCreated?.(storage)); + } storage.registerListener({ batchStarted: (batch) => { batch.registerListener({ diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index 0bdc62d22..ee2822263 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -209,7 +209,7 @@ export abstract class AbstractReplicator Date: Tue, 25 Feb 2025 12:12:47 +0200 Subject: [PATCH 7/8] Fix postgres handling of missing logical replication slot. --- modules/module-postgres/src/replication/WalStream.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 0950efa3c..83c83baf4 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -239,6 +239,11 @@ export class WalStream { needsNewSlot: r.needsNewSlot }; } else { + if (snapshotDone) { + // This will create a new slot, while keeping the current sync rules active + throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`); + } + // This will clear data and re-create the same slot return { needsInitialSync: true, needsNewSlot: true }; } } From 1b67367810c22f1770a13d6e6fba394e6af002a3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 25 Feb 2025 12:16:10 +0200 Subject: [PATCH 8/8] Add changeset. --- .changeset/proud-geckos-draw.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/proud-geckos-draw.md diff --git a/.changeset/proud-geckos-draw.md b/.changeset/proud-geckos-draw.md new file mode 100644 index 000000000..6d0423fb5 --- /dev/null +++ b/.changeset/proud-geckos-draw.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +--- + +Keep serving current data when restarting replication due to errors.