Skip to content

Commit 4e50ea4

Browse files
authored
2.x: Perf measure of Flowable flatMap & flatMapCompletable (#5902)
1 parent 7bdcb59 commit 4e50ea4

File tree

5 files changed

+159
-2
lines changed

5 files changed

+159
-2
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
5454
def junitVersion = "4.12"
5555
def reactiveStreamsVersion = "1.0.2"
5656
def mockitoVersion = "2.1.0"
57-
def jmhLibVersion = "1.19"
57+
def jmhLibVersion = "1.20"
5858
def testNgVersion = "6.11"
5959
def guavaVersion = "24.0-jre"
6060
def jacocoVersion = "0.8.0"
@@ -201,6 +201,7 @@ jmh {
201201
jmhVersion = jmhLibVersion
202202
humanOutputFile = null
203203
includeTests = false
204+
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]
204205

205206
if (project.hasProperty("jmh")) {
206207
include = ".*" + project.jmh + ".*"
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.Arrays;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
import io.reactivex.functions.Action;
23+
import io.reactivex.internal.functions.Functions;
24+
import io.reactivex.schedulers.Schedulers;
25+
26+
@BenchmarkMode(Mode.Throughput)
27+
@Warmup(iterations = 5)
28+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
29+
@OutputTimeUnit(TimeUnit.SECONDS)
30+
@Fork(value = 1)
31+
@State(Scope.Thread)
32+
public class FlowableFlatMapCompletablePerf implements Action {
33+
34+
@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
35+
int items;
36+
37+
@Param({"1", "8", "32", "128", "256"})
38+
int maxConcurrency;
39+
40+
@Param({"1", "10", "100", "1000"})
41+
int work;
42+
43+
Completable flatMapCompletable;
44+
45+
Flowable<Object> flatMap;
46+
47+
@Override
48+
public void run() throws Exception {
49+
Blackhole.consumeCPU(work);
50+
}
51+
52+
@Setup
53+
public void setup() {
54+
Integer[] array = new Integer[items];
55+
Arrays.fill(array, 777);
56+
57+
flatMapCompletable = Flowable.fromArray(array)
58+
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
59+
60+
flatMap = Flowable.fromArray(array)
61+
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
62+
}
63+
64+
// @Benchmark
65+
public Object flatMap(Blackhole bh) {
66+
return flatMap.subscribeWith(new PerfAsyncConsumer(bh)).await(items);
67+
}
68+
69+
@Benchmark
70+
public Object flatMapCompletable(Blackhole bh) {
71+
return flatMapCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(items);
72+
}
73+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.Arrays;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
import io.reactivex.internal.functions.Functions;
23+
24+
@BenchmarkMode(Mode.Throughput)
25+
@Warmup(iterations = 5)
26+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@Fork(value = 1)
29+
@State(Scope.Thread)
30+
public class FlowableFlatMapCompletableSyncPerf {
31+
32+
@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
33+
int items;
34+
35+
@Param({"1", "8", "32", "128", "256"})
36+
int maxConcurrency;
37+
38+
Completable flatMapCompletable;
39+
40+
Flowable<Object> flatMap;
41+
42+
@Setup
43+
public void setup() {
44+
Integer[] array = new Integer[items];
45+
Arrays.fill(array, 777);
46+
47+
flatMapCompletable = Flowable.fromArray(array)
48+
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);
49+
50+
flatMap = Flowable.fromArray(array)
51+
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency);
52+
}
53+
54+
@Benchmark
55+
public Object flatMap(Blackhole bh) {
56+
return flatMap.subscribeWith(new PerfConsumer(bh));
57+
}
58+
59+
@Benchmark
60+
public Object flatMapCompletable(Blackhole bh) {
61+
return flatMapCompletable.subscribeWith(new PerfConsumer(bh));
62+
}
63+
}

src/jmh/java/io/reactivex/PerfAsyncConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public void onComplete() {
6868
/**
6969
* Wait for the terminal signal.
7070
* @param count if less than 1001, a spin-wait is used
71+
* @return this
7172
*/
72-
public void await(int count) {
73+
public PerfAsyncConsumer await(int count) {
7374
if (count <= 1000) {
7475
while (getCount() != 0) { }
7576
} else {
@@ -79,6 +80,7 @@ public void await(int count) {
7980
throw new RuntimeException(ex);
8081
}
8182
}
83+
return this;
8284
}
8385

8486
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.disposables.*;
2626
import io.reactivex.exceptions.*;
2727
import io.reactivex.functions.Function;
28+
import io.reactivex.internal.functions.Functions;
2829
import io.reactivex.internal.fuseable.*;
2930
import io.reactivex.observers.*;
3031
import io.reactivex.processors.PublishProcessor;
@@ -523,4 +524,21 @@ public CompletableSource apply(Integer v) throws Exception {
523524
.test()
524525
.assertFailure(TestException.class);
525526
}
527+
528+
@Test
529+
public void asyncMaxConcurrency() {
530+
for (int itemCount = 1; itemCount <= 100000; itemCount *= 10) {
531+
for (int concurrency = 1; concurrency <= 256; concurrency *= 2) {
532+
Flowable.range(1, itemCount)
533+
.flatMapCompletable(
534+
Functions.justFunction(Completable.complete()
535+
.subscribeOn(Schedulers.computation()))
536+
, false, concurrency)
537+
.test()
538+
.withTag("itemCount=" + itemCount + ", concurrency=" + concurrency)
539+
.awaitDone(5, TimeUnit.SECONDS)
540+
.assertResult();
541+
}
542+
}
543+
}
526544
}

0 commit comments

Comments
 (0)