Skip to content

Commit fe1c127

Browse files
first and firstOrElse
1 parent 9fc14a1 commit fe1c127

File tree

3 files changed

+71
-58
lines changed

3 files changed

+71
-58
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 59 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,57 +1453,39 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
14531453
def product[U >: T](implicit num: Numeric[U]): Observable[U] = {
14541454
fold(num.one)(num.times)
14551455
}
1456-
1456+
14571457
/**
1458-
* TODO doc&test
1458+
* Returns an Observable that emits only the very first item emitted by the source Observable, or
1459+
* a default value if the source Observable is empty.
1460+
* <p>
1461+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
1462+
*
1463+
* @param defaultValue
1464+
* The default value to emit if the source Observable doesn't emit anything.
1465+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
1466+
* @return an Observable that emits only the very first item from the source, or a default value
1467+
* if the source Observable completes without emitting any item.
14591468
*/
14601469
def firstOrElse[U >: T](default: => U): Observable[U] = {
1461-
this.materialize.take(1).map((n: Notification[T]) => {
1462-
if (n.getKind() == rx.Notification.Kind.OnNext)
1463-
n.getValue
1464-
else
1465-
default
1470+
this.take(1).fold[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({
1471+
case Some(element) => element
1472+
case None => default
14661473
})
14671474
}
1468-
1469-
// TODO which of these two find variants do we want?
1470-
1475+
14711476
/**
1472-
* Finds the first element of the list satisfying a predicate, if any.
1473-
* @param p
1474-
* the predicate used to test elements.
1475-
* @return an Observable emitting an Option containing the first element in the source
1476-
* Observable that satisfies p, or None if none exists or onError was called.
1477-
*/
1478-
def find(p: T => Boolean): Observable[Option[T]] = {
1479-
this.filter(p).materialize.take(1).map((n: Notification[T]) => {
1480-
if (n.getKind() == rx.Notification.Kind.OnNext)
1481-
Some(n.getValue())
1482-
else
1483-
None
1484-
})
1477+
* Returns an Observable that emits only the very first item emitted by the source Observable.
1478+
* This is just a shorthand for {@code take(1)}.
1479+
* <p>
1480+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
1481+
*
1482+
* @return an Observable that emits only the very first item from the source, or none if the
1483+
* source Observable completes without emitting a single item.
1484+
*/
1485+
def first: Observable[T] = {
1486+
take(1)
14851487
}
14861488

1487-
/**
1488-
* Finds the first element of the list satisfying a predicate, if any.
1489-
* @param p
1490-
* the predicate used to test elements.
1491-
* @return an Observable emitting an Option containing the first element in the source
1492-
* Observable that satisfies p, or None if none exists.
1493-
*/
1494-
private def findWhichTransmitsError(p: T => Boolean): Observable[Option[T]] = {
1495-
val o: Observable[Notification[Option[T]]] =
1496-
this.filter(p).materialize.take(1).map((n: Notification[T]) => {
1497-
if (n.getKind() == rx.Notification.Kind.OnCompleted)
1498-
Notification(None)
1499-
else if (n.getKind() == rx.Notification.Kind.OnNext)
1500-
Notification(Some(n.getValue()))
1501-
else
1502-
Notification(n.getThrowable())
1503-
})
1504-
o.dematerialize
1505-
}
1506-
15071489
/**
15081490
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
15091491
* operators).
@@ -1586,8 +1568,8 @@ object Observable {
15861568
* the type of the items (ostensibly) emitted by the Observable
15871569
* @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it
15881570
*/
1589-
def apply(exception: Throwable): Observable[Nothing] = {
1590-
Observable[Nothing](JObservable.error(exception))
1571+
def apply[T](exception: Throwable): Observable[T] = {
1572+
Observable[T](JObservable.error(exception))
15911573
}
15921574

15931575
/**
@@ -1886,13 +1868,39 @@ class UnitTestSuite extends JUnitSuite {
18861868

18871869
assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3))
18881870
}
1871+
1872+
// Test that Java's firstOrDefault propagates errors.
1873+
// If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse
1874+
// should be changed accordingly.
1875+
@Test def testJavaFirstOrDefault() {
1876+
assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable().single)
1877+
assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable().single)
1878+
val msg = "msg6251"
1879+
var receivedMsg = "none"
1880+
try {
1881+
rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable().single
1882+
} catch {
1883+
case e: Exception => receivedMsg = e.getCause().getMessage()
1884+
}
1885+
assertEquals(receivedMsg, msg)
1886+
}
18891887

1890-
@Test def testFind() {
1891-
assertEquals(Some(3), Observable(1, 3, 5).find(_ >= 2).toBlockingObservable.single)
1892-
assertEquals(Some(1), Observable(1, 3, 5).find(_ => true).toBlockingObservable.single)
1893-
assertEquals(None, Observable(1, 3, 5).find(_ > 10).toBlockingObservable.single)
1894-
assertEquals(None, Observable(new Exception()).find((i: Int) => i > 10).toBlockingObservable.single)
1895-
assertEquals(None, Observable().find((i: Int) => i > 10).toBlockingObservable.single)
1888+
@Test def testFirstOrElse() {
1889+
def mustNotBeCalled: String = error("this method should not be called")
1890+
def mustBeCalled: String = "this is the default value"
1891+
assertEquals("hello", Observable("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single)
1892+
assertEquals("this is the default value", Observable().firstOrElse(mustBeCalled).toBlockingObservable.single)
1893+
}
1894+
1895+
@Test def testFirstOrElseWithError() {
1896+
val msg = "msg6251"
1897+
var receivedMsg = "none"
1898+
try {
1899+
Observable[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single
1900+
} catch {
1901+
case e: Exception => receivedMsg = e.getCause().getMessage()
1902+
}
1903+
assertEquals(receivedMsg, msg)
18961904
}
18971905

18981906
@Test def testTest() = {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,16 @@ class RxScalaDemo extends JUnitSuite {
337337

338338
@Test def takeFirstWithCondition() {
339339
val condition: Int => Boolean = _ >= 3
340-
assertEquals(3, Observable(1, 2, 3, 4).dropWhile(!condition(_)).take(1).toBlockingObservable.single)
340+
assertEquals(3, Observable(1, 2, 3, 4).filter(condition).first.toBlockingObservable.single)
341341
}
342342

343+
@Test def firstOrDefaultWithCondition() {
344+
val condition: Int => Boolean = _ >= 3
345+
assertEquals(3, Observable(1, 2, 3, 4).filter(condition).firstOrElse(10).toBlockingObservable.single)
346+
assertEquals(10, Observable(-1, 0, 1).filter(condition).firstOrElse(10).toBlockingObservable.single)
347+
}
348+
349+
343350
def output(s: String): Unit = println(s)
344351

345352
// blocks until obs has completed

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ class CompletenessTest extends JUnitSuite {
1717
"scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum). " +
1818
"You can use fold instead to accumulate sum and numberOfElements and divide at the end.]"
1919

20-
val commentForFirst = "[use take(1)]"
21-
val commentForFirstWithPredicate = "[use .dropWhile(!condition).take(1) or use find(condition)]"
20+
val commentForFirstWithPredicate = "[use .filter(condition).first]"
2221

2322
val correspondence = defaultMethodCorrespondence ++ Map(
2423
// manually added entries for Java instance methods
@@ -28,10 +27,9 @@ class CompletenessTest extends JUnitSuite {
2827
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
2928
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
3029
"dematerialize()" -> "dematerialize(<:<[T, Notification[U]])",
31-
"first()" -> commentForFirst,
3230
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
33-
//"firstOrDefault(T)" ->
34-
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use find and map instead]", // TODO maybe firstOrElse method?
31+
"firstOrDefault(T)" -> "firstOrElse(=> U)",
32+
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use .filter(condition).firstOrElse(default)]",
3533
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K)",
3634
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
3735
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine zipWithIndex with map or with a for comprehension]",
@@ -47,7 +45,7 @@ class CompletenessTest extends JUnitSuite {
4745
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
4846
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
4947
"startWith(Iterable[T])" -> "[unnecessary because we can just use ++ instead]",
50-
"takeFirst()" -> commentForFirst,
48+
"takeFirst()" -> "first",
5149
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
5250
"takeLast(Int)" -> "takeRight(Int)",
5351
"toList()" -> "toSeq",

0 commit comments

Comments
 (0)