@@ -188,7 +188,6 @@ enum TheAction
188188 IConnectableObservable < T > dataPump ;
189189 IConnectableObservable < Unit > cachePump ;
190190 ConcurrentQueue < ActionData > cache ;
191- Queue < ActionData > processingQueue ;
192191
193192 ReplaySubject < Unit > signalHaveData ;
194193 ReplaySubject < Unit > signalNeedData ;
@@ -213,7 +212,6 @@ enum TheAction
213212 readonly Dictionary < T , int > filteredIndexCache = new Dictionary < T , int > ( ) ;
214213
215214 bool originalSourceIsCompleted ;
216- bool signaledOriginalSourceCompletion ;
217215 bool sourceHasData ;
218216 ReplaySubject < Unit > originalSourceCompleted ;
219217 public IObservable < Unit > OriginalCompleted => originalSourceCompleted ;
@@ -239,7 +237,6 @@ public TrackingCollection(Func<T, T, int> comparer = null, Func<T, int, IList<T>
239237 Func < T , T , int > newer = null , IScheduler scheduler = null )
240238 {
241239 cache = new ConcurrentQueue < ActionData > ( ) ;
242- processingQueue = new Queue < ActionData > ( ) ;
243240 ProcessingDelay = TimeSpan . FromMilliseconds ( 10 ) ;
244241 fuzziness = TimeSpan . FromMilliseconds ( 1 ) ;
245242
@@ -303,8 +300,13 @@ public IObservable<T> Listen(IObservable<T> obs)
303300 originalSourceIsCompleted = true ;
304301 if ( ! sourceHasData )
305302 {
306- var end = new ActionData ( TheAction . End , null ) ;
307- dataListener . OnNext ( end ) ;
303+ originalSourceCompleted . OnNext ( Unit . Default ) ;
304+ originalSourceCompleted . OnCompleted ( ) ;
305+ }
306+ else
307+ {
308+ cache . Enqueue ( new ActionData ( TheAction . End , null ) ) ;
309+ signalHaveData . OnNext ( Unit . Default ) ;
308310 }
309311 } )
310312 . Publish ( ) ;
@@ -323,7 +325,6 @@ public IObservable<T> Listen(IObservable<T> obs)
323325 var data = GetFromQueue ( ) ;
324326 if ( ! data . Equals ( ActionData . Default ) )
325327 {
326- processingQueue . Enqueue ( data ) ;
327328 dataListener . OnNext ( data ) ;
328329 }
329330 return Unit . Default ;
@@ -360,32 +361,13 @@ public IObservable<T> Listen(IObservable<T> obs)
360361 } )
361362 . Do ( data =>
362363 {
363- // only objects coming from the original observable go into this queue
364- if ( processingQueue . Count > 0 )
365- processingQueue . Dequeue ( ) ;
366- if ( ManualProcessing && processingQueue . Count == 0 )
364+ if ( data . TheAction == TheAction . End )
367365 {
368- // if we've finished processing we need to raise the Completed event on
369- // the originalSourceCompleted subject, but we want to do this only
370- // after listeners have received the last item, so set a flag and
371- // insert a fake object in the queue so it triggers the Completed
372- // event on the next processing loop
373- if ( ! signaledOriginalSourceCompletion )
374- {
375- if ( data . TheAction != TheAction . End )
376- {
377- var end = new ActionData ( TheAction . End , null ) ;
378- dataListener . OnNext ( end ) ;
379- }
380- else
381- {
382- signaledOriginalSourceCompletion = true ;
383- originalSourceCompleted . OnNext ( Unit . Default ) ;
384- originalSourceCompleted . OnCompleted ( ) ;
385- }
386- }
366+ originalSourceCompleted . OnNext ( Unit . Default ) ;
367+ originalSourceCompleted . OnCompleted ( ) ;
387368 }
388- else
369+
370+ if ( ! ManualProcessing )
389371 signalNeedData . OnNext ( Unit . Default ) ;
390372 } )
391373 . Where ( data => data . Item != null )
@@ -1197,10 +1179,8 @@ void Reset()
11971179 pumpDisposables . Clear ( ) ;
11981180 disposables . Clear ( ) ;
11991181 originalSourceIsCompleted = false ;
1200- signaledOriginalSourceCompletion = false ;
12011182 sourceHasData = false ;
12021183 cache = new ConcurrentQueue < ActionData > ( ) ;
1203- processingQueue = new Queue < ActionData > ( ) ;
12041184 dataListener = new ReplaySubject < ActionData > ( ) ;
12051185 disposables . Add ( dataListener ) ;
12061186 signalHaveData = new ReplaySubject < Unit > ( ) ;
0 commit comments