@@ -53,7 +53,22 @@ public int getSize() {
5353 }
5454
5555 @ Benchmark
56- public void mergeSynchronous (final Input input ) throws InterruptedException {
56+ public void merge1SyncStreamOfN (final Input input ) throws InterruptedException {
57+ Observable <Observable <Integer >> os = Observable .just (1 ).map (new Func1 <Integer , Observable <Integer >>() {
58+
59+ @ Override
60+ public Observable <Integer > call (Integer i ) {
61+ return Observable .range (0 , input .size );
62+ }
63+
64+ });
65+ LatchedObserver <Integer > o = input .newLatchedObserver ();
66+ Observable .merge (os ).subscribe (o );
67+ o .latch .await ();
68+ }
69+
70+ @ Benchmark
71+ public void mergeNSyncStreamsOfN (final Input input ) throws InterruptedException {
5772 Observable <Observable <Integer >> os = input .observable .map (new Func1 <Integer , Observable <Integer >>() {
5873
5974 @ Override
@@ -68,7 +83,7 @@ public Observable<Integer> call(Integer i) {
6883 }
6984
7085 @ Benchmark
71- public void mergeAsynchronous (final Input input ) throws InterruptedException {
86+ public void mergeNAsyncStreamsOfN (final Input input ) throws InterruptedException {
7287 Observable <Observable <Integer >> os = input .observable .map (new Func1 <Integer , Observable <Integer >>() {
7388
7489 @ Override
@@ -83,15 +98,15 @@ public Observable<Integer> call(Integer i) {
8398 }
8499
85100 @ Benchmark
86- public void mergeTwoAsyncStreams (final Input input ) throws InterruptedException {
101+ public void mergeTwoAsyncStreamsOfN (final Input input ) throws InterruptedException {
87102 LatchedObserver <Integer > o = input .newLatchedObserver ();
88103 Observable <Integer > ob = Observable .range (0 , input .size ).subscribeOn (Schedulers .computation ());
89104 Observable .merge (ob , ob ).subscribe (o );
90105 o .latch .await ();
91106 }
92107
93108 @ Benchmark
94- public void mergeNStreams (final InputForMergeN input ) throws InterruptedException {
109+ public void mergeNSyncStreamsOf1 (final InputForMergeN input ) throws InterruptedException {
95110 LatchedObserver <Integer > o = input .newLatchedObserver ();
96111 Observable .merge (input .observables ).subscribe (o );
97112 o .latch .await ();
0 commit comments