11import { container , logger } from '@powersync/lib-services-framework' ;
2+ import { ReplicationMetric } from '@powersync/service-types' ;
23import { hrtime } from 'node:process' ;
34import winston from 'winston' ;
5+ import { MetricsEngine } from '../metrics/MetricsEngine.js' ;
46import * as storage from '../storage/storage-index.js' ;
57import { StorageEngine } from '../storage/storage-index.js' ;
68import { SyncRulesProvider } from '../util/config/sync-rules/sync-rules-provider.js' ;
79import { AbstractReplicationJob } from './AbstractReplicationJob.js' ;
810import { ErrorRateLimiter } from './ErrorRateLimiter.js' ;
911import { ConnectionTestResult } from './ReplicationModule.js' ;
10- import { MetricsEngine } from '../metrics/MetricsEngine.js' ;
11- import { ReplicationMetric } from '@powersync/service-types' ;
1212
1313// 5 minutes
1414const PING_INTERVAL = 1_000_000_000n * 300n ;
@@ -40,9 +40,17 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
4040 /**
4141 * Map of replication jobs by sync rule id. Usually there is only one running job, but there could be two when
4242 * transitioning to a new set of sync rules.
43- * @private
4443 */
4544 private replicationJobs = new Map < number , T > ( ) ;
45+
46+ /**
47+ * Map of sync rule ids to promises that are clearing the sync rule configuration.
48+ *
49+ * We primarily do this to keep track of what we're currently clearing, but don't currently
50+ * use the Promise value.
51+ */
52+ private clearingJobs = new Map < number , Promise < void > > ( ) ;
53+
4654 /**
4755 * Used for replication lag computation.
4856 */
@@ -242,16 +250,26 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
242250 }
243251 }
244252
245- // Sync rules stopped previously or by a different process.
253+ // Sync rules stopped previously, including by a different process.
246254 const stopped = await this . storage . getStoppedSyncRules ( ) ;
247255 for ( let syncRules of stopped ) {
248- try {
249- // TODO: Do this in the "background", allowing the periodic refresh to continue
250- const syncRuleStorage = this . storage . getInstance ( syncRules , { skipLifecycleHooks : true } ) ;
251- await this . terminateSyncRules ( syncRuleStorage ) ;
252- } catch ( e ) {
253- this . logger . warn ( `Failed clean up replication config for sync rule: ${ syncRules . id } ` , e ) ;
256+ if ( this . clearingJobs . has ( syncRules . id ) ) {
257+ // Already in progress
258+ continue ;
254259 }
260+
261+ // We clear storage asynchronously.
262+ // It is important to be able to continue running the refresh loop, otherwise we cannot
263+ // retry locked sync rules, for example.
264+ const syncRuleStorage = this . storage . getInstance ( syncRules , { skipLifecycleHooks : true } ) ;
265+ const promise = this . terminateSyncRules ( syncRuleStorage )
266+ . catch ( ( e ) => {
267+ this . logger . warn ( `Failed clean up replication config for sync rule: ${ syncRules . id } ` , e ) ;
268+ } )
269+ . finally ( ( ) => {
270+ this . clearingJobs . delete ( syncRules . id ) ;
271+ } ) ;
272+ this . clearingJobs . set ( syncRules . id , promise ) ;
255273 }
256274 }
257275
@@ -261,6 +279,8 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
261279
262280 protected async terminateSyncRules ( syncRuleStorage : storage . SyncRulesBucketStorage ) {
263281 this . logger . info ( `Terminating sync rules: ${ syncRuleStorage . group_id } ...` ) ;
282+ // This deletes postgres replication slots - should complete quickly.
283+ // It is safe to do before or after clearing the data in the storage.
264284 await this . cleanUp ( syncRuleStorage ) ;
265285 await syncRuleStorage . terminate ( { signal : this . abortController ?. signal , clearStorage : true } ) ;
266286 this . logger . info ( `Successfully terminated sync rules: ${ syncRuleStorage . group_id } ` ) ;
0 commit comments