Skip to content

Commit 125bd05

Browse files
authored
3.x: Disable fusion on the groups of Flowable.groupBy (#6983)
1 parent 5c26064 commit 125bd05

File tree

2 files changed

+188
-2
lines changed

2 files changed

+188
-2
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,10 +642,13 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a,
642642

643643
@Override
644644
public int requestFusion(int mode) {
645+
// FIXME fusion mode causes hangs
646+
/*
645647
if ((mode & ASYNC) != 0) {
646648
outputFused = true;
647649
return ASYNC;
648650
}
651+
*/
649652
return NONE;
650653
}
651654

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 185 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.*;
2323
import java.util.concurrent.atomic.*;
2424

25-
import org.junit.Test;
25+
import org.junit.*;
2626
import org.mockito.Mockito;
2727
import org.reactivestreams.*;
2828

@@ -1671,7 +1671,9 @@ public void accept(GroupedFlowable<Integer, Integer> g) {
16711671
.subscribe(ts2);
16721672

16731673
ts1
1674-
.assertFusionMode(QueueFuseable.ASYNC)
1674+
// FIXME fusion mode causes hangs
1675+
//.assertFusionMode(QueueFuseable.ASYNC)
1676+
.assertFusionMode(QueueFuseable.NONE)
16751677
.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
16761678
.assertNoErrors()
16771679
.assertComplete();
@@ -2693,4 +2695,185 @@ public void issue6974Part2Case1NoEvictLoop() {
26932695
}
26942696
}
26952697
*/
2698+
2699+
@Test
2700+
public void issue6974Part2Case1ObserveOn() {
2701+
final int groups = 20;
2702+
2703+
// Not completed (Timed out), buffer is too small
2704+
int groupByBufferSize = groups * 2;
2705+
int flatMapMaxConcurrency = 2 * groups;
2706+
boolean notifyOnExplicitEviction = false;
2707+
2708+
Flowable
2709+
.range(1, 500_000)
2710+
.map(i -> i % groups)
2711+
.doOnCancel(() -> {
2712+
System.out.println("Cancelling upstream");
2713+
})
2714+
.groupBy(i -> i, i -> i, false, groupByBufferSize,
2715+
sizeCap(groups * 2, notifyOnExplicitEviction))
2716+
.flatMap(gf -> gf
2717+
.observeOn(Schedulers.computation())
2718+
// .take(10)
2719+
.take(10, TimeUnit.MILLISECONDS)
2720+
, flatMapMaxConcurrency)
2721+
.test()
2722+
.awaitDone(5, TimeUnit.SECONDS)
2723+
.assertNoErrors()
2724+
.assertComplete();
2725+
}
2726+
2727+
@Test
2728+
public void issue6974Part2Case1ObserveOnHide() {
2729+
final int groups = 20;
2730+
2731+
// Not completed (Timed out), buffer is too small
2732+
int groupByBufferSize = groups * 2;
2733+
int flatMapMaxConcurrency = 2 * groups;
2734+
boolean notifyOnExplicitEviction = false;
2735+
2736+
Flowable
2737+
.range(1, 500_000)
2738+
.map(i -> i % groups)
2739+
.doOnCancel(() -> System.out.println("Cancelling upstream"))
2740+
.groupBy(i -> i, i -> i, false, groupByBufferSize,
2741+
sizeCap(groups * 2, notifyOnExplicitEviction))
2742+
.flatMap(gf -> gf
2743+
.hide()
2744+
.observeOn(Schedulers.computation())
2745+
// .take(10)
2746+
.take(10, TimeUnit.MILLISECONDS)
2747+
, flatMapMaxConcurrency)
2748+
.test()
2749+
.awaitDone(5, TimeUnit.SECONDS)
2750+
.assertNoErrors()
2751+
.assertComplete();
2752+
}
2753+
2754+
@Test
2755+
public void issue6974Part2Case1ObserveOnNoCap() {
2756+
final int groups = 20;
2757+
2758+
// Not completed (Timed out), buffer is too small
2759+
int flatMapMaxConcurrency = 1_000_000;
2760+
2761+
Flowable
2762+
.range(1, 500_000)
2763+
.map(i -> i % groups)
2764+
.doOnRequest(v -> {
2765+
System.out.println("Source: " + v);
2766+
})
2767+
.groupBy(i -> i)
2768+
.flatMap(gf -> gf
2769+
.observeOn(Schedulers.computation())
2770+
// .take(10)
2771+
.take(10, TimeUnit.MILLISECONDS)
2772+
, flatMapMaxConcurrency)
2773+
.test()
2774+
.awaitDone(5, TimeUnit.SECONDS)
2775+
.assertNoErrors()
2776+
.assertComplete();
2777+
}
2778+
2779+
@Test
2780+
public void issue6974Part2Case1ObserveOnNoCapHide() {
2781+
final int groups = 20;
2782+
2783+
// Not completed (Timed out), buffer is too small
2784+
int flatMapMaxConcurrency = 1_000_000;
2785+
2786+
Flowable
2787+
.range(1, 500_000)
2788+
.map(i -> i % groups)
2789+
.doOnRequest(v -> {
2790+
System.out.println("Source: " + v);
2791+
})
2792+
.groupBy(i -> i)
2793+
.flatMap(gf -> gf
2794+
.hide()
2795+
.observeOn(Schedulers.computation())
2796+
// .take(10)
2797+
.take(10, TimeUnit.MILLISECONDS)
2798+
, flatMapMaxConcurrency)
2799+
.test()
2800+
.awaitDone(5, TimeUnit.SECONDS)
2801+
.assertNoErrors()
2802+
.assertComplete();
2803+
}
2804+
2805+
/*
2806+
* Disabled: Takes very long. Run it locally only.
2807+
@Test
2808+
public void issue6974Part2Case1ObserveOnNoCapHideLoop() {
2809+
for (int i = 0; i < 100; i++) {
2810+
issue6974Part2Case1ObserveOnNoCapHide();
2811+
}
2812+
}
2813+
*/
2814+
2815+
@Test
2816+
public void issue6974Part2Case1ObserveOnConditional() {
2817+
final int groups = 20;
2818+
2819+
// Not completed (Timed out), buffer is too small
2820+
int groupByBufferSize = groups * 2;
2821+
int flatMapMaxConcurrency = 2 * groups;
2822+
boolean notifyOnExplicitEviction = false;
2823+
2824+
Flowable
2825+
.range(1, 500_000)
2826+
.map(i -> i % groups)
2827+
.doOnCancel(() -> System.out.println("Cancelling upstream"))
2828+
.groupBy(i -> i, i -> i, false, groupByBufferSize,
2829+
sizeCap(groups * 2, notifyOnExplicitEviction))
2830+
.flatMap(gf -> gf
2831+
.observeOn(Schedulers.computation())
2832+
.filter(v -> true)
2833+
// .take(10)
2834+
.take(10, TimeUnit.MILLISECONDS)
2835+
, flatMapMaxConcurrency)
2836+
.test()
2837+
.awaitDone(5, TimeUnit.SECONDS)
2838+
.assertNoErrors()
2839+
.assertComplete();
2840+
}
2841+
2842+
@Test
2843+
public void issue6974Part2Case1ObserveOnConditionalHide() {
2844+
final int groups = 20;
2845+
2846+
// Not completed (Timed out), buffer is too small
2847+
int groupByBufferSize = groups * 2;
2848+
int flatMapMaxConcurrency = 2 * groups;
2849+
boolean notifyOnExplicitEviction = false;
2850+
2851+
Flowable
2852+
.range(1, 500_000)
2853+
.map(i -> i % groups)
2854+
.doOnCancel(() -> System.out.println("Cancelling upstream"))
2855+
.groupBy(i -> i, i -> i, false, groupByBufferSize,
2856+
sizeCap(groups * 2, notifyOnExplicitEviction))
2857+
.flatMap(gf -> gf
2858+
.hide()
2859+
.observeOn(Schedulers.computation())
2860+
.filter(v -> true)
2861+
// .take(10)
2862+
.take(10, TimeUnit.MILLISECONDS)
2863+
, flatMapMaxConcurrency)
2864+
.test()
2865+
.awaitDone(5, TimeUnit.SECONDS)
2866+
.assertNoErrors()
2867+
.assertComplete();
2868+
}
2869+
2870+
/*
2871+
* Disabled: Takes very long. Run it locally only.
2872+
@Test
2873+
public void issue6974Part2Case1ObserveOnHideLoop() {
2874+
for (int i = 0; i < 100; i++) {
2875+
issue6974Part2Case1ObserveOnHide();
2876+
}
2877+
}
2878+
*/
26962879
}

0 commit comments

Comments
 (0)