Skip to content
9 changes: 9 additions & 0 deletions .changeset/proud-geckos-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-module-postgres': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
---

Keep serving current data when restarting replication due to errors.
45 changes: 31 additions & 14 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SqlSyncRules } from '@powersync/service-sync-rules';

import { storage } from '@powersync/service-core';
import { GetIntanceOptions, storage } from '@powersync/service-core';

import { BaseObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -44,13 +44,15 @@ export class MongoBucketStorage
// No-op
}

getInstance(options: storage.PersistedSyncRulesContent): MongoSyncBucketStorage {
let { id, slot_name } = options;
getInstance(syncRules: storage.PersistedSyncRulesContent, options?: GetIntanceOptions): MongoSyncBucketStorage {
let { id, slot_name } = syncRules;
if ((typeof id as any) == 'bigint') {
id = Number(id);
}
const storage = new MongoSyncBucketStorage(this, id, options, slot_name);
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name);
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}
storage.registerListener({
batchStarted: (batch) => {
batch.registerListener({
Expand Down Expand Up @@ -95,13 +97,11 @@ export class MongoBucketStorage
}
}

async slotRemoved(slot_name: string) {
async restartReplication(sync_rules_group_id: number) {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

// In both the below cases, we create a new sync rules instance.
// The current one will continue erroring until the next one has finished processing.
if (next != null && next.slot_name == slot_name) {
if (next != null && next.id == sync_rules_group_id) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content,
Expand All @@ -119,22 +119,39 @@ export class MongoBucketStorage
}
}
);
} else if (next == null && active?.slot_name == slot_name) {
} else if (next == null && active?.id == sync_rules_group_id) {
// Replication needs to restart for the "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
// In this case we keep the old one as active for clients, so that existing clients
// can still get the latest data while we replicate the new ones.
// It will, however, not replicate anymore.

await this.db.sync_rules.updateOne(
{
_id: active.id,
state: storage.SyncRuleState.ACTIVE
},
{
$set: {
state: storage.SyncRuleState.STOP
state: storage.SyncRuleState.ERRORED
}
}
);
} else if (next != null && active?.id == sync_rules_group_id) {
// Already have next sync rules, but need to stop replicating the active one.

await this.db.sync_rules.updateOne(
{
_id: active.id,
state: storage.SyncRuleState.ACTIVE
},
{
$set: {
state: storage.SyncRuleState.ERRORED
}
}
);
Expand Down Expand Up @@ -211,7 +228,7 @@ export class MongoBucketStorage
async getActiveSyncRulesContent(): Promise<MongoPersistedSyncRulesContent | null> {
const doc = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
},
{ sort: { _id: -1 }, limit: 1 }
);
Expand Down Expand Up @@ -249,7 +266,7 @@ export class MongoBucketStorage
async getReplicatingSyncRules(): Promise<storage.PersistedSyncRulesContent[]> {
const docs = await this.db.sync_rules
.find({
$or: [{ state: storage.SyncRuleState.ACTIVE }, { state: storage.SyncRuleState.PROCESSING }]
state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] }
})
.toArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ export class MongoSyncBucketStorage
await this.db.sync_rules.updateMany(
{
_id: { $ne: this.group_id },
state: storage.SyncRuleState.ACTIVE
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
},
{
$set: {
Expand Down Expand Up @@ -657,7 +657,7 @@ export class MongoSyncBucketStorage
doc = await this.db.sync_rules.findOne(
{
_id: syncRulesId,
state: storage.SyncRuleState.ACTIVE
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
},
{
session,
Expand Down Expand Up @@ -728,7 +728,7 @@ export class MongoSyncBucketStorage
// Irrelevant update
continue;
}
if (doc.state != storage.SyncRuleState.ACTIVE) {
if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
// Sync rules have changed - abort and restart.
// Should this error instead?
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
this.logger.error(`Replication failed`, e);

if (e instanceof ChangeStreamInvalidatedError) {
// This stops replication on this slot, and creates a new slot
await this.options.storage.factory.slotRemoved(this.slotName);
// This stops replication and restarts with a new instance
await this.options.storage.factory.restartReplication(this.storage.group_id);
}
} finally {
this.abortController.abort();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as framework from '@powersync/lib-services-framework';
import { storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
import { GetIntanceOptions, storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
import * as pg_wire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import crypto from 'crypto';
Expand Down Expand Up @@ -50,14 +50,19 @@ export class PostgresBucketStorageFactory
// This has not been implemented yet.
}

getInstance(syncRules: storage.PersistedSyncRulesContent): storage.SyncRulesBucketStorage {
getInstance(
syncRules: storage.PersistedSyncRulesContent,
options?: GetIntanceOptions
): storage.SyncRulesBucketStorage {
const storage = new PostgresSyncRulesStorage({
factory: this,
db: this.db,
sync_rules: syncRules,
batchLimits: this.options.config.batch_limits
});
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}
storage.registerListener({
batchStarted: (batch) => {
batch.registerListener({
Expand Down Expand Up @@ -225,13 +230,13 @@ export class PostgresBucketStorageFactory
});
}

async slotRemoved(slot_name: string): Promise<void> {
async restartReplication(sync_rules_group_id: number): Promise<void> {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

// In both the below cases, we create a new sync rules instance.
// The current one will continue erroring until the next one has finished processing.
if (next != null && next.slot_name == slot_name) {
// The current one will continue serving sync requests until the next one has finished processing.
if (next != null && next.id == sync_rules_group_id) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content,
Expand All @@ -246,18 +251,30 @@ export class PostgresBucketStorageFactory
id = ${{ value: next.id, type: 'int4' }}
AND state = ${{ value: storage.SyncRuleState.PROCESSING, type: 'varchar' }}
`.execute();
} else if (next == null && active?.slot_name == slot_name) {
} else if (next == null && active?.id == sync_rules_group_id) {
// Replication needs to restart for the "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
// Pro-actively stop replicating, but still serve clients with existing data
await this.db.sql`
UPDATE sync_rules
SET
state = ${{ value: storage.SyncRuleState.STOP, type: 'varchar' }}
state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
WHERE
id = ${{ value: active.id, type: 'int4' }}
AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
`.execute();
} else if (next != null && active?.id == sync_rules_group_id) {
// Already have "next" sync rules - don't update any.

// Pro-actively stop replicating, but still serve clients with existing data
await this.db.sql`
UPDATE sync_rules
SET
state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
WHERE
id = ${{ value: active.id, type: 'int4' }}
AND state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
Expand All @@ -279,6 +296,7 @@ export class PostgresBucketStorageFactory
sync_rules
WHERE
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
ORDER BY
id DESC
LIMIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,10 @@ export class PostgresSyncRulesStorage
SET
state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }}
WHERE
state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
(
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
)
AND id != ${{ type: 'int4', value: this.group_id }}
`.execute();
});
Expand Down Expand Up @@ -729,6 +732,7 @@ export class PostgresSyncRulesStorage
sync_rules
WHERE
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
ORDER BY
id DESC
LIMIT
Expand Down Expand Up @@ -791,7 +795,8 @@ export class PostgresSyncRulesStorage
FROM
sync_rules
WHERE
state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }}
state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }}
OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }}
LIMIT
1
`
Expand Down
5 changes: 5 additions & 0 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ export class WalStream {
needsNewSlot: r.needsNewSlot
};
} else {
if (snapshotDone) {
// This will create a new slot, while keeping the current sync rules active
throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
}
// This will clear data and re-create the same slot
return { needsInitialSync: true, needsNewSlot: true };
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
this.logger.error(`Replication failed on ${this.slotName}`, e);

if (e instanceof MissingReplicationSlotError) {
// This stops replication on this slot, and creates a new slot
await this.options.storage.factory.slotRemoved(this.slotName);
// This stops replication on this slot and restarts with a new slot
await this.options.storage.factory.restartReplication(this.storage.group_id);
}
} finally {
this.abortController.abort();
Expand Down
18 changes: 7 additions & 11 deletions packages/service-core/src/replication/AbstractReplicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,23 +193,23 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

this.replicationJobs = newJobs;

// Terminate any orphaned jobs that no longer have sync rules
// Stop any orphaned jobs that no longer have sync rules.
// Termination happens below
for (let job of existingJobs.values()) {
// Old - stop and clean up
try {
await job.stop();
await this.terminateSyncRules(job.storage);
} catch (e) {
// This will be retried
this.logger.warn('Failed to terminate old replication job}', e);
this.logger.warn('Failed to stop old replication job}', e);
}
}

// Sync rules stopped previously or by a different process.
const stopped = await this.storage.getStoppedSyncRules();
for (let syncRules of stopped) {
try {
const syncRuleStorage = this.storage.getInstance(syncRules);
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
await this.terminateSyncRules(syncRuleStorage);
} catch (e) {
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
Expand All @@ -223,13 +223,9 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

protected async terminateSyncRules(syncRuleStorage: storage.SyncRulesBucketStorage) {
this.logger.info(`Terminating sync rules: ${syncRuleStorage.group_id}...`);
try {
await this.cleanUp(syncRuleStorage);
await syncRuleStorage.terminate();
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
} catch (e) {
this.logger.warn(`Failed clean up replication config for sync rules: ${syncRuleStorage.group_id}`, e);
}
await this.cleanUp(syncRuleStorage);
await syncRuleStorage.terminate();
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
}

abstract testConnection(): Promise<ConnectionTestResult>;
Expand Down
13 changes: 11 additions & 2 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export enum SyncRuleState {
/**
* Sync rule processing is done, and can be used for sync.
*
* Only one set of sync rules should be in ACTIVE state.
* Only one set of sync rules should be in ACTIVE or ERRORED state.
*/
ACTIVE = 'ACTIVE',
/**
Expand All @@ -24,7 +24,16 @@ export enum SyncRuleState {
* After sync rules have been stopped, the data needs to be
* deleted. Once deleted, the state is TERMINATED.
*/
TERMINATED = 'TERMINATED'
TERMINATED = 'TERMINATED',

/**
* This set of sync rules has run into a permanent replication error. It
* is still the "active" set of sync rules for syncing to users,
* but should not replicate anymore.
*
* It will transition to STOP when a new set of sync rules is activated.
*/
ERRORED = 'ERRORED'
}

export const DEFAULT_DOCUMENT_BATCH_LIMIT = 1000;
Expand Down
Loading
Loading