Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 799cc75

Browse files
committed
Only signal completion in TrackingCollection when all items have really been processed
1 parent 204c548 commit 799cc75

File tree

1 file changed

+45
-20
lines changed

1 file changed

+45
-20
lines changed

src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ enum TheAction
174174
Add,
175175
Insert,
176176
Remove,
177-
Ignore
177+
Ignore,
178+
End
178179
}
179180

180181
bool isChanging;
@@ -187,6 +188,7 @@ enum TheAction
187188
IConnectableObservable<T> dataPump;
188189
IConnectableObservable<Unit> cachePump;
189190
ConcurrentQueue<ActionData> cache;
191+
Queue<ActionData> processingQueue;
190192

191193
ReplaySubject<Unit> signalHaveData;
192194
ReplaySubject<Unit> signalNeedData;
@@ -211,7 +213,8 @@ enum TheAction
211213
readonly Dictionary<T, int> filteredIndexCache = new Dictionary<T, int>();
212214

213215
bool originalSourceIsCompleted;
214-
bool signalOriginalSourceCompletion;
216+
bool signaledOriginalSourceCompletion;
217+
bool sourceHasData;
215218
ReplaySubject<Unit> originalSourceCompleted;
216219
public IObservable<Unit> OriginalCompleted => originalSourceCompleted;
217220

@@ -236,6 +239,7 @@ public TrackingCollection(Func<T, T, int> comparer = null, Func<T, int, IList<T>
236239
Func<T, T, int> newer = null, IScheduler scheduler = null)
237240
{
238241
cache = new ConcurrentQueue<ActionData>();
242+
processingQueue = new Queue<ActionData>();
239243
ProcessingDelay = TimeSpan.FromMilliseconds(10);
240244
fuzziness = TimeSpan.FromMilliseconds(1);
241245

@@ -287,6 +291,7 @@ public IObservable<T> Listen(IObservable<T> obs)
287291
})
288292
.Do(data =>
289293
{
294+
sourceHasData = true;
290295
cache.Enqueue(new ActionData(data));
291296
signalHaveData.OnNext(Unit.Default);
292297
})
@@ -296,16 +301,10 @@ public IObservable<T> Listen(IObservable<T> obs)
296301
return;
297302

298303
originalSourceIsCompleted = true;
299-
if (!cache.IsEmpty)
304+
if (!sourceHasData)
300305
{
301-
signalOriginalSourceCompletion = true;
302-
}
303-
else
304-
{
305-
originalSourceCompleted.OnNext(Unit.Default);
306-
originalSourceCompleted.OnCompleted();
307-
signalNeedData.OnCompleted();
308-
signalHaveData.OnCompleted();
306+
var end = new ActionData(TheAction.End, null);
307+
dataListener.OnNext(end);
309308
}
310309
})
311310
.Publish();
@@ -321,16 +320,24 @@ public IObservable<T> Listen(IObservable<T> obs)
321320
{
322321
var delay = CalculateProcessingDelay(interval);
323322
waitHandle.Wait(delay);
324-
dataListener.OnNext(GetFromQueue());
323+
var data = GetFromQueue();
324+
if (!data.Equals(ActionData.Default))
325+
{
326+
processingQueue.Enqueue(data);
327+
dataListener.OnNext(data);
328+
}
325329
return Unit.Default;
326330
})
327331
.Publish();
328332

329333
source = dataListener
330-
.Where(data => data.Item != null)
334+
.Where(data => data.Item != null || data.TheAction == TheAction.End)
331335
.ObserveOn(scheduler)
332336
.Select(data =>
333337
{
338+
if (data.TheAction == TheAction.End)
339+
return data;
340+
334341
data = ProcessItem(data, original);
335342

336343
// if we're removing an item that doesn't exist, ignore it
@@ -351,15 +358,31 @@ public IObservable<T> Listen(IObservable<T> obs)
351358
data = FilteredRemove(data);
352359
return data;
353360
})
354-
.Do(_ =>
361+
.Do(data =>
355362
{
356-
if (ManualProcessing)
363+
// only objects coming from the original observable go into this queue
364+
if (processingQueue.Count > 0)
365+
processingQueue.Dequeue();
366+
if (ManualProcessing && processingQueue.Count == 0)
357367
{
358-
if (signalOriginalSourceCompletion)
368+
// if we've finished processing we need to raise the Completed event on
369+
// the originalSourceCompleted subject, but we want to do this only
370+
// after listeners have received the last item, so set a flag and
371+
// insert a fake object in the queue so it triggers the Completed
372+
// event on the next processing loop
373+
if (!signaledOriginalSourceCompletion)
359374
{
360-
signalOriginalSourceCompletion = false;
361-
originalSourceCompleted.OnNext(Unit.Default);
362-
originalSourceCompleted.OnCompleted();
375+
if (data.TheAction != TheAction.End)
376+
{
377+
var end = new ActionData(TheAction.End, null);
378+
dataListener.OnNext(end);
379+
}
380+
else
381+
{
382+
signaledOriginalSourceCompletion = true;
383+
originalSourceCompleted.OnNext(Unit.Default);
384+
originalSourceCompleted.OnCompleted();
385+
}
363386
}
364387
}
365388
else
@@ -1174,8 +1197,10 @@ void Reset()
11741197
pumpDisposables.Clear();
11751198
disposables.Clear();
11761199
originalSourceIsCompleted = false;
1177-
signalOriginalSourceCompletion = false;
1200+
signaledOriginalSourceCompletion = false;
1201+
sourceHasData = false;
11781202
cache = new ConcurrentQueue<ActionData>();
1203+
processingQueue = new Queue<ActionData>();
11791204
dataListener = new ReplaySubject<ActionData>();
11801205
disposables.Add(dataListener);
11811206
signalHaveData = new ReplaySubject<Unit>();

0 commit comments

Comments
 (0)