Skip to content

Commit 0518d58

Browse files
authored
2.x: benchmark the new strict/interop mode (#5115)
* 2.x: benchmark the new strict/interop mode * consider CPU usage
1 parent a00ea07 commit 0518d58

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import org.openjdk.jmh.infra.Blackhole;
17+
import org.reactivestreams.*;
18+
19+
import io.reactivex.disposables.Disposable;
20+
21+
/**
22+
* A multi-type synchronous consumer that doesn't implement FlowableSubscriber and
23+
* thus should be treated by Flowable as a candidate for strict interop.
24+
*/
25+
public final class PerfInteropConsumer implements Subscriber<Object>, Observer<Object>,
26+
SingleObserver<Object>, CompletableObserver, MaybeObserver<Object> {
27+
28+
final Blackhole bh;
29+
30+
public PerfInteropConsumer(Blackhole bh) {
31+
this.bh = bh;
32+
}
33+
34+
@Override
35+
public void onSuccess(Object value) {
36+
bh.consume(value);
37+
}
38+
39+
@Override
40+
public void onSubscribe(Disposable d) {
41+
}
42+
43+
@Override
44+
public void onSubscribe(Subscription s) {
45+
s.request(Long.MAX_VALUE);
46+
}
47+
48+
@Override
49+
public void onNext(Object t) {
50+
bh.consume(t);
51+
}
52+
53+
@Override
54+
public void onError(Throwable t) {
55+
t.printStackTrace();
56+
}
57+
58+
@Override
59+
public void onComplete() {
60+
bh.consume(true);
61+
}
62+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.Arrays;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
import org.reactivestreams.*;
22+
23+
@BenchmarkMode(Mode.Throughput)
24+
@Warmup(iterations = 5)
25+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
26+
@OutputTimeUnit(TimeUnit.SECONDS)
27+
@Fork(value = 1)
28+
@State(Scope.Thread)
29+
public class StrictPerf {
30+
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
31+
public int count;
32+
33+
@Param({ "1", "10", "100", "1000", "10000" })
34+
public int cpu;
35+
36+
Flowable<Integer> source;
37+
38+
@Setup
39+
public void setup() {
40+
Integer[] array = new Integer[count];
41+
Arrays.fill(array, 777);
42+
43+
source = Flowable.fromArray(array);
44+
}
45+
46+
@Benchmark
47+
public void internal(Blackhole bh) {
48+
source.subscribe(new InternalConsumer(bh, cpu));
49+
}
50+
51+
@Benchmark
52+
public void external(Blackhole bh) {
53+
source.subscribe(new ExternalConsumer(bh, cpu));
54+
}
55+
56+
static final class InternalConsumer implements FlowableSubscriber<Object> {
57+
final Blackhole bh;
58+
59+
final int cycles;
60+
61+
InternalConsumer(Blackhole bh, int cycles) {
62+
this.bh = bh;
63+
this.cycles = cycles;
64+
}
65+
66+
@Override
67+
public void onNext(Object t) {
68+
bh.consume(t);
69+
Blackhole.consumeCPU(cycles);
70+
}
71+
72+
@Override
73+
public void onError(Throwable t) {
74+
bh.consume(t);
75+
}
76+
77+
@Override
78+
public void onComplete() {
79+
bh.consume(true);
80+
}
81+
82+
@Override
83+
public void onSubscribe(Subscription s) {
84+
s.request(Long.MAX_VALUE);
85+
}
86+
}
87+
88+
static final class ExternalConsumer implements Subscriber<Object> {
89+
final Blackhole bh;
90+
91+
final int cycles;
92+
93+
ExternalConsumer(Blackhole bh, int cycles) {
94+
this.bh = bh;
95+
this.cycles = cycles;
96+
}
97+
98+
@Override
99+
public void onNext(Object t) {
100+
bh.consume(t);
101+
Blackhole.consumeCPU(cycles);
102+
}
103+
104+
@Override
105+
public void onError(Throwable t) {
106+
bh.consume(t);
107+
}
108+
109+
@Override
110+
public void onComplete() {
111+
bh.consume(true);
112+
}
113+
114+
@Override
115+
public void onSubscribe(Subscription s) {
116+
s.request(Long.MAX_VALUE);
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)