Skip to content

Commit bf079fe

Browse files
Merge pull request #1635 from daschl/blockingperf
Optimize single BlockingObservable operations.
2 parents 0aab682 + 3816f47 commit bf079fe

File tree

4 files changed

+162
-37
lines changed

4 files changed

+162
-37
lines changed

rxjava/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2014 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -79,7 +79,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7979
* need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods.
8080
* <p>
8181
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
82-
*
82+
*
8383
* @param onNext
8484
* the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
8585
* @throws RuntimeException
@@ -143,7 +143,7 @@ public void onNext(T args) {
143143
* Returns an {@link Iterator} that iterates over all items emitted by this {@code BlockingObservable}.
144144
* <p>
145145
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt="">
146-
*
146+
*
147147
* @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable}
148148
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: getIterator()</a>
149149
*/
@@ -154,21 +154,21 @@ public Iterator<T> getIterator() {
154154
/**
155155
* Returns the first item emitted by this {@code BlockingObservable}, or throws
156156
* {@code NoSuchElementException} if it emits no items.
157-
*
157+
*
158158
* @return the first item emitted by this {@code BlockingObservable}
159159
* @throws NoSuchElementException
160160
* if this {@code BlockingObservable} emits no items
161161
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki: first()</a>
162162
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
163163
*/
164164
public T first() {
165-
return from(o.first()).single();
165+
return blockForSingle(o.first());
166166
}
167167

168168
/**
169169
* Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or throws
170170
* {@code NoSuchElementException} if it emits no such item.
171-
*
171+
*
172172
* @param predicate
173173
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
174174
* @return the first item emitted by this {@code BlockingObservable} that matches the predicate
@@ -178,13 +178,13 @@ public T first() {
178178
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229739.aspx">MSDN: Observable.First</a>
179179
*/
180180
public T first(Func1<? super T, Boolean> predicate) {
181-
return from(o.first(predicate)).single();
181+
return blockForSingle(o.first(predicate));
182182
}
183183

184184
/**
185185
* Returns the first item emitted by this {@code BlockingObservable}, or a default value if it emits no
186186
* items.
187-
*
187+
*
188188
* @param defaultValue
189189
* a default value to return if this {@code BlockingObservable} emits no items
190190
* @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no
@@ -193,13 +193,13 @@ public T first(Func1<? super T, Boolean> predicate) {
193193
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320.aspx">MSDN: Observable.FirstOrDefault</a>
194194
*/
195195
public T firstOrDefault(T defaultValue) {
196-
return from(o.take(1)).singleOrDefault(defaultValue);
196+
return blockForSingle(o.map(Functions.<T>identity()).firstOrDefault(defaultValue));
197197
}
198198

199199
/**
200200
* Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or a default
201201
* value if it emits no such items.
202-
*
202+
*
203203
* @param defaultValue
204204
* a default value to return if this {@code BlockingObservable} emits no matching items
205205
* @param predicate
@@ -210,31 +210,31 @@ public T firstOrDefault(T defaultValue) {
210210
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759.aspx">MSDN: Observable.FirstOrDefault</a>
211211
*/
212212
public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
213-
return from(o.filter(predicate)).firstOrDefault(defaultValue);
213+
return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).firstOrDefault(defaultValue));
214214
}
215215

216216
/**
217217
* Returns the last item emitted by this {@code BlockingObservable}, or throws
218218
* {@code NoSuchElementException} if this {@code BlockingObservable} emits no items.
219219
* <p>
220220
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.png" alt="">
221-
*
221+
*
222222
* @return the last item emitted by this {@code BlockingObservable}
223223
* @throws NoSuchElementException
224224
* if this {@code BlockingObservable} emits no items
225225
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: last()</a>
226226
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.last.aspx">MSDN: Observable.Last</a>
227227
*/
228228
public T last() {
229-
return from(o.last()).single();
229+
return blockForSingle(o.last());
230230
}
231231

