Skip to content

Commit 8eee476

Browse files
committed
1.x: Single performance measurements
1 parent fb4241d commit 8eee476

File tree

1 file changed

+263
-0
lines changed

1 file changed

+263
-0
lines changed
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import java.util.concurrent.*;
20+
21+
import org.openjdk.jmh.annotations.*;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
import rx.functions.Func1;
25+
import rx.schedulers.Schedulers;
26+
27+
/**
28+
* Benchmark Single.
29+
* <p>
30+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*SingleSourcePerf.*"
31+
* <p>
32+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*SingleSourcePerf.*"
33+
*/
34+
@BenchmarkMode(Mode.Throughput)
35+
@OutputTimeUnit(TimeUnit.SECONDS)
36+
@State(Scope.Thread)
37+
public class SingleSourcePerf {
38+
39+
Single<Integer> source;
40+
41+
Single<Integer> flatmapped;
42+
43+
Single<Integer> flatmappedConst;
44+
45+
Single<Integer> sourceObserveOn;
46+
47+
Single<Integer> sourceSubscribeOn;
48+
49+
Single<Integer> sourceObserveOnExecutor;
50+
51+
Single<Integer> sourceSubscribeOnExecutor;
52+
53+
Single<Integer> sourceObserveOnScheduledExecutor;
54+
55+
Single<Integer> sourceSubscribeOnScheduledExecutor;
56+
57+
// Single<Integer> sourceObserveOnFJ;
58+
59+
// Single<Integer> sourceSubscribeOnFJ;
60+
61+
ScheduledExecutorService scheduledExecutor;
62+
63+
ExecutorService executor;
64+
65+
@Setup
66+
public void setup() {
67+
source = Single.just(1);
68+
69+
flatmapped = source.flatMap(new Func1<Integer, Single<Integer>>() {
70+
@Override
71+
public Single<Integer> call(Integer t) {
72+
return Single.just(t);
73+
}
74+
});
75+
76+
flatmapped = source.flatMap(new Func1<Integer, Single<Integer>>() {
77+
@Override
78+
public Single<Integer> call(Integer t) {
79+
return source;
80+
}
81+
});
82+
83+
sourceObserveOn = source.observeOn(Schedulers.computation());
84+
85+
sourceSubscribeOn = source.subscribeOn(Schedulers.computation());
86+
87+
// ----------
88+
89+
scheduledExecutor = Executors.newScheduledThreadPool(1);
90+
91+
Scheduler s = Schedulers.from(scheduledExecutor);
92+
93+
sourceObserveOnScheduledExecutor = source.observeOn(s);
94+
95+
sourceSubscribeOnScheduledExecutor = source.subscribeOn(s);
96+
97+
// ----------
98+
99+
executor = Executors.newSingleThreadExecutor();
100+
101+
Scheduler se = Schedulers.from(executor);
102+
103+
sourceObserveOnExecutor = source.observeOn(se);
104+
105+
sourceSubscribeOnExecutor = source.subscribeOn(se);
106+
107+
// --------
108+
109+
// Scheduler fj = Schedulers.from(ForkJoinPool.commonPool());
110+
111+
// sourceObserveOnFJ = source.observeOn(fj);
112+
113+
// sourceSubscribeOnFJ = source.subscribeOn(fj);
114+
}
115+
116+
@TearDown
117+
public void teardown() {
118+
scheduledExecutor.shutdownNow();
119+
120+
executor.shutdownNow();
121+
}
122+
123+
static final class PlainSingleSubscriber extends SingleSubscriber<Object> {
124+
final Blackhole bh;
125+
126+
public PlainSingleSubscriber(Blackhole bh) {
127+
this.bh = bh;
128+
}
129+
130+
@Override
131+
public void onSuccess(Object value) {
132+
bh.consume(value);
133+
}
134+
135+
@Override
136+
public void onError(Throwable error) {
137+
bh.consume(error);
138+
}
139+
}
140+
141+
static final class LatchedSingleSubscriber extends SingleSubscriber<Object> {
142+
final Blackhole bh;
143+
144+
final CountDownLatch cdl;
145+
146+
public LatchedSingleSubscriber(Blackhole bh) {
147+
this.bh = bh;
148+
this.cdl = new CountDownLatch(1);
149+
}
150+
151+
@Override
152+
public void onSuccess(Object value) {
153+
bh.consume(value);
154+
cdl.countDown();
155+
}
156+
157+
@Override
158+
public void onError(Throwable error) {
159+
bh.consume(error);
160+
cdl.countDown();
161+
}
162+
163+
public void await() {
164+
try {
165+
cdl.await();
166+
} catch (InterruptedException ex) {
167+
throw new RuntimeException(ex);
168+
}
169+
}
170+
171+
public void awaitSpin() {
172+
while (cdl.getCount() != 0L) ;
173+
}
174+
}
175+
176+
@Benchmark
177+
public void direct(Blackhole bh) {
178+
source.subscribe(new PlainSingleSubscriber(bh));
179+
}
180+
181+
@Benchmark
182+
public void flatmap(Blackhole bh) {
183+
flatmapped.subscribe(new PlainSingleSubscriber(bh));
184+
}
185+
186+
@Benchmark
187+
public void flatmapConst(Blackhole bh) {
188+
flatmapped.subscribe(new PlainSingleSubscriber(bh));
189+
}
190+
191+
@Benchmark
192+
public void observeOn(Blackhole bh) {
193+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
194+
195+
sourceObserveOn.subscribe(o);
196+
197+
o.awaitSpin();
198+
}
199+
200+
@Benchmark
201+
public void observeOnExec(Blackhole bh) {
202+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
203+
204+
sourceObserveOnExecutor.subscribe(o);
205+
206+
o.awaitSpin();
207+
}
208+
209+
@Benchmark
210+
public void subscribeOn(Blackhole bh) {
211+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
212+
213+
sourceSubscribeOn.subscribe(o);
214+
215+
o.awaitSpin();
216+
}
217+
218+
@Benchmark
219+
public void subscribeOnExec(Blackhole bh) {
220+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
221+
222+
sourceSubscribeOnExecutor.subscribe(o);
223+
224+
o.awaitSpin();
225+
}
226+
227+
@Benchmark
228+
public void subscribeOnSchExec(Blackhole bh) {
229+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
230+
231+
sourceSubscribeOnScheduledExecutor.subscribe(o);
232+
233+
o.awaitSpin();
234+
}
235+
236+
// @Benchmark
237+
// public void subscribeOnFJ(Blackhole bh) {
238+
// LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
239+
//
240+
// sourceSubscribeOnFJ.subscribe(o);
241+
//
242+
// o.awaitSpin();
243+
// }
244+
245+
@Benchmark
246+
public void observeOnSchExec(Blackhole bh) {
247+
LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
248+
249+
sourceObserveOnScheduledExecutor.subscribe(o);
250+
251+
o.awaitSpin();
252+
}
253+
254+
// @Benchmark
255+
// public void observeOnFJ(Blackhole bh) {
256+
// LatchedSingleSubscriber o = new LatchedSingleSubscriber(bh);
257+
//
258+
// sourceObserveOnFJ.subscribe(o);
259+
//
260+
// o.awaitSpin();
261+
// }
262+
263+
}

0 commit comments

Comments
 (0)