Skip to content

Commit ddd9b67

Browse files
authored
2.x: make parallel() a fusion-async-boundary (#5677)
1 parent 1ad6647 commit ddd9b67

File tree

2 files changed

+101
-4
lines changed

2 files changed

+101
-4
lines changed

src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void onSubscribe(Subscription s) {
116116
@SuppressWarnings("unchecked")
117117
QueueSubscription<T> qs = (QueueSubscription<T>) s;
118118

119-
int m = qs.requestFusion(QueueSubscription.ANY);
119+
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
120120

121121
if (m == QueueSubscription.SYNC) {
122122
sourceMode = m;

src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,22 @@
1414
package io.reactivex.parallel;
1515

1616
import static org.junit.Assert.*;
17+
18+
import java.util.*;
19+
import java.util.concurrent.*;
20+
1721
import org.junit.Test;
18-
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.*;
1923

20-
import io.reactivex.Flowable;
24+
import io.reactivex.*;
2125
import io.reactivex.exceptions.*;
22-
import io.reactivex.functions.Function;
26+
import io.reactivex.functions.*;
2327
import io.reactivex.internal.functions.Functions;
28+
import io.reactivex.internal.fuseable.QueueSubscription;
29+
import io.reactivex.internal.subscribers.BasicFuseableSubscriber;
2430
import io.reactivex.internal.subscriptions.BooleanSubscription;
2531
import io.reactivex.processors.UnicastProcessor;
32+
import io.reactivex.schedulers.Schedulers;
2633

2734
public class ParallelFromPublisherTest {
2835

@@ -53,6 +60,53 @@ public void fusedFilterBecomesEmpty() {
5360
.assertResult();
5461
}
5562

63+
static final class StripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {
64+
65+
final Flowable<T> source;
66+
67+
StripBoundary(Flowable<T> source) {
68+
this.source = source;
69+
}
70+
71+
@Override
72+
public Publisher<T> apply(Flowable<T> upstream) {
73+
return new StripBoundary<T>(upstream);
74+
}
75+
76+
@Override
77+
protected void subscribeActual(Subscriber<? super T> s) {
78+
source.subscribe(new StripBoundarySubscriber<T>(s));
79+
}
80+
81+
static final class StripBoundarySubscriber<T> extends BasicFuseableSubscriber<T, T> {
82+
83+
StripBoundarySubscriber(Subscriber<? super T> actual) {
84+
super(actual);
85+
}
86+
87+
@Override
88+
public void onNext(T t) {
89+
actual.onNext(t);
90+
}
91+
92+
@Override
93+
public int requestFusion(int mode) {
94+
QueueSubscription<T> fs = qs;
95+
if (fs != null) {
96+
int m = fs.requestFusion(mode & ~QueueSubscription.BOUNDARY);
97+
this.sourceMode = m;
98+
return m;
99+
}
100+
return QueueSubscription.NONE;
101+
}
102+
103+
@Override
104+
public T poll() throws Exception {
105+
return qs.poll();
106+
}
107+
}
108+
}
109+
56110
@Test
57111
public void syncFusedMapCrash() {
58112
Flowable.just(1)
@@ -62,6 +116,7 @@ public Object apply(Integer v) throws Exception {
62116
throw new TestException();
63117
}
64118
})
119+
.compose(new StripBoundary<Object>(null))
65120
.parallel()
66121
.sequential()
67122
.test()
@@ -81,11 +136,53 @@ public Object apply(Integer v) throws Exception {
81136
throw new TestException();
82137
}
83138
})
139+
.compose(new StripBoundary<Object>(null))
84140
.parallel()
85141
.sequential()
86142
.test()
87143
.assertFailure(TestException.class);
88144

89145
assertFalse(up.hasSubscribers());
90146
}
147+
148+
@Test
149+
public void boundaryConfinement() {
150+
final Set<String> between = new HashSet<String>();
151+
final ConcurrentHashMap<String, String> processing = new ConcurrentHashMap<String, String>();
152+
153+
Flowable.range(1, 10)
154+
.observeOn(Schedulers.single(), false, 1)
155+
.doOnNext(new Consumer<Integer>() {
156+
@Override
157+
public void accept(Integer v) throws Exception {
158+
between.add(Thread.currentThread().getName());
159+
}
160+
})
161+
.parallel(2, 1)
162+
.runOn(Schedulers.computation(), 1)
163+
.map(new Function<Integer, Object>() {
164+
@Override
165+
public Object apply(Integer v) throws Exception {
166+
processing.putIfAbsent(Thread.currentThread().getName(), "");
167+
return v;
168+
}
169+
})
170+
.sequential()
171+
.test()
172+
.awaitDone(5, TimeUnit.SECONDS)
173+
.assertSubscribed()
174+
.assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
175+
.assertComplete()
176+
.assertNoErrors()
177+
;
178+
179+
assertEquals(between.toString(), 1, between.size());
180+
assertTrue(between.toString(), between.iterator().next().contains("RxSingleScheduler"));
181+
182+
Map<String, String> map = processing; // AnimalSniffer: CHM.keySet() in Java 8 returns KeySetView
183+
184+
for (String e : map.keySet()) {
185+
assertTrue(map.toString(), e.contains("RxComputationThreadPool"));
186+
}
187+
}
91188
}

0 commit comments

Comments
 (0)