Skip to content

Commit 170f952

Browse files
authored
2.x: Fix parallel() on grouped flowable not replenishing properly (#6720)
1 parent aeb5f2c commit 170f952

File tree

2 files changed

+44
-6
lines changed

2 files changed

+44
-6
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -709,17 +709,25 @@ public T poll() {
709709
produced++;
710710
return v;
711711
}
712-
int p = produced;
713-
if (p != 0) {
714-
produced = 0;
715-
parent.upstream.request(p);
716-
}
712+
tryReplenish();
717713
return null;
718714
}
719715

720716
@Override
721717
public boolean isEmpty() {
722-
return queue.isEmpty();
718+
if (queue.isEmpty()) {
719+
tryReplenish();
720+
return true;
721+
}
722+
return false;
723+
}
724+
725+
void tryReplenish() {
726+
int p = produced;
727+
if (p != 0) {
728+
produced = 0;
729+
parent.upstream.request(p);
730+
}
723731
}
724732

725733
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,4 +2286,34 @@ public void run() {
22862286
}
22872287
}
22882288
}
2289+
2290+
@Test
2291+
public void fusedParallelGroupProcessing() {
2292+
Flowable.range(0, 500000)
2293+
.subscribeOn(Schedulers.single())
2294+
.groupBy(new Function<Integer, Integer>() {
2295+
@Override
2296+
public Integer apply(Integer i) {
2297+
return i % 2;
2298+
}
2299+
})
2300+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
2301+
@Override
2302+
public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
2303+
return g.getKey() == 0
2304+
? g
2305+
.parallel()
2306+
.runOn(Schedulers.computation())
2307+
.map(Functions.<Integer>identity())
2308+
.sequential()
2309+
: g.map(Functions.<Integer>identity()) // no need to use hide
2310+
;
2311+
}
2312+
})
2313+
.test()
2314+
.awaitDone(20, TimeUnit.SECONDS)
2315+
.assertValueCount(500000)
2316+
.assertComplete()
2317+
.assertNoErrors();
2318+
}
22892319
}

0 commit comments

Comments
 (0)