Skip to content

Commit 96064c3

Browse files
committed
Implement the blocking/non-blocking single, singleOrDefault, first, firstOrDefault, last, lastOrDefault
1 parent e20613b commit 96064c3

File tree

9 files changed

+934
-281
lines changed

9 files changed

+934
-281
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 127 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,12 @@
5252
import rx.operators.OperationElementAt;
5353
import rx.operators.OperationFilter;
5454
import rx.operators.OperationFinally;
55-
import rx.operators.OperationFirstOrDefault;
5655
import rx.operators.OperationGroupBy;
5756
import rx.operators.OperationGroupByUntil;
5857
import rx.operators.OperationGroupJoin;
5958
import rx.operators.OperationInterval;
6059
import rx.operators.OperationJoin;
6160
import rx.operators.OperationJoinPatterns;
62-
import rx.operators.OperationLast;
6361
import rx.operators.OperationMap;
6462
import rx.operators.OperationMaterialize;
6563
import rx.operators.OperationMerge;
@@ -78,6 +76,7 @@
7876
import rx.operators.OperationSample;
7977
import rx.operators.OperationScan;
8078
import rx.operators.OperationSequenceEqual;
79+
import rx.operators.OperationSingle;
8180
import rx.operators.OperationSkip;
8281
import rx.operators.OperationSkipLast;
8382
import rx.operators.OperationSkipUntil;
@@ -5091,6 +5090,78 @@ public Observable<T> skip(int num) {
50915090
return create(OperationSkip.skip(this, num));
50925091
}
50935092

5093+
/**
5094+
* If the Observable completes after emitting a single item, return an
5095+
* Observable containing that item. If it emits more than one item or no
5096+
* item, throw an IllegalArgumentException.
5097+
*
5098+
* @return an Observable containing the single item emitted by the source
5099+
* Observable that matches the predicate.
5100+
* @throws IllegalArgumentException
5101+
* if the source emits more than one item or no item
5102+
*/
5103+
public Observable<T> single() {
5104+
return create(OperationSingle.<T> single(this));
5105+
}
5106+
5107+
/**
5108+
* If the Observable completes after emitting a single item that matches a
5109+
* predicate, return an Observable containing that item. If it emits more
5110+
* than one such item or no item, throw an IllegalArgumentException.
5111+
*
5112+
* @param predicate
5113+
* a predicate function to evaluate items emitted by the source
5114+
* Observable
5115+
* @return an Observable containing the single item emitted by the source
5116+
* Observable that matches the predicate.
5117+
* @throws IllegalArgumentException
5118+
* if the source emits more than one item or no item matching
5119+
* the predicate
5120+
*/
5121+
public Observable<T> single(Func1<? super T, Boolean> predicate) {
5122+
return filter(predicate).single();
5123+
}
5124+
5125+
/**
5126+
* If the Observable completes after emitting a single item, return an
5127+
* Observable containing that item. If it's empty, return an Observable
5128+
* containing the defaultValue. If it emits more than one item, throw an
5129+
* IllegalArgumentException.
5130+
*
5131+
* @param defaultValue
5132+
* a default value to return if the Observable emits no item
5133+
* @return an Observable containing the single item emitted by the source
5134+
* Observable, or an Observable containing the defaultValue if no
5135+
* item.
5136+
* @throws IllegalArgumentException
5137+
* if the source emits more than one item
5138+
*/
5139+
public Observable<T> singleOrDefault(T defaultValue) {
5140+
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
5141+
}
5142+
5143+
/**
5144+
* If the Observable completes after emitting a single item that matches a
5145+
* predicate, return an Observable containing that item. If it emits no such
5146+
* item, return an Observable containing the defaultValue. If it emits more
5147+
* than one such item, throw an IllegalArgumentException.
5148+
*
5149+
* @param defaultValue
5150+
* a default value to return if the {@link Observable} emits no
5151+
* matching items
5152+
* @param predicate
5153+
* a predicate function to evaluate items emitted by the
5154+
* Observable
5155+
* @return an Observable containing the single item emitted by the source
5156+
* Observable that matches the predicate, or an Observable
5157+
* containing the defaultValue if no item matches the predicate
5158+
* @throws IllegalArgumentException
5159+
* if the source emits more than one item matching the predicate
5160+
*/
5161+
public Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5162+
return filter(predicate).singleOrDefault(defaultValue);
5163+
}
5164+
50945165
/**
50955166
* Returns an Observable that emits only the very first item emitted by the
50965167
* source Observable, or an <code>IllegalArgumentException</code> if the source
@@ -5103,7 +5174,7 @@ public Observable<T> skip(int num) {
51035174
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
51045175
*/
51055176
public Observable<T> first() {
5106-
return takeFirst().last();
5177+
return take(1).single();
51075178
}
51085179

51095180
/**
@@ -5119,7 +5190,7 @@ public Observable<T> first() {
51195190
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
51205191
*/
51215192
public Observable<T> first(Func1<? super T, Boolean> predicate) {
5122-
return takeFirst(predicate).last();
5193+
return takeFirst(predicate).single();
51235194
}
51245195

