@@ -48,11 +48,11 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
4848 */
4949 private activeReplicationJob : T | undefined = undefined ;
5050
51- private stopped = false ;
52-
5351 // First ping is only after 5 minutes, not when starting
5452 private lastPing = hrtime . bigint ( ) ;
5553
54+ private abortController : AbortController | undefined ;
55+
5656 protected constructor ( private options : AbstractReplicatorOptions ) {
5757 this . logger = logger . child ( { name : `Replicator:${ options . id } ` } ) ;
5858 }
@@ -85,7 +85,12 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
8585 return this . options . metricsEngine ;
8686 }
8787
88+ protected get stopped ( ) {
89+ return this . abortController ?. signal . aborted ;
90+ }
91+
8892 public async start ( ) : Promise < void > {
93+ this . abortController = new AbortController ( ) ;
8994 this . runLoop ( ) . catch ( ( e ) => {
9095 this . logger . error ( 'Data source fatal replication error' , e ) ;
9196 container . reporter . captureException ( e ) ;
@@ -107,7 +112,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
107112 }
108113
109114 public async stop ( ) : Promise < void > {
110- this . stopped = true ;
115+ this . abortController ?. abort ( ) ;
111116 let promises : Promise < void > [ ] = [ ] ;
112117 for ( const job of this . replicationJobs . values ( ) ) {
113118 promises . push ( job . stop ( ) ) ;
@@ -241,6 +246,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
241246 const stopped = await this . storage . getStoppedSyncRules ( ) ;
242247 for ( let syncRules of stopped ) {
243248 try {
249+ // TODO: Do this in the "background", allowing the periodic refresh to continue
244250 const syncRuleStorage = this . storage . getInstance ( syncRules , { skipLifecycleHooks : true } ) ;
245251 await this . terminateSyncRules ( syncRuleStorage ) ;
246252 } catch ( e ) {
@@ -256,7 +262,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
256262 protected async terminateSyncRules ( syncRuleStorage : storage . SyncRulesBucketStorage ) {
257263 this . logger . info ( `Terminating sync rules: ${ syncRuleStorage . group_id } ...` ) ;
258264 await this . cleanUp ( syncRuleStorage ) ;
259- await syncRuleStorage . terminate ( ) ;
265+ await syncRuleStorage . terminate ( { signal : this . abortController ?. signal , clearStorage : true } ) ;
260266 this . logger . info ( `Successfully terminated sync rules: ${ syncRuleStorage . group_id } ` ) ;
261267 }
262268
0 commit comments