diff --git a/.changeset/proud-geckos-draw.md b/.changeset/proud-geckos-draw.md
new file mode 100644
index 000000000..6d0423fb5
--- /dev/null
+++ b/.changeset/proud-geckos-draw.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+---
+
+Keep serving current data when restarting replication due to errors.
diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
index 22f71dd7f..23fd90b0b 100644
--- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
@@ -1,6 +1,6 @@
 import { SqlSyncRules } from '@powersync/service-sync-rules';
 
-import { storage } from '@powersync/service-core';
+import { GetIntanceOptions, storage } from '@powersync/service-core';
 
 import { BaseObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
 import { v4 as uuid } from 'uuid';
@@ -44,13 +44,15 @@ export class MongoBucketStorage
     // No-op
   }
 
-  getInstance(options: storage.PersistedSyncRulesContent): MongoSyncBucketStorage {
-    let { id, slot_name } = options;
+  getInstance(syncRules: storage.PersistedSyncRulesContent, options?: GetIntanceOptions): MongoSyncBucketStorage {
+    let { id, slot_name } = syncRules;
     if ((typeof id as any) == 'bigint') {
       id = Number(id);
     }
-    const storage = new MongoSyncBucketStorage(this, id, options, slot_name);
-    this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
+    const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name);
+    if (!options?.skipLifecycleHooks) {
+      this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
+    }
     storage.registerListener({
       batchStarted: (batch) => {
         batch.registerListener({
@@ -95,13 +97,11 @@ export class MongoBucketStorage
     }
   }
 
-  async slotRemoved(slot_name: string) {
+  async restartReplication(sync_rules_group_id: number) {
    const next = await this.getNextSyncRulesContent();
    const active = await this.getActiveSyncRulesContent();
 
-    // In both the below cases, we create a new sync rules instance.
-    // The current one will continue erroring until the next one has finished processing.
-    if (next != null && next.slot_name == slot_name) {
+    if (next != null && next.id == sync_rules_group_id) {
      // We need to redo the "next" sync rules
      await this.updateSyncRules({
        content: next.sync_rules_content,
@@ -119,14 +119,17 @@ export class MongoBucketStorage
          }
        }
      );
-    } else if (next == null && active?.slot_name == slot_name) {
+    } else if (next == null && active?.id == sync_rules_group_id) {
      // Slot removed for "active" sync rules, while there is no "next" one.
      await this.updateSyncRules({
        content: active.sync_rules_content,
        validate: false
      });
 
-      // Pro-actively stop replicating
+      // In this case we keep the old one as active for clients, so that existing clients
+      // can still get the latest data while we replicate the new ones.
+      // It will however not replicate anymore.
+
      await this.db.sync_rules.updateOne(
        {
          _id: active.id,
@@ -134,7 +137,21 @@ export class MongoBucketStorage
        },
        {
          $set: {
-            state: storage.SyncRuleState.STOP
+            state: storage.SyncRuleState.ERRORED
+          }
+        }
+      );
+    } else if (next != null && active?.id == sync_rules_group_id) {
+      // Already have next sync rules, but need to stop replicating the active one.
+
+      await this.db.sync_rules.updateOne(
+        {
+          _id: active.id,
+          state: storage.SyncRuleState.ACTIVE
+        },
+        {
+          $set: {
+            state: storage.SyncRuleState.ERRORED
          }
        }
      );
@@ -211,7 +228,7 @@ export class MongoBucketStorage
   async getActiveSyncRulesContent(): Promise {
    const doc = await this.db.sync_rules.findOne(
      {
-        state: storage.SyncRuleState.ACTIVE
+        state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
      },
      { sort: { _id: -1 }, limit: 1 }
    );
@@ -249,7 +266,7 @@ export class MongoBucketStorage
   async getReplicatingSyncRules(): Promise {
    const docs = await this.db.sync_rules
      .find({
-        $or: [{ state: storage.SyncRuleState.ACTIVE }, { state: storage.SyncRuleState.PROCESSING }]
+        state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] }
      })
      .toArray();
 
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 0d0187ef8..2a7873653 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -604,7 +604,7 @@ export class MongoSyncBucketStorage
    await this.db.sync_rules.updateMany(
      {
        _id: { $ne: this.group_id },
-        state: storage.SyncRuleState.ACTIVE
+        state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
      },
      {
        $set: {
@@ -657,7 +657,7 @@ export class MongoSyncBucketStorage
      doc = await this.db.sync_rules.findOne(
        {
          _id: syncRulesId,
-          state: storage.SyncRuleState.ACTIVE
+          state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
        },
        {
          session,
@@ -728,7 +728,7 @@ export class MongoSyncBucketStorage
        // Irrelevant update
        continue;
      }
-      if (doc.state != storage.SyncRuleState.ACTIVE) {
+      if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
        // Sync rules have changed - abort and restart.
        // Should this error instead?
        break;
diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
index e2ba6a24a..a9b2fb6dd 100644
--- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
+++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
@@ -40,8 +40,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
      this.logger.error(`Replication failed`, e);
      if (e instanceof ChangeStreamInvalidatedError) {
-        // This stops replication on this slot, and creates a new slot
-        await this.options.storage.factory.slotRemoved(this.slotName);
+        // This stops replication and restarts with a new instance
+        await this.options.storage.factory.restartReplication(this.storage.group_id);
      }
    } finally {
      this.abortController.abort();
diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts
index 2ef50a697..cec373475 100644
--- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts
@@ -1,5 +1,5 @@
 import * as framework from '@powersync/lib-services-framework';
-import { storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
+import { GetIntanceOptions, storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
 import * as pg_wire from '@powersync/service-jpgwire';
 import * as sync_rules from '@powersync/service-sync-rules';
 import crypto from 'crypto';
@@ -50,14 +50,19 @@ export class PostgresBucketStorageFactory
    // This has not been implemented yet.
  }
 
-  getInstance(syncRules: storage.PersistedSyncRulesContent): storage.SyncRulesBucketStorage {
+  getInstance(
+    syncRules: storage.PersistedSyncRulesContent,
+    options?: GetIntanceOptions
+  ): storage.SyncRulesBucketStorage {
    const storage = new PostgresSyncRulesStorage({
      factory: this,
      db: this.db,
      sync_rules: syncRules,
      batchLimits: this.options.config.batch_limits
    });
-    this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
+    if (!options?.skipLifecycleHooks) {
+      this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
+    }
    storage.registerListener({
      batchStarted: (batch) => {
        batch.registerListener({
@@ -225,13 +230,13 @@ export class PostgresBucketStorageFactory
    });
  }
 
-  async slotRemoved(slot_name: string): Promise<void> {
+  async restartReplication(sync_rules_group_id: number): Promise<void> {
    const next = await this.getNextSyncRulesContent();
    const active = await this.getActiveSyncRulesContent();
 
    // In both the below cases, we create a new sync rules instance.
-    // The current one will continue erroring until the next one has finished processing.
-    if (next != null && next.slot_name == slot_name) {
+    // The current one will continue serving sync requests until the next one has finished processing.
+    if (next != null && next.id == sync_rules_group_id) {
      // We need to redo the "next" sync rules
      await this.updateSyncRules({
        content: next.sync_rules_content,
@@ -246,18 +251,30 @@ export class PostgresBucketStorageFactory
          id = ${{ value: next.id, type: 'int4' }}
          AND state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }}
      `.execute();
-    } else if (next == null && active?.slot_name == slot_name) {
+    } else if (next == null && active?.id == sync_rules_group_id) {
      // Slot removed for "active" sync rules, while there is no "next" one.
      await this.updateSyncRules({
        content: active.sync_rules_content,
        validate: false
      });
 
-      // Pro-actively stop replicating
+      // Pro-actively stop replicating, but still serve clients with existing data
      await this.db.sql`
        UPDATE sync_rules
        SET
-          state = ${{ value: storage.SyncRuleState.STOP, type: 'varchar' }}
+          state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
+        WHERE
+          id = ${{ value: active.id, type: 'int4' }}
+          AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
+      `.execute();
+    } else if (next != null && active?.id == sync_rules_group_id) {
+      // Already have "next" sync rules - don't update any.
+
+      // Pro-actively stop replicating, but still serve clients with existing data
+      await this.db.sql`
+        UPDATE sync_rules
+        SET
+          state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
        WHERE
          id = ${{ value: active.id, type: 'int4' }}
          AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
@@ -279,6 +296,7 @@ export class PostgresBucketStorageFactory
        sync_rules
      WHERE
        state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
+        OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
      ORDER BY
        id DESC
      LIMIT
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 24a85f803..0a18c17bb 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -657,7 +657,10 @@ export class PostgresSyncRulesStorage
        SET
          state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }}
        WHERE
-          state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
+          (
+            state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
+            OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
+          )
          AND id != ${{ type: 'int4', value: this.group_id }}
      `.execute();
    });
@@ -729,6 +732,7 @@ export class PostgresSyncRulesStorage
        sync_rules
      WHERE
        state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
+        OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
      ORDER BY
        id DESC
      LIMIT
@@ -791,7 +795,8 @@ export class PostgresSyncRulesStorage
      FROM
        sync_rules
      WHERE
-        state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
+        state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
+        OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
      LIMIT
        1
    `
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index 0950efa3c..83c83baf4 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -239,6 +239,11 @@ export class WalStream {
        needsNewSlot: r.needsNewSlot
      };
    } else {
+      if (snapshotDone) {
+        // This will create a new slot, while keeping the current sync rules active
+        throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
+      }
+      // This will clear data and re-create the same slot
      return { needsInitialSync: true, needsNewSlot: true };
    }
  }
diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
index c487c5757..c7d54785e 100644
--- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
+++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
@@ -60,8 +60,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
      this.logger.error(`Replication failed on ${this.slotName}`, e);
      if (e instanceof MissingReplicationSlotError) {
-        // This stops replication on this slot, and creates a new slot
-        await this.options.storage.factory.slotRemoved(this.slotName);
+        // This stops replication on this slot and restarts with a new slot
+        await this.options.storage.factory.restartReplication(this.storage.group_id);
      }
    } finally {
      this.abortController.abort();
diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts
index b219a3265..ee2822263 100644
--- a/packages/service-core/src/replication/AbstractReplicator.ts
+++ b/packages/service-core/src/replication/AbstractReplicator.ts
@@ -193,15 +193,15 @@ export abstract class AbstractReplicator
diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts
index 2a2787fa0..7f749c8f9 100644
--- a/packages/service-core/src/storage/BucketStorage.ts
+++ b/packages/service-core/src/storage/BucketStorage.ts
@@ -12,7 +12,7 @@ export enum SyncRuleState {
  /**
   * Sync rule processing is done, and can be used for sync.
   *
-   * Only one set of sync rules should be in ACTIVE state.
+   * Only one set of sync rules should be in ACTIVE or ERRORED state.
   */
  ACTIVE = 'ACTIVE',
  /**
@@ -24,7 +24,16 @@ export enum SyncRuleState {
   * After sync rules have been stopped, the data needs to be
   * deleted. Once deleted, the state is TERMINATED.
   */
-  TERMINATED = 'TERMINATED'
+  TERMINATED = 'TERMINATED',
+
+  /**
+   * This set of sync rules has run into a permanent replication error. It
+   * is still the "active" set of sync rules for syncing to users,
+   * but should not replicate anymore.
+   *
+   * It will transition to STOP when a new set of sync rules is activated.
+   */
+  ERRORED = 'ERRORED'
 }
 
 export const DEFAULT_DOCUMENT_BATCH_LIMIT = 1000;
diff --git a/packages/service-core/src/storage/BucketStorageFactory.ts b/packages/service-core/src/storage/BucketStorageFactory.ts
index 3512b1cb3..faf02e4b7 100644
--- a/packages/service-core/src/storage/BucketStorageFactory.ts
+++ b/packages/service-core/src/storage/BucketStorageFactory.ts
@@ -23,7 +23,7 @@ export interface BucketStorageFactory extends ObserverClient
-  slotRemoved(slot_name: string): Promise<void>;
+  restartReplication(sync_rules_group_id: number): Promise<void>;
 
  /**
   * Get the sync rules used for querying.
@@ -125,6 +123,18 @@ export interface UpdateSyncRulesOptions {
  validate?: boolean;
 }
 
+export interface GetIntanceOptions {
+  /**
+   * Set to true to skip triggering any events when creating the instance.
+   *
+   * This is used when creating the instance only to clear data.
+   *
+   * When this is used, note that some functionality such as write checkpoint mode
+   * may not be configured correctly.
+   */
+  skipLifecycleHooks?: boolean;
+}
+
 export interface BucketStorageSystemIdentifier {
  /**
   * A unique identifier for the system used for storage.