Skip to content

Commit 0608155

Browse files
committed
Fixed the blocking/non-blocking first
1 parent 1586113 commit 0608155

File tree

4 files changed

+178
-25
lines changed

4 files changed

+178
-25
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5093,35 +5093,33 @@ public Observable<T> skip(int num) {
50935093

50945094
/**
50955095
* Returns an Observable that emits only the very first item emitted by the
5096-
* source Observable.
5096+
* source Observable, or an <code>IllegalArgumentException</code> if the source
5097+
* {@link Observable} is empty.
50975098
* <p>
50985099
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
50995100
*
5100-
* @return an Observable that emits only the very first item emitted by the
5101-
* source Observable, or nothing if the source Observable completes
5102-
* without emitting a single item
5101+
* @return an Observable that emits only the very first item from the
5102+
* source, or an <code>IllegalArgumentException</code> if the source {@link Observable} is empty.
51035103
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
5104-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
51055104
*/
51065105
public Observable<T> first() {
5107-
return take(1);
5106+
return takeFirst().last();
51085107
}
51095108

51105109
/**
51115110
* Returns an Observable that emits only the very first item emitted by the
5112-
* source Observable that satisfies a given condition.
5111+
* source Observable that satisfies a given condition, or an <code>IllegalArgumentException</code>
5112+
* if no such items are emitted.
51135113
* <p>
51145114
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstN.png">
51155115
*
51165116
* @param predicate the condition any source emitted item has to satisfy
51175117
* @return an Observable that emits only the very first item satisfying the
5118-
* given condition from the source, or nothing if the source
5119-
* Observable completes without emitting a single matching item
5118+
* given condition from the source, or an <code>IllegalArgumentException</code> if no such items are emitted.
51205119
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
5121-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
51225120
*/
51235121
public Observable<T> first(Func1<? super T, Boolean> predicate) {
5124-
return skipWhile(not(predicate)).take(1);
5122+
return takeFirst(predicate).last();
51255123
}
51265124

51275125
/**
@@ -5245,14 +5243,13 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
52455243
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
52465244
*
52475245
* @return an Observable that emits only the very first item from the
5248-
* source, or none if the source Observable completes without
5246+
* source, or an empty Observable if the source Observable completes without
52495247
* emitting a single item
52505248
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
52515249
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
5252-
* @see #first()
52535250
*/
52545251
public Observable<T> takeFirst() {
5255-
return first();
5252+
return take(1);
52565253
}
52575254

52585255
/**
@@ -5263,14 +5260,13 @@ public Observable<T> takeFirst() {
52635260
*
52645261
* @param predicate the condition any source emitted item has to satisfy
52655262
* @return an Observable that emits only the very first item satisfying the
5266-
* given condition from the source, or none if the source Observable
5263+
* given condition from the source, or an empty Observable if the source Observable
52675264
* completes without emitting a single matching item
52685265
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
52695266
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
5270-
* @see #first(Func1)
52715267
*/
52725268
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
5273-
return first(predicate);
5269+
return skipWhile(not(predicate)).take(1);
52745270
}
52755271

