File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed
src/perf/java/rx/operators Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change 2828import rx .Observable ;
2929import rx .functions .Func1 ;
3030import rx .jmh .InputWithIncrementingInteger ;
31+ import rx .jmh .LatchedObserver ;
3132import rx .schedulers .Schedulers ;
3233
3334@ BenchmarkMode (Mode .Throughput )
@@ -62,14 +63,16 @@ public Observable<Integer> call(Integer i) {
6263
6364 @ Benchmark
6465 public void flatMapIntPassthruAsync (Input input ) throws InterruptedException {
66+ LatchedObserver <Integer > latchedObserver = input .newLatchedObserver ();
6567 input .observable .flatMap (new Func1 <Integer , Observable <Integer >>() {
6668
6769 @ Override
6870 public Observable <Integer > call (Integer i ) {
6971 return Observable .just (i ).subscribeOn (Schedulers .computation ());
7072 }
7173
72- }).subscribe (input .observer );
74+ }).subscribe (latchedObserver );
75+ latchedObserver .latch .await ();
7376 }
7477
7578 @ Benchmark
You can’t perform that action at this time.
0 commit comments