11/* eslint-disable no-await-in-loop */
22import { Backoff } from '../../../datasource/Backoff' ;
3+ import { LDLogger } from '../../logging' ;
34import { CallbackHandler } from './CallbackHandler' ;
45import {
56 Data ,
@@ -46,6 +47,7 @@ export class CompositeDataSource implements DataSource {
4647 private _stopped : boolean = true ;
4748 private _externalTransitionPromise : Promise < TransitionRequest > ;
4849 private _externalTransitionResolve ?: ( value : TransitionRequest ) => void ;
50+ private _cancelTokens : ( ( ) => void ) [ ] = [ ] ;
4951
5052 /**
5153 * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order.
@@ -56,6 +58,7 @@ export class CompositeDataSource implements DataSource {
5658 private readonly _synchronizers : SynchronizerFactory [ ] ,
5759 private readonly _transitionConditions : TransitionConditions ,
5860 private readonly _backoff : Backoff ,
61+ private readonly _logger ?: LDLogger ,
5962 ) {
6063 this . _externalTransitionPromise = new Promise < TransitionRequest > ( ( resolveTransition ) => {
6164 this . _externalTransitionResolve = resolveTransition ;
@@ -70,6 +73,7 @@ export class CompositeDataSource implements DataSource {
7073 ) : Promise < void > {
7174 if ( ! this . _stopped ) {
7275 // don't allow multiple simultaneous runs
76+ this . _logger ?. info ( 'CompositeDataSource already running. Ignoring call to start.' ) ;
7377 return ;
7478 }
7579 this . _stopped = false ;
@@ -116,6 +120,7 @@ export class CompositeDataSource implements DataSource {
116120 const condition = this . _lookupTransitionCondition ( state , excludeRecovery ) ;
117121 if ( condition ) {
118122 const { promise, cancel } = this . _cancellableDelay ( condition . durationMS ) ;
123+ this . _cancelTokens . push ( cancel ) ;
119124 cancelScheduledTransition = cancel ;
120125 promise . then ( ( ) => {
121126 callbackHandler . disable ( ) ;
@@ -129,7 +134,7 @@ export class CompositeDataSource implements DataSource {
129134 } ,
130135 ) ;
131136 currentDS . run (
132- ( basis , data ) => callbackHandler . dataHanlder ( basis , data ) ,
137+ ( basis , data ) => callbackHandler . dataHandler ( basis , data ) ,
133138 ( status , err ) => callbackHandler . statusHandler ( status , err ) ,
134139 ) ;
135140 } else {
@@ -156,9 +161,9 @@ export class CompositeDataSource implements DataSource {
156161 if ( transitionRequest . err && transitionRequest . transition !== 'stop' ) {
157162 // if the transition was due to an error, throttle the transition
158163 const delay = this . _backoff . fail ( ) ;
159- const delayedTransition = new Promise ( ( resolve ) => {
160- setTimeout ( resolve , delay ) ;
161- } ) . then ( ( ) => transitionRequest ) ;
164+ const { promise , cancel } = this . _cancellableDelay ( delay ) ;
165+ this . _cancelTokens . push ( cancel ) ;
166+ const delayedTransition = promise . then ( ( ) => transitionRequest ) ;
162167
163168 // race the delayed transition and external transition requests to be responsive
164169 transitionRequest = await Promise . race ( [
@@ -170,6 +175,7 @@ export class CompositeDataSource implements DataSource {
170175 if ( transitionRequest . transition === 'stop' ) {
171176 // exit the loop
172177 statusCallback ( DataSourceState . Closed , transitionRequest . err ) ;
178+ lastTransition = transitionRequest . transition ;
173179 break ;
174180 }
175181
@@ -181,6 +187,7 @@ export class CompositeDataSource implements DataSource {
181187 }
182188
183189 async stop ( ) {
190+ this . _cancelTokens . forEach ( ( cancel ) => cancel ( ) ) ;
184191 this . _externalTransitionResolve ?.( { transition : 'stop' } ) ;
185192 }
186193
@@ -254,19 +261,15 @@ export class CompositeDataSource implements DataSource {
254261
255262 private _cancellableDelay = ( delayMS : number ) => {
256263 let timeout : ReturnType < typeof setTimeout > | undefined ;
257- let reject : ( ( reason ?: any ) => void ) | undefined ;
258- const promise = new Promise ( ( res , rej ) => {
264+ const promise = new Promise ( ( res , _ ) => {
259265 timeout = setTimeout ( res , delayMS ) ;
260- reject = rej ;
261266 } ) ;
262267 return {
263268 promise,
264269 cancel ( ) {
265270 if ( timeout ) {
266271 clearTimeout ( timeout ) ;
267- reject ?.( ) ;
268272 timeout = undefined ;
269- reject = undefined ;
270273 }
271274 } ,
272275 } ;
0 commit comments