Skip to content

Commit 1ad6647

Browse files
authored
2.x: Add PublishProcessor JMH perf comparison (#5675)
1 parent b0bc478 commit 1ad6647

File tree

3 files changed

+168
-2
lines changed

3 files changed

+168
-2
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
5353
def junitVersion = "4.12"
5454
def reactiveStreamsVersion = "1.0.1"
5555
def mockitoVersion = "2.1.0"
56-
def jmhVersion = "1.16"
56+
def jmhLibVersion = "1.19"
5757
def testNgVersion = "6.11"
5858

5959
// --------------------------------------
@@ -192,8 +192,9 @@ publishing.publications.all {
192192
}
193193

194194
jmh {
195-
jmhVersion = "1.19"
195+
jmhVersion = jmhLibVersion
196196
humanOutputFile = null
197+
includeTests = false
197198

198199
if (project.hasProperty("jmh")) {
199200
include = ".*" + project.jmh + ".*"
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
18+
import org.openjdk.jmh.infra.Blackhole;
19+
import org.reactivestreams.Subscription;
20+
21+
/**
22+
* Performance subscriber with a one-time request from the upstream.
23+
*/
24+
public class PerfBoundedSubscriber extends CountDownLatch implements FlowableSubscriber<Object> {
25+
26+
final Blackhole bh;
27+
28+
final long request;
29+
30+
public PerfBoundedSubscriber(Blackhole bh, long request) {
31+
super(1);
32+
this.bh = bh;
33+
this.request = request;
34+
}
35+
36+
@Override
37+
public void onSubscribe(Subscription s) {
38+
s.request(request);
39+
}
40+
41+
@Override
42+
public void onComplete() {
43+
countDown();
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
countDown();
49+
}
50+
51+
@Override
52+
public void onNext(Object t) {
53+
bh.consume(t);
54+
}
55+
56+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.TimeUnit;
17+
18+
import org.openjdk.jmh.annotations.*;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
21+
import io.reactivex.processors.PublishProcessor;
22+
import io.reactivex.subjects.PublishSubject;
23+
24+
@BenchmarkMode(Mode.Throughput)
25+
@Warmup(iterations = 5)
26+
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@Fork(value = 1)
29+
@State(Scope.Thread)
30+
public class PublishProcessorPerf {
31+
32+
PublishProcessor<Integer> unbounded;
33+
34+
PublishProcessor<Integer> bounded;
35+
36+
PublishSubject<Integer> subject;
37+
38+
@Setup
39+
public void setup(Blackhole bh) {
40+
unbounded = PublishProcessor.create();
41+
unbounded.subscribe(new PerfConsumer(bh));
42+
43+
bounded = PublishProcessor.create();
44+
bounded.subscribe(new PerfBoundedSubscriber(bh, 1000 * 1000));
45+
46+
subject = PublishSubject.create();
47+
subject.subscribe(new PerfConsumer(bh));
48+
}
49+
50+
@Benchmark
51+
public void unbounded1() {
52+
unbounded.onNext(1);
53+
}
54+
55+
@Benchmark
56+
public void unbounded1k() {
57+
for (int i = 0; i < 1000; i++) {
58+
unbounded.onNext(1);
59+
}
60+
}
61+
62+
@Benchmark
63+
public void unbounded1m() {
64+
for (int i = 0; i < 1000000; i++) {
65+
unbounded.onNext(1);
66+
}
67+
}
68+
69+
@Benchmark
70+
public void bounded1() {
71+
bounded.onNext(1);
72+
}
73+
74+
75+
@Benchmark
76+
public void bounded1k() {
77+
for (int i = 0; i < 1000; i++) {
78+
bounded.onNext(1);
79+
}
80+
}
81+
82+
@Benchmark
83+
public void bounded1m() {
84+
for (int i = 0; i < 1000000; i++) {
85+
bounded.onNext(1);
86+
}
87+
}
88+
89+
90+
@Benchmark
91+
public void subject1() {
92+
subject.onNext(1);
93+
}
94+
95+
96+
@Benchmark
97+
public void subject1k() {
98+
for (int i = 0; i < 1000; i++) {
99+
subject.onNext(1);
100+
}
101+
}
102+
103+
@Benchmark
104+
public void subject1m() {
105+
for (int i = 0; i < 1000000; i++) {
106+
subject.onNext(1);
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)