@@ -174,7 +174,8 @@ enum TheAction
174174 Add ,
175175 Insert ,
176176 Remove ,
177- Ignore
177+ Ignore ,
178+ End
178179 }
179180
180181 bool isChanging ;
@@ -211,7 +212,7 @@ enum TheAction
211212 readonly Dictionary < T , int > filteredIndexCache = new Dictionary < T , int > ( ) ;
212213
213214 bool originalSourceIsCompleted ;
214- bool signalOriginalSourceCompletion ;
215+ bool sourceHasData ;
215216 ReplaySubject < Unit > originalSourceCompleted ;
216217 public IObservable < Unit > OriginalCompleted => originalSourceCompleted ;
217218
@@ -287,6 +288,7 @@ public IObservable<T> Listen(IObservable<T> obs)
287288 } )
288289 . Do ( data =>
289290 {
291+ sourceHasData = true ;
290292 cache . Enqueue ( new ActionData ( data ) ) ;
291293 signalHaveData . OnNext ( Unit . Default ) ;
292294 } )
@@ -296,16 +298,15 @@ public IObservable<T> Listen(IObservable<T> obs)
296298 return ;
297299
298300 originalSourceIsCompleted = true ;
299- if ( ! cache . IsEmpty )
301+ if ( ! sourceHasData )
300302 {
301- signalOriginalSourceCompletion = true ;
303+ originalSourceCompleted . OnNext ( Unit . Default ) ;
304+ originalSourceCompleted . OnCompleted ( ) ;
302305 }
303306 else
304307 {
305- originalSourceCompleted . OnNext ( Unit . Default ) ;
306- originalSourceCompleted . OnCompleted ( ) ;
307- signalNeedData . OnCompleted ( ) ;
308- signalHaveData . OnCompleted ( ) ;
308+ cache . Enqueue ( new ActionData ( TheAction . End , null ) ) ;
309+ signalHaveData . OnNext ( Unit . Default ) ;
309310 }
310311 } )
311312 . Publish ( ) ;
@@ -321,16 +322,23 @@ public IObservable<T> Listen(IObservable<T> obs)
321322 {
322323 var delay = CalculateProcessingDelay ( interval ) ;
323324 waitHandle . Wait ( delay ) ;
324- dataListener . OnNext ( GetFromQueue ( ) ) ;
325+ var data = GetFromQueue ( ) ;
326+ if ( ! data . Equals ( ActionData . Default ) )
327+ {
328+ dataListener . OnNext ( data ) ;
329+ }
325330 return Unit . Default ;
326331 } )
327332 . Publish ( ) ;
328333
329334 source = dataListener
330- . Where ( data => data . Item != null )
335+ . Where ( data => data . Item != null || data . TheAction == TheAction . End )
331336 . ObserveOn ( scheduler )
332337 . Select ( data =>
333338 {
339+ if ( data . TheAction == TheAction . End )
340+ return data ;
341+
334342 data = ProcessItem ( data , original ) ;
335343
336344 // if we're removing an item that doesn't exist, ignore it
@@ -351,18 +359,15 @@ public IObservable<T> Listen(IObservable<T> obs)
351359 data = FilteredRemove ( data ) ;
352360 return data ;
353361 } )
354- . Do ( _ =>
362+ . Do ( data =>
355363 {
356- if ( ManualProcessing )
364+ if ( data . TheAction == TheAction . End )
357365 {
358- if ( signalOriginalSourceCompletion )
359- {
360- signalOriginalSourceCompletion = false ;
361- originalSourceCompleted . OnNext ( Unit . Default ) ;
362- originalSourceCompleted . OnCompleted ( ) ;
363- }
366+ originalSourceCompleted . OnNext ( Unit . Default ) ;
367+ originalSourceCompleted . OnCompleted ( ) ;
364368 }
365- else
369+
370+ if ( ! ManualProcessing )
366371 signalNeedData . OnNext ( Unit . Default ) ;
367372 } )
368373 . Where ( data => data . Item != null )
@@ -1174,7 +1179,7 @@ void Reset()
11741179 pumpDisposables . Clear ( ) ;
11751180 disposables . Clear ( ) ;
11761181 originalSourceIsCompleted = false ;
1177- signalOriginalSourceCompletion = false ;
1182+ sourceHasData = false ;
11781183 cache = new ConcurrentQueue < ActionData > ( ) ;
11791184 dataListener = new ReplaySubject < ActionData > ( ) ;
11801185 disposables . Add ( dataListener ) ;
0 commit comments