2121import org .openjdk .jmh .annotations .*;
2222import org .openjdk .jmh .infra .Blackhole ;
2323
24+ import rx .functions .Func1 ;
25+
2426/**
2527 * Benchmark the cost of subscription and initial request management.
2628 * <p>
@@ -38,64 +40,121 @@ public class SubscribingPerf {
3840
3941 @ Benchmark
4042 public void justDirect (Blackhole bh ) {
41- just .subscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
43+ DirectSubscriber <Integer > subscriber = new DirectSubscriber <Integer >(Long .MAX_VALUE , bh );
44+ bh .consume (subscriber );
45+ just .subscribe (subscriber );
4246 }
4347
4448 @ Benchmark
4549 public void justStarted (Blackhole bh ) {
46- just .subscribe (new StartedSubscriber <Integer >(Long .MAX_VALUE , bh ));
50+ StartedSubscriber <Integer > subscriber = new StartedSubscriber <Integer >(Long .MAX_VALUE , bh );
51+ bh .consume (subscriber );
52+ just .subscribe (subscriber );
4753 }
4854
4955 @ Benchmark
5056 public void justUsual (Blackhole bh ) {
51- just .subscribe (new UsualSubscriber <Integer >(Long .MAX_VALUE , bh ));
57+ UsualSubscriber <Integer > subscriber = new UsualSubscriber <Integer >(Long .MAX_VALUE , bh );
58+ bh .consume (subscriber );
59+ just .subscribe (subscriber );
5260 }
5361
5462 @ Benchmark
5563 public void rangeDirect (Blackhole bh ) {
56- range .subscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
64+ DirectSubscriber <Integer > subscriber = new DirectSubscriber <Integer >(Long .MAX_VALUE , bh );
65+ bh .consume (subscriber );
66+ range .subscribe (subscriber );
5767 }
5868
5969 @ Benchmark
6070 public void rangeStarted (Blackhole bh ) {
61- range .subscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
71+ StartedSubscriber <Integer > subscriber = new StartedSubscriber <Integer >(Long .MAX_VALUE , bh );
72+ bh .consume (subscriber );
73+ range .subscribe (subscriber );
6274 }
6375
6476 @ Benchmark
6577 public void rangeUsual (Blackhole bh ) {
66- range .subscribe (new UsualSubscriber <Integer >(Long .MAX_VALUE , bh ));
78+ UsualSubscriber <Integer > subscriber = new UsualSubscriber <Integer >(Long .MAX_VALUE , bh );
79+ bh .consume (subscriber );
80+ range .subscribe (subscriber );
6781 }
6882
6983 @ Benchmark
7084 public void justDirectUnsafe (Blackhole bh ) {
71- just .unsafeSubscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
85+ DirectSubscriber <Integer > subscriber = new DirectSubscriber <Integer >(Long .MAX_VALUE , bh );
86+ bh .consume (subscriber );
87+ just .unsafeSubscribe (subscriber );
7288 }
7389
7490 @ Benchmark
7591 public void justStartedUnsafe (Blackhole bh ) {
76- just .unsafeSubscribe (new StartedSubscriber <Integer >(Long .MAX_VALUE , bh ));
92+ StartedSubscriber <Integer > subscriber = new StartedSubscriber <Integer >(Long .MAX_VALUE , bh );
93+ bh .consume (subscriber );
94+ just .unsafeSubscribe (subscriber );
7795 }
7896
7997 @ Benchmark
8098 public void justUsualUnsafe (Blackhole bh ) {
81- just .unsafeSubscribe (new UsualSubscriber <Integer >(Long .MAX_VALUE , bh ));
99+ UsualSubscriber <Integer > subscriber = new UsualSubscriber <Integer >(Long .MAX_VALUE , bh );
100+ bh .consume (subscriber );
101+ just .unsafeSubscribe (subscriber );
82102 }
83103
84104 @ Benchmark
85105 public void rangeDirectUnsafe (Blackhole bh ) {
86- range .unsafeSubscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
106+ DirectSubscriber <Integer > subscriber = new DirectSubscriber <Integer >(Long .MAX_VALUE , bh );
107+ bh .consume (subscriber );
108+ range .unsafeSubscribe (subscriber );
87109 }
88110
89111 @ Benchmark
90112 public void rangeStartedUnsafe (Blackhole bh ) {
91- range .unsafeSubscribe (new DirectSubscriber <Integer >(Long .MAX_VALUE , bh ));
113+ StartedSubscriber <Integer > subscriber = new StartedSubscriber <Integer >(Long .MAX_VALUE , bh );
114+ bh .consume (subscriber );
115+ range .unsafeSubscribe (subscriber );
92116 }
93117
94118 @ Benchmark
95119 public void rangeUsualUnsafe (Blackhole bh ) {
96- range .unsafeSubscribe (new UsualSubscriber <Integer >(Long .MAX_VALUE , bh ));
120+ UsualSubscriber <Integer > subscriber = new UsualSubscriber <Integer >(Long .MAX_VALUE , bh );
121+ bh .consume (subscriber );
122+ range .unsafeSubscribe (subscriber );
97123 }
98124
125+ @ State (Scope .Thread )
126+ public static class Chain {
127+ @ Param ({"10" , "1000" , "1000000" })
128+ public int times ;
129+
130+ @ Param ({"1" , "2" , "3" , "4" , "5" })
131+ public int maps ;
132+
133+ Observable <Integer > source ;
134+
135+ @ Setup
136+ public void setup () {
137+ Observable <Integer > o = Observable .range (1 , times );
138+
139+ for (int i = 0 ; i < maps ; i ++) {
140+ o = o .map (new Func1 <Integer , Integer >() {
141+ @ Override
142+ public Integer call (Integer v ) {
143+ return v + 1 ;
144+ }
145+ });
146+ }
147+
148+ source = o ;
149+ }
150+
151+ @ Benchmark
152+ public void mapped (Chain c , Blackhole bh ) {
153+ DirectSubscriber <Integer > subscriber = new DirectSubscriber <Integer >(Long .MAX_VALUE , bh );
154+ bh .consume (subscriber );
155+ c .source .subscribe (subscriber );
156+ }
157+ }
99158
100159 static final class DirectSubscriber <T > extends Subscriber <T > {
101160 final long r ;
0 commit comments