34
34
import rx .internal .util .UtilityFunctions ;
35
35
36
36
/**
37
- * An extension of {@link Observable} that provides blocking operators.
37
+ * {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
38
+ * useful for testing and demo purposes, but is generally inappropriate for production applications (if you
39
+ * think you need to use a {@code BlockingObservable} this is usually a sign that you should rethink your
40
+ * design).
38
41
* <p>
39
42
* You construct a {@code BlockingObservable} from an {@code Observable} with {@link #from(Observable)} or
40
43
* {@link Observable#toBlocking()}.
43
46
* illustrate blocking operators. The following legend explains these marble diagrams:
44
47
* <p>
45
48
* <img width="640" height="301" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.legend.png" alt="">
46
- * <p>
47
- * For more information see the
48
- * <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">Blocking
49
- * Observable Operators</a> page at the RxJava Wiki.
50
49
*
50
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">RxJava wiki: Blocking
51
+ * Observable Operators</a>
51
52
* @param <T>
52
53
* the type of item emitted by the {@code BlockingObservable}
53
54
*/
@@ -85,7 +86,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
85
86
* the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
86
87
* @throws RuntimeException
87
88
* if an error occurs
88
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#foreach">RxJava Wiki : forEach()</a>
89
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#foreach">RxJava wiki : forEach()</a>
89
90
*/
90
91
public void forEach (final Action1 <? super T > onNext ) {
91
92
final CountDownLatch latch = new CountDownLatch (1 );
@@ -146,7 +147,7 @@ public void onNext(T args) {
146
147
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt="">
147
148
*
148
149
* @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable}
149
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki : getIterator()</a>
150
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava wiki : getIterator()</a>
150
151
*/
151
152
public Iterator <T > getIterator () {
152
153
return BlockingOperatorToIterator .toIterator (o );
@@ -159,7 +160,7 @@ public Iterator<T> getIterator() {
159
160
* @return the first item emitted by this {@code BlockingObservable}
160
161
* @throws NoSuchElementException
161
162
* if this {@code BlockingObservable} emits no items
162
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki : first()</a>
163
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava wiki : first()</a>
163
164
*/
164
165
public T first () {
165
166
return blockForSingle (o .first ());
@@ -174,7 +175,7 @@ public T first() {
174
175
* @return the first item emitted by this {@code BlockingObservable} that matches the predicate
175
176
* @throws NoSuchElementException
176
177
* if this {@code BlockingObservable} emits no such items
177
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki : first()</a>
178
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava wiki : first()</a>
178
179
*/
179
180
public T first (Func1 <? super T , Boolean > predicate ) {
180
181
return blockForSingle (o .first (predicate ));
@@ -188,7 +189,7 @@ public T first(Func1<? super T, Boolean> predicate) {
188
189
* a default value to return if this {@code BlockingObservable} emits no items
189
190
* @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no
190
191
* items
191
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki : firstOrDefault()</a>
192
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava wiki : firstOrDefault()</a>
192
193
*/
193
194
public T firstOrDefault (T defaultValue ) {
194
195
return blockForSingle (o .map (UtilityFunctions .<T >identity ()).firstOrDefault (defaultValue ));
@@ -204,7 +205,7 @@ public T firstOrDefault(T defaultValue) {
204
205
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
205
206
* @return the first item emitted by this {@code BlockingObservable} that matches the predicate, or the
206
207
* default value if this {@code BlockingObservable} emits no matching items
207
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki : firstOrDefault()</a>
208
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava wiki : firstOrDefault()</a>
208
209
*/
209
210
public T firstOrDefault (T defaultValue , Func1 <? super T , Boolean > predicate ) {
210
211
return blockForSingle (o .filter (predicate ).map (UtilityFunctions .<T >identity ()).firstOrDefault (defaultValue ));
@@ -219,7 +220,7 @@ public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
219
220
* @return the last item emitted by this {@code BlockingObservable}
220
221
* @throws NoSuchElementException
221
222
* if this {@code BlockingObservable} emits no items
222
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki : last()</a>
223
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava wiki : last()</a>
223
224
*/
224
225
public T last () {
225
226
return blockForSingle (o .last ());
@@ -236,7 +237,7 @@ public T last() {
236
237
* @return the last item emitted by the {@code BlockingObservable} that matches the predicate
237
238
* @throws NoSuchElementException
238
239
* if this {@code BlockingObservable} emits no items
239
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki : last()</a>
240
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava wiki : last()</a>
240
241
*/
241
242
public T last (final Func1 <? super T , Boolean > predicate ) {
242
243
return blockForSingle (o .last (predicate ));
@@ -252,7 +253,7 @@ public T last(final Func1<? super T, Boolean> predicate) {
252
253
* a default value to return if this {@code BlockingObservable} emits no items
253
254
* @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
254
255
* items
255
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki : lastOrDefault()</a>
256
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava wiki : lastOrDefault()</a>
256
257
*/
257
258
public T lastOrDefault (T defaultValue ) {
258
259
return blockForSingle (o .map (UtilityFunctions .<T >identity ()).lastOrDefault (defaultValue ));
@@ -270,7 +271,7 @@ public T lastOrDefault(T defaultValue) {
270
271
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
271
272
* @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
272
273
* default value if it emits no matching items
273
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki : lastOrDefault()</a>
274
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava wiki : lastOrDefault()</a>
274
275
*/
275
276
public T lastOrDefault (T defaultValue , Func1 <? super T , Boolean > predicate ) {
276
277
return blockForSingle (o .filter (predicate ).map (UtilityFunctions .<T >identity ()).lastOrDefault (defaultValue ));
@@ -301,7 +302,7 @@ public Iterable<T> mostRecent(T initialValue) {
301
302
*
302
303
* @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits
303
304
* a new item, whereupon the Iterable returns that item
304
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#next">RxJava Wiki : next()</a>
305
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#next">RxJava wiki : next()</a>
305
306
*/
306
307
public Iterable <T > next () {
307
308
return BlockingOperatorNext .next (o );
@@ -331,7 +332,7 @@ public Iterable<T> latest() {
331
332
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
332
333
*
333
334
* @return the single item emitted by this {@code BlockingObservable}
334
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki : single()</a>
335
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava wiki : single()</a>
335
336
*/
336
337
public T single () {
337
338
return blockForSingle (o .single ());
@@ -346,7 +347,7 @@ public T single() {
346
347
* @param predicate
347
348
* a predicate function to evaluate items emitted by this {@link BlockingObservable}
348
349
* @return the single item emitted by this {@code BlockingObservable} that matches the predicate
349
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki : single()</a>
350
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava wiki : single()</a>
350
351
*/
351
352
public T single (Func1 <? super T , Boolean > predicate ) {
352
353
return blockForSingle (o .single (predicate ));
@@ -363,7 +364,7 @@ public T single(Func1<? super T, Boolean> predicate) {
363
364
* a default value to return if this {@code BlockingObservable} emits no items
364
365
* @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no
365
366
* items
366
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki : singleOrDefault()</a>
367
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava wiki : singleOrDefault()</a>
367
368
*/
368
369
public T singleOrDefault (T defaultValue ) {
369
370
return blockForSingle (o .map (UtilityFunctions .<T >identity ()).singleOrDefault (defaultValue ));
@@ -382,7 +383,7 @@ public T singleOrDefault(T defaultValue) {
382
383
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
383
384
* @return the single item emitted by the {@code BlockingObservable} that matches the predicate, or the
384
385
* default value if no such items are emitted
385
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava Wiki : singleOrDefault()</a>
386
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#single-and-singleordefault">RxJava wiki : singleOrDefault()</a>
386
387
*/
387
388
public T singleOrDefault (T defaultValue , Func1 <? super T , Boolean > predicate ) {
388
389
return blockForSingle (o .filter (predicate ).map (UtilityFunctions .<T >identity ()).singleOrDefault (defaultValue ));
@@ -400,7 +401,7 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
400
401
* <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
401
402
*
402
403
* @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable}
403
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki : toFuture()</a>
404
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava wiki : toFuture()</a>
404
405
*/
405
406
public Future <T > toFuture () {
406
407
return BlockingOperatorToFuture .toFuture (o );
@@ -412,7 +413,7 @@ public Future<T> toFuture() {
412
413
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt="">
413
414
*
414
415
* @return an {@link Iterable} version of this {@code BlockingObservable}
415
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki : toIterable()</a>
416
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava wiki : toIterable()</a>
416
417
*/
417
418
public Iterable <T > toIterable () {
418
419
return new Iterable <T >() {
@@ -428,7 +429,7 @@ public Iterator<T> iterator() {
428
429
* <p>
429
430
* If the {@link Observable} errors, it will be thrown right away.
430
431
*
431
- * @return the actual item.
432
+ * @return the actual item
432
433
*/
433
434
private T blockForSingle (final Observable <? extends T > observable ) {
434
435
final AtomicReference <T > returnItem = new AtomicReference <T >();
0 commit comments