Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/orange-comics-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix sync rule clearing process to not block sync rule processing.
40 changes: 30 additions & 10 deletions packages/service-core/src/replication/AbstractReplicator.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { container, logger } from '@powersync/lib-services-framework';
import { ReplicationMetric } from '@powersync/service-types';
import { hrtime } from 'node:process';
import winston from 'winston';
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import * as storage from '../storage/storage-index.js';
import { StorageEngine } from '../storage/storage-index.js';
import { SyncRulesProvider } from '../util/config/sync-rules/sync-rules-provider.js';
import { AbstractReplicationJob } from './AbstractReplicationJob.js';
import { ErrorRateLimiter } from './ErrorRateLimiter.js';
import { ConnectionTestResult } from './ReplicationModule.js';
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { ReplicationMetric } from '@powersync/service-types';

// 5 minutes
const PING_INTERVAL = 1_000_000_000n * 300n;
Expand Down Expand Up @@ -40,9 +40,17 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
/**
* Map of replication jobs by sync rule id. Usually there is only one running job, but there could be two when
* transitioning to a new set of sync rules.
* @private
*/
private replicationJobs = new Map<number, T>();

/**
* Map of sync rule ids to promises that are clearing the sync rule configuration.
*
* We primarily do this to keep track of what we're currently clearning, but don't currently
* use the Promise value.
*/
private clearingJobs = new Map<number, Promise<void>>();

/**
* Used for replication lag computation.
*/
Expand Down Expand Up @@ -242,16 +250,26 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
}
}

// Sync rules stopped previously or by a different process.
// Sync rules stopped previously, including by a different process.
const stopped = await this.storage.getStoppedSyncRules();
for (let syncRules of stopped) {
try {
// TODO: Do this in the "background", allowing the periodic refresh to continue
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
await this.terminateSyncRules(syncRuleStorage);
} catch (e) {
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
if (this.clearingJobs.has(syncRules.id)) {
// Already in progress
continue;
}

// We clear storage asynchronously.
// It is important to be able to continue running the refresh loop, otherwise we cannot
// retry locked sync rules, for example.
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
const promise = this.terminateSyncRules(syncRuleStorage)
.catch((e) => {
this.logger.warn(`Failed clean up replication config for sync rule: ${syncRules.id}`, e);
})
.finally(() => {
this.clearingJobs.delete(syncRules.id);
});
this.clearingJobs.set(syncRules.id, promise);
}
}

Expand All @@ -261,6 +279,8 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

protected async terminateSyncRules(syncRuleStorage: storage.SyncRulesBucketStorage) {
this.logger.info(`Terminating sync rules: ${syncRuleStorage.group_id}...`);
// This deletes postgres replication slots - should complete quickly.
// It is safe to do before or after clearing the data in the storage.
await this.cleanUp(syncRuleStorage);
await syncRuleStorage.terminate({ signal: this.abortController?.signal, clearStorage: true });
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
Expand Down