Skip to content

Commit 6a9ff72

Browse files
Merge pull request #1293 from benjchristensen/perf-tests
Fix and Update JMH Perf Tests
2 parents 37bdda4 + be17891 commit 6a9ff72

File tree

5 files changed

+134
-44
lines changed

5 files changed

+134
-44
lines changed

rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package rx.jmh;
1717

18-
import java.util.concurrent.CountDownLatch;
19-
2018
import org.openjdk.jmh.annotations.Param;
2119
import org.openjdk.jmh.annotations.Scope;
2220
import org.openjdk.jmh.annotations.Setup;
@@ -27,6 +25,7 @@
2725
import rx.Observable.OnSubscribe;
2826
import rx.Observer;
2927
import rx.Subscriber;
28+
import rx.observers.TestSubscriber;
3029

3130
/**
3231
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
@@ -37,12 +36,11 @@ public class InputWithIncrementingInteger {
3736
public int size;
3837

3938
public Observable<Integer> observable;
40-
public Observer<Integer> observer;
41-
42-
private CountDownLatch latch;
39+
private BlackHole bh;
4340

4441
@Setup
4542
public void setup(final BlackHole bh) {
43+
this.bh = bh;
4644
observable = Observable.create(new OnSubscribe<Integer>() {
4745
@Override
4846
public void call(Subscriber<? super Integer> o) {
@@ -54,13 +52,12 @@ public void call(Subscriber<? super Integer> o) {
5452
o.onCompleted();
5553
}
5654
});
55+
}
5756

58-
latch = new CountDownLatch(1);
59-
60-
observer = new Observer<Integer>() {
57+
public TestSubscriber<Integer> newSubscriber() {
58+
return new TestSubscriber<Integer>(new Observer<Integer>() {
6159
@Override
6260
public void onCompleted() {
63-
latch.countDown();
6461
}
6562

6663
@Override
@@ -72,11 +69,6 @@ public void onError(Throwable e) {
7269
public void onNext(Integer value) {
7370
bh.consume(value);
7471
}
75-
};
76-
77-
}
78-
79-
public void awaitCompletion() throws InterruptedException {
80-
latch.await();
72+
});
8173
}
8274
}

rxjava-core/src/perf/java/rx/operators/OperatorMapPerf.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import rx.Observable.Operator;
2121
import rx.functions.Func1;
2222
import rx.jmh.InputWithIncrementingInteger;
23+
import rx.observers.TestSubscriber;
2324

2425
public class OperatorMapPerf {
2526

2627
@GenerateMicroBenchmark
2728
public void mapIdentityFunction(InputWithIncrementingInteger input) throws InterruptedException {
28-
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
29-
input.awaitCompletion();
29+
TestSubscriber<Integer> ts = input.newSubscriber();
30+
input.observable.lift(MAP_OPERATOR).subscribe(ts);
31+
ts.awaitTerminalEvent();
3032
}
3133

3234
private static final Func1<Integer, Integer> IDENTITY_FUNCTION = new Func1<Integer, Integer>() {

rxjava-core/src/perf/java/rx/operators/OperatorSerializePerf.java

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
1919

2020
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
2121
import org.openjdk.jmh.annotations.Param;
@@ -29,36 +29,97 @@
2929
import rx.Observer;
3030
import rx.Subscriber;
3131
import rx.observers.TestSubscriber;
32+
import rx.schedulers.Schedulers;
3233

3334
public class OperatorSerializePerf {
3435

36+
public static void main(String[] args) {
37+
38+
}
39+
3540
@GenerateMicroBenchmark
3641
public void noSerializationSingleThreaded(Input input) {
37-
input.observable.subscribe(input.subscriber);
42+
TestSubscriber<Long> ts = input.newSubscriber();
43+
input.firehose.subscribe(ts);
44+
ts.awaitTerminalEvent();
3845
}
3946

4047
@GenerateMicroBenchmark
4148
public void serializedSingleStream(Input input) {
42-
input.observable.serialize().subscribe(input.subscriber);
49+
TestSubscriber<Long> ts = input.newSubscriber();
50+
input.firehose.serialize().subscribe(ts);
51+
ts.awaitTerminalEvent();
52+
}
53+
54+
@GenerateMicroBenchmark
55+
public void serializedTwoStreamsSlightlyContended(final Input input) {
56+
TestSubscriber<Long> ts = input.newSubscriber();
57+
Observable.create(new OnSubscribe<Long>() {
58+
59+
@Override
60+
public void call(Subscriber<? super Long> s) {
61+
// break the contract here and concurrently onNext
62+
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
63+
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
64+
// they will be serialized after
65+
}
66+
67+
}).serialize().subscribe(ts);
68+
ts.awaitTerminalEvent();
4369
}
4470

45-
@State(Scope.Thread)
71+
@GenerateMicroBenchmark
72+
public void serializedTwoStreamsHighlyContended(final Input input) {
73+
TestSubscriber<Long> ts = input.newSubscriber();
74+
Observable.create(new OnSubscribe<Long>() {
75+
76+
@Override
77+
public void call(Subscriber<? super Long> s) {
78+
// break the contract here and concurrently onNext
79+
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
80+
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
81+
// they will be serialized after
82+
}
83+
84+
}).serialize().subscribe(ts);
85+
ts.awaitTerminalEvent();
86+
}
87+
88+
@GenerateMicroBenchmark
89+
public void serializedTwoStreamsOneFastOneSlow(final Input input) {
90+
TestSubscriber<Long> ts = input.newSubscriber();
91+
Observable.create(new OnSubscribe<Long>() {
92+
93+
@Override
94+
public void call(final Subscriber<? super Long> s) {
95+
// break the contract here and concurrently onNext
96+
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
97+
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
98+
// they will be serialized after
99+
}
100+
101+
}).serialize().subscribe(ts);
102+
ts.awaitTerminalEvent();
103+
}
104+
105+
@State(Scope.Benchmark)
46106
public static class Input {
47107

48-
@Param({ "1024", "1048576" })
108+
@Param({ "1", "1000" })
49109
public int size;
50110

51-
public Observable<Integer> observable;
52-
public TestSubscriber<Integer> subscriber;
111+
public Observable<Long> firehose;
112+
public Observable<Long> interval;
53113

54-
private CountDownLatch latch;
114+
private BlackHole bh;
55115

56116
@Setup
57117
public void setup(final BlackHole bh) {
58-
observable = Observable.create(new OnSubscribe<Integer>() {
118+
this.bh = bh;
119+
firehose = Observable.create(new OnSubscribe<Long>() {
59120
@Override
60-
public void call(Subscriber<? super Integer> o) {
61-
for (int value = 0; value < size; value++) {
121+
public void call(Subscriber<? super Long> o) {
122+
for (long value = 0; value < size; value++) {
62123
if (o.isUnsubscribed())
63124
return;
64125
o.onNext(value);
@@ -67,12 +128,13 @@ public void call(Subscriber<? super Integer> o) {
67128
}
68129
});
69130

70-
latch = new CountDownLatch(1);
131+
interval = Observable.timer(0, 1, TimeUnit.MILLISECONDS).take(size);
132+
}
71133

72-
subscriber = new TestSubscriber<Integer>(new Observer<Integer>() {
134+
public TestSubscriber<Long> newSubscriber() {
135+
return new TestSubscriber<Long>(new Observer<Long>() {
73136
@Override
74137
public void onCompleted() {
75-
latch.countDown();
76138
}
77139

78140
@Override
@@ -81,15 +143,11 @@ public void onError(Throwable e) {
81143
}
82144

83145
@Override
84-
public void onNext(Integer value) {
146+
public void onNext(Long value) {
85147
bh.consume(value);
86148
}
87149
});
88-
89150
}
90151

91-
public void awaitCompletion() throws InterruptedException {
92-
latch.await();
93-
}
94152
}
95153
}

rxjava-core/src/perf/java/rx/schedulers/ComputationSchedulerPerf.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
1919

2020
import rx.jmh.InputWithIncrementingInteger;
21+
import rx.observers.TestSubscriber;
2122

2223
public class ComputationSchedulerPerf {
2324

2425
@GenerateMicroBenchmark
2526
public void subscribeOn(InputWithIncrementingInteger input) throws InterruptedException {
26-
input.observable.subscribeOn(Schedulers.computation()).subscribe(input.observer);
27-
input.awaitCompletion();
27+
TestSubscriber<Integer> ts = input.newSubscriber();
28+
input.observable.subscribeOn(Schedulers.computation()).subscribe(ts);
29+
ts.awaitTerminalEvent();
2830
}
2931

3032
@GenerateMicroBenchmark
3133
public void observeOn(InputWithIncrementingInteger input) throws InterruptedException {
32-
input.observable.observeOn(Schedulers.computation()).subscribe(input.observer);
33-
input.awaitCompletion();
34+
TestSubscriber<Integer> ts = input.newSubscriber();
35+
input.observable.observeOn(Schedulers.computation()).subscribe(ts);
36+
ts.awaitTerminalEvent();
3437
}
3538
}

rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import rx.Observable;
2121
import rx.functions.Func1;
22+
import rx.schedulers.Schedulers;
2223

2324
public class PerfTransforms {
2425

@@ -43,25 +44,59 @@ public Integer call(String i) {
4344
}
4445

4546
@GenerateMicroBenchmark
46-
public void flatMapTransformsUsingFrom(UseCaseInput input) throws InterruptedException {
47+
public void flatMapTransforms(UseCaseInput input) throws InterruptedException {
4748
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {
4849

4950
@Override
5051
public Observable<Integer> call(Integer i) {
51-
return Observable.from(i);
52+
return Observable.just(i);
5253
}
5354

5455
}).subscribe(input.observer);
5556
input.awaitCompletion();
5657
}
5758

5859
@GenerateMicroBenchmark
59-
public void flatMapTransformsUsingJust(UseCaseInput input) throws InterruptedException {
60+
public void flatMapNestedMapFilterTake(final UseCaseInput input) throws InterruptedException {
6061
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {
6162

6263
@Override
6364
public Observable<Integer> call(Integer i) {
64-
return Observable.just(i);
65+
return input.observable.map(new Func1<Integer, String>() {
66+
67+
@Override
68+
public String call(Integer i) {
69+
return String.valueOf(i);
70+
}
71+
72+
}).map(new Func1<String, Integer>() {
73+
74+
@Override
75+
public Integer call(String i) {
76+
return Integer.parseInt(i);
77+
}
78+
79+
}).filter(new Func1<Integer, Boolean>() {
80+
81+
@Override
82+
public Boolean call(Integer t1) {
83+
return true;
84+
}
85+
86+
}).take(100);
87+
}
88+
89+
}).subscribe(input.observer);
90+
input.awaitCompletion();
91+
}
92+
93+
@GenerateMicroBenchmark
94+
public void flatMapAsyncNested(final UseCaseInput input) throws InterruptedException {
95+
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {
96+
97+
@Override
98+
public Observable<Integer> call(Integer i) {
99+
return input.observable.subscribeOn(Schedulers.computation());
65100
}
66101

67102
}).subscribe(input.observer);

0 commit comments

Comments
 (0)