232232
/**
233233
* Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or throws
234234
* {@code NoSuchElementException} if it emits no such items.
235235
* <p>
236236
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.p.png" alt="">
237-
*
237+
*
238238
* @param predicate
239239
* a predicate function to evaluate items emitted by the {@code BlockingObservable}
240240
* @return the last item emitted by the {@code BlockingObservable} that matches the predicate
@@ -244,15 +244,15 @@ public T last() {
244244
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.last.aspx">MSDN: Observable.Last</a>
245245
*/
246246
public T last(final Func1<? super T, Boolean> predicate) {
247-
return from(o.last(predicate)).single();
247+
return blockForSingle(o.last(predicate));
248248
}
249249

250250
/**
251251
* Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
252252
* items.
253253
* <p>
254254
* <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
255-
*
255+
*
256256
* @param defaultValue
257257
* a default value to return if this {@code BlockingObservable} emits no items
258258
* @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
@@ -261,15 +261,15 @@ public T last(final Func1<? super T, Boolean> predicate) {
261261
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
262262
*/
263263
public T lastOrDefault(T defaultValue) {
264-
return from(o.takeLast(1)).singleOrDefault(defaultValue);
264+
return blockForSingle(o.map(Functions.<T>identity()).lastOrDefault(defaultValue));
265265
}
266266

267267
/**
268268
* Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
269269
* value if it emits no such items.
270270
* <p>
271271
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
272-
*
272+
*
273273
* @param defaultValue
274274
* a default value to return if this {@code BlockingObservable} emits no matching items
275275
* @param predicate
@@ -280,15 +280,15 @@ public T lastOrDefault(T defaultValue) {
280280
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
281281
*/
282282
public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
283-
return from(o.filter(predicate)).lastOrDefault(defaultValue);
283+
return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).lastOrDefault(defaultValue));
284284
}
285285

286286
/**
287287
* Returns an {@link Iterable} that always returns the item most recently emitted by this
288288
* {@code BlockingObservable}.
289289
* <p>
290290
* <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
291-
*
291+
*
292292
* @param initialValue
293293
* the initial value that the {@link Iterable} sequence will yield if this
294294
* {@code BlockingObservable} has not yet emitted an item
@@ -306,7 +306,7 @@ public Iterable<T> mostRecent(T initialValue) {
306306
* returns that item.
307307
* <p>
308308
* <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt="">
309-
*
309+
*
310310
* @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits
311311
* a new item, whereupon the Iterable returns that item
312312
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#next">RxJava Wiki: next()</a>
@@ -325,7 +325,7 @@ public Iterable<T> next() {
325325
* <p>
326326
* Note also that an {@code onNext} directly followed by {@code onCompleted} might hide the {@code onNext}
327327
* event.
328-
*
328+
*
329329
* @return an Iterable that always returns the latest item emitted by this {@code BlockingObservable}
330330
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#latest">RxJava wiki: latest()</a>
331331
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212115.aspx">MSDN: Observable.Latest</a>
@@ -339,29 +339,29 @@ public Iterable<T> latest() {
339339
* throw a {@code NoSuchElementException}.
340340
* <p>
341341
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
342-
*
342+
*
343343
* @return the single item emitted by this {@code BlockingObservable}
344344
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki: single()</a>
345345
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.single.aspx">MSDN: Observable.Single</a>
346346
*/
347347
public T single() {
348-
return from(o.single()).toIterable().iterator().next();
348+
return blockForSingle(o.single());
349349
}
350350

351351
/**
352352
* If this {@code BlockingObservable} completes after emitting a single item that matches a given predicate,
353353
* return that item, otherwise throw a {@code NoSuchElementException}.
354354
* <p>
355355
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.p.png" alt="">
356-
*
356+
*
357357
* @param predicate
358358
* a predicate function to evaluate items emitted by this {@link BlockingObservable}
359359
* @return the single item emitted by this {@code BlockingObservable} that matches the predicate
360360
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki: single()</a>
361361
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.single.aspx">MSDN: Observable.Single</a>
362362
*/
363363
public T single(Func1<? super T, Boolean> predicate) {
364-
return from(o.single(predicate)).toIterable().iterator().next();
364+
return blockForSingle(o.single(predicate));
365365
}
366366

