Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/orange-comics-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix sync rule clearing process to not block sync rule processing.
40 changes: 30 additions & 10 deletions packages/service-core/src/replication/AbstractReplicator.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { container, logger } from '@powersync/lib-services-framework';
import { ReplicationMetric } from '@powersync/service-types';
import { hrtime } from 'node:process';
import winston from 'winston';
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import * as storage from '../storage/storage-index.js';
import { StorageEngine } from '../storage/storage-index.js';
import { SyncRulesProvider } from '../util/config/sync-rules/sync-rules-provider.js';
import { AbstractReplicationJob } from './AbstractReplicationJob.js';
import { ErrorRateLimiter } from './ErrorRateLimiter.js';
import { ConnectionTestResult } from './ReplicationModule.js';
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { ReplicationMetric } from '@powersync/service-types';

// 5 minutes
const PING_INTERVAL = 1_000_000_000n * 300n;
Expand Down Expand Up @@ -40,9 +40,17 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
/**
* Map of replication jobs by sync rule id. Usually there is only one running job, but there could be two when
* transitioning to a new set of sync rules.
* @private
*/
private replicationJobs = new Map<number, T>();

/**
* Map of sync rule ids to promises that are clearing the sync rule configuration.
*
* We primarily do this to keep track of what we're currently clearing, but don't currently
* use the Promise value.
*/
private clearingJobs = new Map<number, Promise<void>>();

/**
* Used for replication lag computation.
*/
Expand Down Expand Up @@ -242,16 +250,26 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
}
}

// Sync rules stopped previously or by a different process.
// Sync rules stopped previously, including by a different process.
const stopped = await this.storage.getStoppedSyncRules();
for (let syncRules of stopped) {
try {
// TODO: Do this in the "background", allowing the periodic refresh to continue
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
await this.terminateSyncRules(syncRuleStorage);
} catch (e) {
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
if (this.clearingJobs.has(syncRules.id)) {
// Already in progress
continue;
}

// We clear storage asynchronously.
// It is important to be able to continue running the refresh loop, otherwise we cannot
// retry locked sync rules, for example.
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
const promise = this.terminateSyncRules(syncRuleStorage)
.catch((e) => {
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
})
.finally(() => {
this.clearingJobs.delete(syncRules.id);
});
this.clearingJobs.set(syncRules.id, promise);
}
}

Expand All @@ -261,6 +279,8 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

protected async terminateSyncRules(syncRuleStorage: storage.SyncRulesBucketStorage) {
this.logger.info(`Terminating sync rules: ${syncRuleStorage.group_id}...`);
// This deletes postgres replication slots - should complete quickly.
// It is safe to do before or after clearing the data in the storage.
await this.cleanUp(syncRuleStorage);
await syncRuleStorage.terminate({ signal: this.abortController?.signal, clearStorage: true });
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
Expand Down