Skip to content

Commit bcf9807

Browse files
BugFix: Another GroupBy use case found and fixed.
1 parent 0bb6666 commit bcf9807

File tree

2 files changed

+54
-4
lines changed

2 files changed

+54
-4
lines changed

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void onCompleted() {
5959
}
6060

6161
if (completionCounter.get() == 0) {
62-
// special case if no children are running (such as an empty sequence)
62+
// special case if no children are running (such as an empty sequence, or just getting the groups and not subscribing)
6363
childOperator.onCompleted();
6464
}
6565
}
@@ -88,6 +88,8 @@ public void onNext(T t) {
8888

8989
@Override
9090
public void call(final Operator<? super T> o) {
91+
// number of children we have running
92+
completionCounter.incrementAndGet();
9193
o.add(Subscriptions.create(new Action0() {
9294

9395
@Override
@@ -119,8 +121,6 @@ public void onNext(T t) {
119121

120122
});
121123
groups.put(key, gps);
122-
// number of children we have running
123-
completionCounter.incrementAndGet();
124124
childOperator.onNext(go);
125125
}
126126
// we have the correct group so send value to it

rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,19 @@ public Integer call(Integer i) {
483483
@Override
484484
public Observable<Integer> call(GroupedObservable<Integer, Integer> group) {
485485
if (group.getKey() == 0) {
486-
return group.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS);
486+
return group.observeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
487+
488+
@Override
489+
public Integer call(Integer t) {
490+
try {
491+
Thread.sleep(2);
492+
} catch (InterruptedException e) {
493+
e.printStackTrace();
494+
}
495+
return t * 10;
496+
}
497+
498+
});
487499
} else {
488500
return group.observeOn(Schedulers.newThread());
489501
}
@@ -493,6 +505,7 @@ public Observable<Integer> call(GroupedObservable<Integer, Integer> group) {
493505

494506
@Override
495507
public void onCompleted() {
508+
System.out.println("=> onCompleted");
496509
latch.countDown();
497510
}
498511

@@ -516,6 +529,43 @@ public void onNext(Integer s) {
516529
assertEquals(100, eventCounter.get());
517530
}
518531

532+
@Test
533+
public void testCompletionIfInnerNotSubscribed() throws InterruptedException {
534+
final CountDownLatch latch = new CountDownLatch(1);
535+
final AtomicInteger eventCounter = new AtomicInteger();
536+
Observable.range(0, 100)
537+
.groupBy(new Func1<Integer, Integer>() {
538+
539+
@Override
540+
public Integer call(Integer i) {
541+
return i % 2;
542+
}
543+
})
544+
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
545+
546+
@Override
547+
public void onCompleted() {
548+
latch.countDown();
549+
}
550+
551+
@Override
552+
public void onError(Throwable e) {
553+
e.printStackTrace();
554+
latch.countDown();
555+
}
556+
557+
@Override
558+
public void onNext(GroupedObservable<Integer, Integer> s) {
559+
eventCounter.incrementAndGet();
560+
System.out.println("=> " + s);
561+
}
562+
});
563+
if (!latch.await(500, TimeUnit.MILLISECONDS)) {
564+
fail("timed out - never got completion");
565+
}
566+
assertEquals(2, eventCounter.get());
567+
}
568+
519569
private static class Event {
520570
int source;
521571
String message;

0 commit comments

Comments
 (0)