367367
/**
@@ -370,7 +370,7 @@ public T single(Func1<? super T, Boolean> predicate) {
370370
* value.
371371
* <p>
372372
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.png" alt="">
373-
*
373+
*
374374
* @param defaultValue
375375
* a default value to return if this {@code BlockingObservable} emits no items
376376
* @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no
@@ -379,7 +379,7 @@ public T single(Func1<? super T, Boolean> predicate) {
379379
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.singleordefault.aspx">MSDN: Observable.SingleOrDefault</a>
380380
*/
381381
public T singleOrDefault(T defaultValue) {
382-
return from(o.map(Functions.<T>identity()).singleOrDefault(defaultValue)).single();
382+
return blockForSingle(o.map(Functions.<T>identity()).singleOrDefault(defaultValue));
383383
}
384384

385385
/**
@@ -388,7 +388,7 @@ public T singleOrDefault(T defaultValue) {
388388
* emits no items, return a default value.
389389
* <p>
390390
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png" alt="">
391-
*
391+
*
392392
* @param defaultValue
393393
* a default value to return if this {@code BlockingObservable} emits no matching items
394394
* @param predicate
@@ -399,7 +399,7 @@ public T singleOrDefault(T defaultValue) {
399399
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.singleordefault.aspx">MSDN: Observable.SingleOrDefault</a>
400400
*/
401401
public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
402-
return from(o.filter(predicate)).singleOrDefault(defaultValue);
402+
return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).singleOrDefault(defaultValue));
403403
}
404404

405405
/**
@@ -412,7 +412,7 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
412412
* If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}.
413413
* <p>
414414
* <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
415-
*
415+
*
416416
* @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable}
417417
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: toFuture()</a>
418418
*/
@@ -424,7 +424,7 @@ public Future<T> toFuture() {
424424
* Converts this {@code BlockingObservable} into an {@link Iterable}.
425425
* <p>
426426
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt="">
427-
*
427+
*
428428
* @return an {@link Iterable} version of this {@code BlockingObservable}
429429
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: toIterable()</a>
430430
*/
@@ -436,4 +436,52 @@ public Iterator<T> iterator() {
436436
}
437437
};
438438
}
439+
440+
/**
441+
* Helper method which handles the actual blocking for a single response.
442+
* <p>
443+
* If the {@link Observable} errors, it will be thrown right away.
444+
*
445+
* @return the actual item.
446+
*/
447+
private T blockForSingle(final Observable<? extends T> observable) {
448+
final AtomicReference<T> returnItem = new AtomicReference<T>();
449+
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
450+
final CountDownLatch latch = new CountDownLatch(1);
451+
452+
observable.subscribe(new Subscriber<T>() {
453+
@Override
454+
public void onCompleted() {
455+
latch.countDown();
456+
}
457+
458+
@Override
459+
public void onError(final Throwable e) {
460+
returnException.set(e);
461+
latch.countDown();
462+
}
463+
464+
@Override
465+
public void onNext(final T item) {
466+
returnItem.set(item);
467+
}
468+
});
469+
470+
try {
471+
latch.await();
472+
} catch (InterruptedException e) {
473+
Thread.currentThread().interrupt();
474+
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
475+
}
476+
477+
if (returnException.get() != null) {
478+
if (returnException.get() instanceof RuntimeException) {
479+
throw (RuntimeException) returnException.get();
480+
} else {
481+
throw new RuntimeException(returnException.get());
482+
}
483+
}
484+
485+
return returnItem.get();
486+
}
439487
}

0 commit comments

Comments
 (0)