@@ -87,7 +87,7 @@ export class CompositeDataSource implements DataSource {
8787 // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after
8888 // secondary has been valid for N many seconds)
8989 let lastState: DataSourceState | undefined;
90- let cancelScheduledTransition: (( ) => void) | undefined ;
90+ let cancelScheduledTransition: () => void = () => {} ;
9191
9292 // this callback handler can be disabled and ensures only one transition request occurs
9393 const callbackHandler = new CallbackHandler(
@@ -97,7 +97,7 @@ export class CompositeDataSource implements DataSource {
9797 if (basis && this._initPhaseActive) {
9898 // transition to sync if we get basis during init
9999 callbackHandler.disable();
100- cancelScheduledTransition?.( );
100+ this._consumeCancelToken(cancelScheduledTransition );
101101 transitionResolve({ transition: 'switchToSync' });
102102 }
103103 },
@@ -109,19 +109,19 @@ export class CompositeDataSource implements DataSource {
109109 if (err || state === DataSourceState.Closed) {
110110 callbackHandler.disable();
111111 statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition
112- cancelScheduledTransition?.( );
112+ this._consumeCancelToken(cancelScheduledTransition );
113113 transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback
114114 } else {
115115 statusCallback(state, null); // report the status upward
116116 if (state !== lastState) {
117117 lastState = state;
118- cancelScheduledTransition?.( ); // cancel previously scheduled status transition if one was scheduled
118+ this._consumeCancelToken(cancelScheduledTransition ); // cancel previously scheduled status transition if one was scheduled
119119 const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it
120120 const condition = this._lookupTransitionCondition(state, excludeRecovery);
121121 if (condition) {
122122 const { promise, cancel } = this._cancellableDelay(condition.durationMS);
123- this._cancelTokens.push(cancel);
124123 cancelScheduledTransition = cancel;
124+ this._cancelTokens.push(cancelScheduledTransition);
125125 promise.then(() => {
126126 callbackHandler.disable();
127127 transitionResolve({ transition: condition.transition });
@@ -161,15 +161,18 @@ export class CompositeDataSource implements DataSource {
161161 if (transitionRequest.err && transitionRequest.transition !== 'stop') {
162162 // if the transition was due to an error, throttle the transition
163163 const delay = this._backoff.fail();
164- const { promise, cancel } = this._cancellableDelay(delay);
165- this._cancelTokens.push(cancel );
164+ const { promise, cancel: cancelDelay } = this._cancellableDelay(delay);
165+ this._cancelTokens.push(cancelDelay );
166166 const delayedTransition = promise.then(() => transitionRequest);
167167
168168 // race the delayed transition and external transition requests to be responsive
169169 transitionRequest = await Promise.race([
170170 delayedTransition,
171171 this._externalTransitionPromise,
172172 ]);
173+
174+ // consume the delay cancel token (even if it resolved, need to stop tracking its token)
175+ this._consumeCancelToken(cancelDelay);
173176 }
174177
175178 if (transitionRequest.transition === 'stop') {
@@ -188,6 +191,7 @@ export class CompositeDataSource implements DataSource {
188191
189192 async stop() {
190193 this._cancelTokens.forEach((cancel) => cancel());
194+ this._cancelTokens = [];
191195 this._externalTransitionResolve?.({ transition: 'stop' });
192196 }
193197
@@ -274,4 +278,12 @@ export class CompositeDataSource implements DataSource {
274278 },
275279 };
276280 };
281+
282+ private _consumeCancelToken(cancel: () => void) {
283+ cancel();
284+ const index = this._cancelTokens.indexOf(cancel, 0);
285+ if (index > -1) {
286+ this._cancelTokens.splice(index, 1);
287+ }
288+ }
277289}
0 commit comments