Skip to content

Commit 2654f60

Browse files
head, headOrElse, zip(3), zip(4)
1 parent 373f5b3 commit 2654f60

File tree

1 file changed

+82
-34
lines changed

1 file changed

+82
-34
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,20 +1524,33 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
15241524
*
15251525
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
15261526
*
1527-
* @param defaultValue
1527+
* @param default
15281528
* The default value to emit if the source Observable doesn't emit anything.
15291529
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
15301530
* @return an Observable that emits only the very first item from the source, or a default value
15311531
* if the source Observable completes without emitting any item.
15321532
*/
1533-
// TODO def headOrElse
15341533
def firstOrElse[U >: T](default: => U): Observable[U] = {
15351534
this.take(1).fold[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({
15361535
case Some(element) => element
15371536
case None => default
15381537
})
15391538
}
15401539

1540+
/**
1541+
* Returns an Observable that emits only the very first item emitted by the source Observable, or
1542+
* a default value if the source Observable is empty.
1543+
*
1544+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
1545+
*
1546+
* @param default
1547+
* The default value to emit if the source Observable doesn't emit anything.
1548+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
1549+
* @return an Observable that emits only the very first item from the source, or a default value
1550+
* if the source Observable completes without emitting any item.
1551+
*/
1552+
def headOrElse[U >: T](default: => U): Observable[U] = firstOrElse(default)
1553+
15411554
/**
15421555
* Returns an Observable that emits only the very first item emitted by the source Observable.
15431556
* This is just a shorthand for `take(1)`.
@@ -1547,12 +1560,17 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
15471560
* @return an Observable that emits only the very first item from the source, or none if the
15481561
* source Observable completes without emitting a single item.
15491562
*/
1550-
// TODO def head
1551-
// TODO def tail
1552-
def first: Observable[T] = {
1553-
take(1)
1554-
}
1563+
def first: Observable[T] = take(1)
15551564

1565+
def head: Observable[T] = {
1566+
this.take(1).fold[Option[T]](None)((v: Option[T], e: T) => Some(e)).map({
1567+
case Some(element) => element
1568+
case None => throw new NoSuchElementException("head of empty Observable")
1569+
})
1570+
}
1571+
1572+
// TODO def tail
1573+
15561574
/**
15571575
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
15581576
*
@@ -1870,31 +1888,36 @@ object Observable {
18701888
}
18711889

18721890
/**
1873-
* Given a Seq of N observables, returns an observable that emits Seqs of N elements each.
1874-
* The first emitted Seq will contain the first element of each source observable,
1875-
* the second Seq the second element of each source observable, and so on.
1891+
* Given 3 observables, returns an observable that emits Tuples of 3 elements each.
1892+
* The first emitted Tuple will contain the first element of each source observable,
1893+
* the second Tuple the second element of each source observable, and so on.
18761894
*
1877-
* @param observables
1878-
* A Seq of source Observables
1879-
* @return an Observable that emits the zipped Seqs
1880-
*/
1881-
def zip[A,B,C](obA: Observable[A], obB: Observable[B], obC: Observable[B]): Observable[(A, B, C)]
1882-
// TODO until 6
1883-
def zip[T](observables: Observable[T]*): Observable[Seq[T]] = {
1884-
val f: FuncN[Seq[T]] = (args: Seq[java.lang.Object]) => {
1885-
val asSeq: Seq[Object] = args.toSeq
1886-
asSeq.asInstanceOf[Seq[T]]
1887-
}
1888-
val list = observables.map(_.asJava).asJava
1889-
val o = rx.Observable.zip(list, f)
1890-
Observable[Seq[T]](o)
1895+
* @return an Observable that emits the zipped Observables
1896+
*/
1897+
def zip[A, B, C](obA: Observable[A], obB: Observable[B], obC: Observable[C]): Observable[(A, B, C)] = {
1898+
Observable[(A, B, C)](rx.Observable.zip[A, B, C, (A, B, C)](obA.asJava, obB.asJava, obC.asJava, (a: A, b: B, c: C) => (a, b, c)))
18911899
}
18921900

18931901
/**
1894-
* Given an Observable emitting N source observables, returns an observable that emits Seqs of N elements each.
1902+
* Given 4 observables, returns an observable that emits Tuples of 4 elements each.
1903+
* The first emitted Tuple will contain the first element of each source observable,
1904+
* the second Tuple the second element of each source observable, and so on.
1905+
*
1906+
* @return an Observable that emits the zipped Observables
1907+
*/
1908+
def zip[A, B, C, D](obA: Observable[A], obB: Observable[B], obC: Observable[C], obD: Observable[D]): Observable[(A, B, C, D)] = {
1909+
Observable[(A, B, C, D)](rx.Observable.zip[A, B, C, D, (A, B, C, D)](obA.asJava, obB.asJava, obC.asJava, obD.asJava, (a: A, b: B, c: C, d: D) => (a, b, c, d)))
1910+
}
1911+
1912+
/**
1913+
* Given an Observable emitting `N` source observables, returns an observable that
1914+
* emits Seqs of `N` elements each.
18951915
* The first emitted Seq will contain the first element of each source observable,
18961916
* the second Seq the second element of each source observable, and so on.
18971917
*
1918+
* Note that the returned Observable will only start emitting items once the given
1919+
* `Observable[Observable[T]]` has completed, because otherwise it cannot know `N`.
1920+
*
18981921
* @param observables
18991922
* An Observable emitting N source Observables
19001923
* @return an Observable that emits the zipped Seqs
@@ -1908,17 +1931,32 @@ object Observable {
19081931
val o = rx.Observable.zip(list, f)
19091932
Observable[Seq[T]](o)
19101933
}
1911-
1934+
1935+
/**
1936+
* Emits 0, 1, 2, ... with a delay of `duration` between consecutive numbers.
1937+
*
1938+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/interval.png">
1939+
*
1940+
* @param duration
1941+
* duration between two consecutive numbers
1942+
* @return An Observable that emits a number each time interval.
1943+
*/
1944+
def interval(duration: Duration): Observable[Long] = {
1945+
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit))).map(_.longValue())
1946+
}
1947+
19121948
/**
1913-
* TODO (SG) ScalaDoc
1914-
* TODO Provide implicit scheduler:
1949+
* Emits 0, 1, 2, ... with a delay of `duration` between consecutive numbers.
19151950
*
1916-
* def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long]
1917-
* def interval(duration: Duration)(scheduler: Scheduler): Observable[Long]
1918-
* def interval(scheduler: Scheduler)(duration: Duration): Observable[Long]
1919-
* def interval(duration: Duration, scheduler: Scheduler): Observable[Long] && def interval(duration: Duration): Observable[Long]
1951+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/interval.png">
1952+
*
1953+
* @param duration
1954+
* duration between two consecutive numbers
1955+
* @param scheduler
1956+
* the scheduler to use
1957+
* @return An Observable that emits a number each time interval.
19201958
*/
1921-
def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long] = {
1959+
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
19221960
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit, scheduler))).map(_.longValue())
19231961
}
19241962

@@ -1949,7 +1987,7 @@ private[scala] class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
19491987
import scala.concurrent.duration._
19501988
import org.junit.{Before, Test, Ignore}
19511989
import org.junit.Assert._
1952-
import org.mockito.Matchers.any
1990+
import org.mockito.Matchers._
19531991
import org.mockito.Mockito._
19541992
import org.mockito.{ MockitoAnnotations, Mock }
19551993

@@ -2012,6 +2050,16 @@ private[scala] class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
20122050
assertEquals(receivedMsg, msg)
20132051
}
20142052

2053+
@Test def testHead() {
2054+
val observer = mock(classOf[Observer[Int]])
2055+
val o = Observable().head
2056+
val sub = o.subscribe(observer)
2057+
2058+
verify(observer, never).onNext(any(classOf[Int]))
2059+
verify(observer, never).onCompleted()
2060+
verify(observer, times(1)).onError(any(classOf[NoSuchElementException]))
2061+
}
2062+
20152063
@Test def testTest() = {
20162064
val a: Observable[Int] = Observable()
20172065
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)

0 commit comments

Comments
 (0)