52765272
/**

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,75 @@ public Iterator<T> getIterator() {
172172
return OperationToIterator.toIterator(o);
173173
}
174174

175+
/**
176+
* Returns the first item emitted by a specified {@link Observable},
177+
* or IllegalArgumentException if source contains no elements
178+
*
179+
* @return the first item emitted by the source {@link Observable}
180+
* @throws IllegalArgumentException
181+
* if source contains no elements
182+
*/
183+
public T first() {
184+
return from(o.first()).single();
185+
}
186+
187+
/**
188+
* Returns the first item emitted by a specified {@link Observable} that matches a predicate,
189+
* or IllegalArgumentException if no such items are emitted.
190+
*
191+
* @param predicate
192+
* a predicate function to evaluate items emitted by the {@link Observable}
193+
* @return the first item emitted by the {@link Observable} that matches the predicate
194+
* @throws IllegalArgumentException
195+
* if no such items are emitted.
196+
*/
197+
public T first(Func1<? super T, Boolean> predicate) {
198+
return from(o.first(predicate)).single();
199+
}
200+
201+
/**
202+
* Returns the first item emitted by a specified {@link Observable}, or a default value if no
203+
* items are emitted.
204+
*
205+
* @param defaultValue
206+
* a default value to return if the {@link Observable} emits no items
207+
* @return the first item emitted by the {@link Observable}, or the default value if no items
208+
* are emitted
209+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320(v=vs.103).aspx">MSDN: Observable.FirstOrDefault</a>
210+
*/
211+
public T firstOrDefault(T defaultValue) {
212+
boolean found = false;
213+
T result = null;
214+
215+
for (T value : toIterable()) {
216+
found = true;
217+
result = value;
218+
break;
219+
}
220+
221+
if (!found) {
222+
return defaultValue;
223+
}
224+
225+
return result;
226+
}
227+
228+
/**
229+
* Returns the first item emitted by a specified {@link Observable} that matches a predicate, or
230+
* a default value if no such items are emitted.
231+
*
232+
* @param defaultValue
233+
* a default value to return if the {@link Observable} emits no matching items
234+
* @param predicate
235+
* a predicate function to evaluate items emitted by the {@link Observable}
236+
* @return the first item emitted by the {@link Observable} that matches the predicate, or the
237+
* default value if no matching items are emitted
238+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759(v=vs.103).aspx">MSDN: Observable.FirstOrDefault</a>
239+
*/
240+
public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
241+
return from(o.filter(predicate)).firstOrDefault(defaultValue);
242+
}
243+
175244
/**
176245
* Returns the last item emitted by a specified {@link Observable}.
177246
* <p>

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,43 +164,61 @@ public Subscription onSubscribe(Observer<? super String> obsv) {
164164
verify(w, times(1)).onError(any(RuntimeException.class));
165165
}
166166

167-
public void testFirstWithPredicateOfSome() {
167+
public void testTakeFirstWithPredicateOfSome() {
168168
Observable<Integer> observable = Observable.from(1, 3, 5, 4, 6, 3);
169-
observable.first(IS_EVEN).subscribe(w);
169+
observable.takeFirst(IS_EVEN).subscribe(w);
170170
verify(w, times(1)).onNext(anyInt());
171171
verify(w).onNext(4);
172172
verify(w, times(1)).onCompleted();
173173
verify(w, never()).onError(any(Throwable.class));
174174
}
175175

176176
@Test
177-
public void testFirstWithPredicateOfNoneMatchingThePredicate() {
177+
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
178178
Observable<Integer> observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1);
179-
observable.first(IS_EVEN).subscribe(w);
179+
observable.takeFirst(IS_EVEN).subscribe(w);
180180
verify(w, never()).onNext(anyInt());
181181
verify(w, times(1)).onCompleted();
182182
verify(w, never()).onError(any(Throwable.class));
183183
}
184184

185185
@Test
186-
public void testFirstOfSome() {
186+
public void testTakeFirstOfSome() {
187187
Observable<Integer> observable = Observable.from(1, 2, 3);
188-
observable.first().subscribe(w);
188+
observable.takeFirst().subscribe(w);
189189
verify(w, times(1)).onNext(anyInt());
190190
verify(w).onNext(1);
191191
verify(w, times(1)).onCompleted();
192192
verify(w, never()).onError(any(Throwable.class));
193193
}
194194

195195
@Test
196-
public void testFirstOfNone() {
196+
public void testTakeFirstOfNone() {
197197
Observable<Integer> observable = Observable.empty();
198-
observable.first().subscribe(w);
198+
observable.takeFirst().subscribe(w);
199199
verify(w, never()).onNext(anyInt());
200200
verify(w, times(1)).onCompleted();
201201
verify(w, never()).onError(any(Throwable.class));
202202
}
203203

204+
@Test
205+
public void testFirstOfNone() {
206+
Observable<Integer> observable = Observable.empty();
207+
observable.first().subscribe(w);
208+
verify(w, never()).onNext(anyInt());
209+
verify(w, never()).onCompleted();
210+
verify(w, times(1)).onError(isA(IllegalArgumentException.class));
211+
}
212+
213+
@Test
214+
public void testFirstWithPredicateOfNoneMatchingThePredicate() {
215+
Observable<Integer> observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1);
216+
observable.first(IS_EVEN).subscribe(w);
217+
verify(w, never()).onNext(anyInt());
218+
verify(w, never()).onCompleted();
219+
verify(w, times(1)).onError(isA(IllegalArgumentException.class));
220+
}
221+
204222
@Test
205223
public void testReduce() {
206224
Observable<Integer> observable = Observable.from(1, 2, 3, 4);

rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,76 @@ public void call(String t1) {
257257
}
258258
}
259259

260+
@Test
261+
public void testFirst() {
262+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
263+
assertEquals("one", observable.first());
264+
}
265+
266+
@Test(expected = IllegalArgumentException.class)
267+
public void testFirstWithEmpty() {
268+
BlockingObservable.from(Observable.<String>empty()).first();
269+
}
270+
271+
@Test
272+
public void testFirstWithPredicate() {
273+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
274+
String first = observable.first(new Func1<String, Boolean>() {
275+
@Override
276+
public Boolean call(String args) {
277+
return args.length() > 3;
278+
}
279+
});
280+
assertEquals("three", first);
281+
}
282+
283+
@Test(expected = IllegalArgumentException.class)
284+
public void testFirstWithPredicateAndEmpty() {
285+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
286+
observable.first(new Func1<String, Boolean>() {
287+
@Override
288+
public Boolean call(String args) {
289+
return args.length() > 5;
290+
}
291+
});
292+
}
293+
294+
@Test
295+
public void testFirstOrDefault() {
296+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
297+
assertEquals("one", observable.firstOrDefault("default"));
298+
}
299+
300+
@Test
301+
public void testFirstOrDefaultWithEmpty() {
302+
BlockingObservable<String> observable = BlockingObservable.from(Observable.<String>empty());
303+
assertEquals("default", observable.firstOrDefault("default"));
304+
}
305+
306+
@Test
307+
public void testFirstOrDefaultWithPredicate() {
308+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
309+
String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
310+
@Override
311+
public Boolean call(String args) {
312+
return args.length() > 3;
313+
}
314+
});
315+
assertEquals("three", first);
316+
}
317+
318+
@Test
319+
public void testFirstOrDefaultWithPredicateAndEmpty() {
320+
BlockingObservable<String> observable = BlockingObservable.from(Observable.from("one", "two", "three"));
321+
String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
322+
@Override
323+
public Boolean call(String args) {
324+
return args.length() > 5;
325+
}
326+
});
327+
assertEquals("default", first);
328+
}
329+
260330
private static class TestException extends RuntimeException {
261331
private static final long serialVersionUID = 1L;
262332
}

0 commit comments

Comments
 (0)