@@ -33,12 +33,21 @@ const reportJsonError = (
3333 ) ;
3434} ;
3535
36+ function reportEventClosed ( eventName : string , logger ?: LDLogger ) {
37+ logger ?. debug ( `Received ${ eventName } event after processor was closed. Skipping processing.` ) ;
38+ }
39+
40+ function reportPingClosed ( logger ?: LDLogger ) {
41+ logger ?. debug ( 'Ping completed after processor was closed. Skipping processing.' ) ;
42+ }
43+
3644class StreamingProcessor implements subsystem . LDStreamProcessor {
3745 private readonly _headers : { [ key : string ] : string | string [ ] } ;
3846 private readonly _streamUri : string ;
3947
4048 private _eventSource ?: EventSource ;
4149 private _connectionAttemptStartTime ?: number ;
50+ private _stopped = false ;
4251
4352 constructor (
4453 private readonly _plainContextString : string ,
@@ -157,6 +166,13 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
157166
158167 this . _listeners . forEach ( ( { deserializeData, processJson } , eventName ) => {
159168 eventSource . addEventListener ( eventName , ( event ) => {
169+ // If an event comes in after the processor has been stopped, we skip processing it.
170+ // This event could be for a context which is no longer active.
171+ if ( this . _stopped ) {
172+ reportEventClosed ( eventName , this . _logger ) ;
173+ return ;
174+ }
175+
160176 this . _logger ?. debug ( `Received ${ eventName } event` ) ;
161177
162178 if ( event ?. data ) {
@@ -186,6 +202,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
186202 try {
187203 const res = await this . _pollingRequestor . requestPayload ( ) ;
188204 try {
205+ // If the ping completes after the processor has been stopped, then we discard it.
206+ // This event could be for a context which is no longer active.
207+ if ( this . _stopped ) {
208+ reportPingClosed ( this . _logger ) ;
209+ return ;
210+ }
189211 const payload = JSON . parse ( res ) ;
190212 try {
191213 // forward the payload on to the PUT listener
@@ -204,6 +226,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
204226 ) ;
205227 }
206228 } catch ( err ) {
229+ if ( this . _stopped ) {
230+ // If the ping errors after the processor has been stopped, then we discard it.
231+ // The original caller would consider this connection no longer active.
232+ reportPingClosed ( this . _logger ) ;
233+ return ;
234+ }
207235 const requestError = err as LDRequestError ;
208236 this . _errorHandler ?.(
209237 new LDPollingError (
@@ -219,6 +247,7 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
219247 stop ( ) {
220248 this . _eventSource ?. close ( ) ;
221249 this . _eventSource = undefined ;
250+ this . _stopped = true ;
222251 }
223252
224253 close ( ) {
0 commit comments