51255196
/**
@@ -5137,7 +5208,7 @@ public Observable<T> first(Func1<? super T, Boolean> predicate) {
51375208
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320.aspx">MSDN: Observable.FirstOrDefault</a>
51385209
*/
51395210
public Observable<T> firstOrDefault(T defaultValue) {
5140-
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
5211+
return take(1).singleOrDefault(defaultValue);
51415212
}
51425213

51435214
/**
@@ -5155,8 +5226,8 @@ public Observable<T> firstOrDefault(T defaultValue) {
51555226
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#firstordefault">RxJava Wiki: firstOrDefault()</a>
51565227
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759.aspx">MSDN: Observable.FirstOrDefault</a>
51575228
*/
5158-
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
5159-
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
5229+
public Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5230+
return takeFirst(predicate).singleOrDefault(defaultValue);
51605231
}
51615232

51625233
/**
@@ -5268,7 +5339,7 @@ public Observable<T> takeFirst() {
52685339
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
52695340
*/
52705341
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
5271-
return skipWhile(not(predicate)).take(1);
5342+
return filter(predicate).take(1);
52725343
}
52735344

52745345
/**
@@ -5723,7 +5794,7 @@ public <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? sup
57235794
public Observable<Boolean> isEmpty() {
57245795
return create(OperationAny.isEmpty(this));
57255796
}
5726-
5797+
57275798
/**
57285799
* Returns an Observable that emits the last item emitted by the source or
57295800
* notifies observers of an <code>IllegalArgumentException</code> if the
@@ -5736,7 +5807,53 @@ public Observable<Boolean> isEmpty() {
57365807
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#last">RxJava Wiki: last()</a>
57375808
*/
57385809
public Observable<T> last() {
5739-
return create(OperationLast.last(this));
5810+
return takeLast(1).single();
5811+
}
5812+
5813+
/**
5814+
* Returns an Observable that emits only the last item emitted by the source
5815+
* Observable that satisfies a given condition, or an
5816+
* IllegalArgumentException if no such items are emitted.
5817+
*
5818+
* @param predicate
5819+
* the condition any source emitted item has to satisfy
5820+
* @return an Observable that emits only the last item satisfying the given
5821+
* condition from the source, or an IllegalArgumentException if no
5822+
* such items are emitted.
5823+
* @throws IllegalArgumentException
5824+
* if no such itmes are emmited
5825+
*/
5826+
public Observable<T> last(Func1<? super T, Boolean> predicate) {
5827+
return filter(predicate).takeLast(1).single();
5828+
}
5829+
5830+
/**
5831+
* Returns an Observable that emits only the last item emitted by the source
5832+
* Observable, or a default item if the source is empty.
5833+
*
5834+
* @param defaultValue
5835+
* the default item to emit if the source Observable is empty
5836+
* @return an Observable that emits only the last item from the source, or a
5837+
* default item if the source is empty
5838+
*/
5839+
public Observable<T> lastOrDefault(T defaultValue) {
5840+
return takeLast(1).singleOrDefault(defaultValue);
5841+
}
5842+
5843+
/**
5844+
* Returns an Observable that emits only the last item emitted by the source
5845+
* Observable that satisfies a given condition, or a default item otherwise.
5846+
*
5847+
* @param defaultValue
5848+
* the default item to emit if the source Observable doesn't emit
5849+
* anything that satisfies the given condition
5850+
* @param predicate
5851+
* the condition any source emitted item has to satisfy
5852+
* @return an Observable that emits only the last item from the source that
5853+
* satisfies the given condition, or a default item otherwise
5854+
*/
5855+
public Observable<T> lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5856+
return filter(predicate).takeLast(1).singleOrDefault(defaultValue);
57405857
}
57415858

57425859
/**

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 28 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,6 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
6262
return new BlockingObservable<T>(o);
6363
}
6464

65-
private static <T> T _singleOrDefault(BlockingObservable<? extends T> source, boolean hasDefault, T defaultValue) {
66-
Iterator<? extends T> it = source.toIterable().iterator();
67-
68-
if (!it.hasNext()) {
69-
if (hasDefault) {
70-
return defaultValue;
71-
}
72-
throw new IllegalStateException("Expected single entry. Actually empty stream.");
73-
}
74-
75-
T result = it.next();
76-
77-
if (it.hasNext()) {
78-
throw new IllegalStateException("Expected single entry. Actually more than one entry.");
79-
}
80-
81-
return result;
82-
}
83-
8465
/**
8566
* Used for protecting against errors being thrown from {@link Observer} implementations and
8667
* ensuring onNext/onError/onCompleted contract compliance.
@@ -211,20 +192,7 @@ public T first(Func1<? super T, Boolean> predicate) {
211192
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320(v=vs.103).aspx">MSDN: Observable.FirstOrDefault</a>
212193
*/
213194
public T firstOrDefault(T defaultValue) {
214-
boolean found = false;
215-
T result = null;
216-
217-
for (T value : toIterable()) {
218-
found = true;
219-
result = value;
220-
break;
221-
}
222-
223-
if (!found) {
224-
return defaultValue;
225-
}
226-
227-
return result;
195+
return from(o.take(1)).singleOrDefault(defaultValue);
228196
}
229197

