Skip to content

Commit c601c1a

Browse files
Bugfix: GroupBy Completion
Found bug while doing Parallel. It was completing prematurely when child groups were asynchronous and delayed.
1 parent 06f5d83 commit c601c1a

File tree

2 files changed

+75
-3
lines changed

2 files changed

+75
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,15 @@ public Operator<? super T> call(final Operator<? super GroupedObservable<K, T>>
5353

5454
@Override
5555
public void onCompleted() {
56-
// if we receive onCompleted from our parent we onComplete everything
56+
// if we receive onCompleted from our parent we onComplete children
5757
for (PublishSubject<T> ps : groups.values()) {
5858
ps.onCompleted();
5959
}
60-
childOperator.onCompleted();
60+
61+
if (completionCounter.get() == 0) {
62+
// special case if no children are running (such as an empty sequence)
63+
childOperator.onCompleted();
64+
}
6165
}
6266

6367
@Override
@@ -92,7 +96,25 @@ public void call() {
9296
}
9397

9498
}));
95-
_gps.subscribe(o);
99+
_gps.subscribe(new Operator<T>(o) {
100+
101+
@Override
102+
public void onCompleted() {
103+
o.onCompleted();
104+
completeInner();
105+
}
106+
107+
@Override
108+
public void onError(Throwable e) {
109+
o.onError(e);
110+
}
111+
112+
@Override
113+
public void onNext(T t) {
114+
o.onNext(t);
115+
}
116+
117+
});
96118
}
97119

98120
});

rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,56 @@ public void call(String s) {
466466
assertEquals(37, sentEventCounter.get());
467467
}
468468

469+
@Test
470+
public void testStaggeredCompletion() throws InterruptedException {
471+
final AtomicInteger eventCounter = new AtomicInteger();
472+
final CountDownLatch latch = new CountDownLatch(1);
473+
Observable.range(0, 100)
474+
.groupBy(new Func1<Integer, Integer>() {
475+
476+
@Override
477+
public Integer call(Integer i) {
478+
return i % 2;
479+
}
480+
})
481+
.flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
482+
483+
@Override
484+
public Observable<Integer> call(GroupedObservable<Integer, Integer> group) {
485+
if (group.getKey() == 0) {
486+
return group.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS);
487+
} else {
488+
return group.observeOn(Schedulers.newThread());
489+
}
490+
}
491+
})
492+
.subscribe(new Observer<Integer>() {
493+
494+
@Override
495+
public void onCompleted() {
496+
latch.countDown();
497+
}
498+
499+
@Override
500+
public void onError(Throwable e) {
501+
e.printStackTrace();
502+
latch.countDown();
503+
}
504+
505+
@Override
506+
public void onNext(Integer s) {
507+
eventCounter.incrementAndGet();
508+
System.out.println("=> " + s);
509+
}
510+
});
511+
512+
if (!latch.await(2000, TimeUnit.MILLISECONDS)) {
513+
fail("timed out");
514+
}
515+
516+
assertEquals(100, eventCounter.get());
517+
}
518+
469519
private static class Event {
470520
int source;
471521
String message;

0 commit comments

Comments
 (0)