230198
/**
@@ -244,28 +212,32 @@ public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
244212
}
245213

246214
/**
247-
* Returns the last item emitted by a specified {@link Observable}.
215+
* Returns the last item emitted by a specified {@link Observable}, or throws IllegalArgumentException
216+
* if source contains no elements.
248217
* <p>
249218
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.last.png">
250219
*
251220
* @return the last item emitted by the source {@link Observable}
252221
* @throws IllegalArgumentException if source contains no elements
253222
*/
254223
public T last() {
255-
return new BlockingObservable<T>(o.last()).single();
224+
return from(o.last()).single();
256225
}
257226

258227
/**
259-
* Returns the last item emitted by a specified {@link Observable} that matches a predicate.
228+
* Returns the last item emitted by a specified {@link Observable} that matches a predicate,
229+
* or throws IllegalArgumentException if no such items are emitted.
260230
* <p>
261231
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.last.p.png">
262232
*
263233
* @param predicate
264234
* a predicate function to evaluate items emitted by the {@link Observable}
265235
* @return the last item emitted by the {@link Observable} that matches the predicate
236+
* @throws IllegalArgumentException
237+
* if no such items are emitted.
266238
*/
267239
public T last(final Func1<? super T, Boolean> predicate) {
268-
return from(o.filter(predicate)).last();
240+
return from(o.last(predicate)).single();
269241
}
270242

271243
/**
@@ -280,19 +252,7 @@ public T last(final Func1<? super T, Boolean> predicate) {
280252
* are emitted
281253
*/
282254
public T lastOrDefault(T defaultValue) {
283-
boolean found = false;
284-
T result = null;
285-
286-
for (T value : toIterable()) {
287-
found = true;
288-
result = value;
289-
}
290-
291-
if (!found) {
292-
return defaultValue;
293-
}
294-
295-
return result;
255+
return from(o.takeLast(1)).singleOrDefault(defaultValue);
296256
}
297257

298258
/**
@@ -339,19 +299,19 @@ public Iterable<T> next() {
339299

340300
/**
341301
* If the {@link Observable} completes after emitting a single item, return that item,
342-
* otherwise throw an exception.
302+
* otherwise throw an IllegalArgumentException.
343303
* <p>
344304
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
345305
*
346306
* @return the single item emitted by the {@link Observable}
347307
*/
348308
public T single() {
349-
return _singleOrDefault(this, false, null);
309+
return from(o.single()).toIterable().iterator().next();
350310
}
351311

352312
/**
353313
* If the {@link Observable} completes after emitting a single item that matches a given
354-
* predicate, return that item, otherwise throw an exception.
314+
* predicate, return that item, otherwise throw an IllegalArgumentException.
355315
* <p>
356316
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.p.png">
357317
*
@@ -360,12 +320,12 @@ public T single() {
360320
* @return the single item emitted by the source {@link Observable} that matches the predicate
361321
*/
362322
public T single(Func1<? super T, Boolean> predicate) {
363-
return _singleOrDefault(from(o.filter(predicate)), false, null);
323+
return from(o.single(predicate)).toIterable().iterator().next();
364324
}
365325

366326
/**
367327
* If the {@link Observable} completes after emitting a single item, return that item; if it
368-
* emits more than one item, throw an exception; if it emits no items, return a default value.
328+
* emits more than one item, throw an IllegalArgumentException; if it emits no items, return a default value.
369329
* <p>
370330
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.singleOrDefault.png">
371331
*
@@ -375,12 +335,22 @@ public T single(Func1<? super T, Boolean> predicate) {
375335
* are emitted
376336
*/
377337
public T singleOrDefault(T defaultValue) {
378-
return _singleOrDefault(this, true, defaultValue);
338+
Iterator<? extends T> it = this.toIterable().iterator();
339+
340+
if (!it.hasNext()) {
341+
return defaultValue;
342+
}
343+
344+
T result = it.next();
345+
if (it.hasNext()) {
346+
throw new IllegalArgumentException("Sequence contains too many elements");
347+
}
348+
return result;
379349
}
380350

381351
/**
382352
* If the {@link Observable} completes after emitting a single item that matches a predicate,
383-
* return that item; if it emits more than one such item, throw an exception; if it emits no
353+
* return that item; if it emits more than one such item, throw an IllegalArgumentException; if it emits no
384354
* items, return a default value.
385355
* <p>
386356
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png">
@@ -393,7 +363,7 @@ public T singleOrDefault(T defaultValue) {
393363
* default value if no such items are emitted
394364
*/
395365
public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
396-
return _singleOrDefault(from(o.filter(predicate)), true, defaultValue);
366+
return from(o.filter(predicate)).singleOrDefault(defaultValue);
397367
}
398368

399369
/**

0 commit comments

Comments
 (0)