Skip to content

Commit 531449e

Browse files
committed
Add first, last, single to rxjava-scala
1 parent 7ce4c84 commit 531449e

File tree

3 files changed

+104
-21
lines changed

3 files changed

+104
-21
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,18 @@ class RxScalaDemo extends JUnitSuite {
388388
assertEquals(10, List(-1, 0, 1).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single)
389389
}
390390

391+
@Test def firstLastSingleExample() {
392+
assertEquals(1, List(1, 2, 3, 4).toObservable.head.toBlockingObservable.single)
393+
assertEquals(1, List(1, 2, 3, 4).toObservable.first.toBlockingObservable.single)
394+
assertEquals(4, List(1, 2, 3, 4).toObservable.last.toBlockingObservable.single)
395+
assertEquals(1, List(1).toObservable.single.toBlockingObservable.single)
396+
397+
assertEquals(1, List(1, 2, 3, 4).toObservable.toBlockingObservable.head)
398+
assertEquals(1, List(1, 2, 3, 4).toObservable.toBlockingObservable.first)
399+
assertEquals(4, List(1, 2, 3, 4).toObservable.toBlockingObservable.last)
400+
assertEquals(1, List(1).toObservable.toBlockingObservable.single)
401+
}
402+
391403
def square(x: Int): Int = {
392404
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
393405
Thread.sleep(100) // calculating a square is heavy work :)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1949,36 +1949,64 @@ trait Observable[+T]
19491949
def headOrElse[U >: T](default: => U): Observable[U] = firstOrElse(default)
19501950

19511951
/**
1952-
* Returns an Observable that emits only the very first item emitted by the source Observable.
1953-
* This is just a shorthand for `take(1)`.
1954-
*
1952+
* Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
1953+
* `IllegalArgumentException` if the source Observable is empty.
1954+
* <p>
19551955
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
1956-
*
1957-
* @return an Observable that emits only the very first item from the source, or none if the
1958-
* source Observable completes without emitting a single item.
1956+
*
1957+
* @return an Observable that emits only the very first item emitted by the source Observable, or raises an
1958+
* `IllegalArgumentException` if the source Observable is empty
1959+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
1960+
* @see "MSDN: Observable.firstAsync()"
19591961
*/
1960-
def first: Observable[T] = take(1)
1962+
def first: Observable[T] = {
1963+
toScalaObservable[T](asJavaObservable.first)
1964+
}
19611965

1962-
/*
1963-
1964-
TODO once https://github.com/Netflix/RxJava/issues/417 is fixed, we can add head and tail methods
1966+
/**
1967+
* Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
1968+
* `IllegalArgumentException` if the source Observable is empty.
1969+
* <p>
1970+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
1971+
*
1972+
* @return an Observable that emits only the very first item emitted by the source Observable, or raises an
1973+
* `IllegalArgumentException` if the source Observable is empty
1974+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
1975+
* @see "MSDN: Observable.firstAsync()"
1976+
* @see [[Observable.first]]
1977+
*/
1978+
def head: Observable[T] = first
19651979

19661980
/**
1967-
* emits NoSuchElementException("head of empty Observable") if empty
1981+
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
1982+
* an `IllegalArgumentException` if the source Observable is empty.
1983+
*
1984+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
1985+
*
1986+
* @return an Observable that emits the last item from the source Observable or notifies observers of an
1987+
* error
1988+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#wiki-last">RxJava Wiki: last()</a>
1989+
* @see "MSDN: Observable.lastAsync()"
19681990
*/
1969-
def head: Observable[T] = {
1970-
this.take(1).fold[Option[T]](None)((v: Option[T], e: T) => Some(e)).map({
1971-
case Some(element) => element
1972-
case None => throw new NoSuchElementException("head of empty Observable")
1973-
})
1991+
def last: Observable[T] = {
1992+
toScalaObservable[T](asJavaObservable.last)
19741993
}
1975-
1994+
19761995
/**
1977-
* emits an UnsupportedOperationException("tail of empty list") if empty
1996+
* If the source Observable completes after emitting a single item, return an Observable that emits that
1997+
* item. If the source Observable emits more than one item or no items, throw an `IllegalArgumentException`.
1998+
*
1999+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
2000+
*
2001+
* @return an Observable that emits the single item emitted by the source Observable
2002+
* @throws IllegalArgumentException
2003+
* if the source emits more than one item or no items
2004+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
2005+
* @see "MSDN: Observable.singleAsync()"
19782006
*/
1979-
def tail: Observable[T] = ???
1980-
1981-
*/
2007+
def single: Observable[T] = {
2008+
toScalaObservable[T](asJavaObservable.single)
2009+
}
19822010

19832011
/**
19842012
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,49 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
5353
new WithFilter[T](p, asJava)
5454
}
5555

56+
/**
57+
* Returns the last item emitted by a specified [[Observable]], or
58+
* throws `IllegalArgumentException` if it emits no items.
59+
*
60+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.last.png">
61+
*
62+
* @return the last item emitted by the source [[Observable]]
63+
* @throws IllegalArgumentException
64+
* if source contains no elements
65+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: last()</a>
66+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.last.aspx">MSDN: Observable.Last</a>
67+
*/
68+
def last : T = {
69+
asJava.last : T
70+
}
71+
72+
/**
73+
* Returns the first item emitted by a specified [[Observable]], or
74+
* `IllegalArgumentException` if source contains no elements.
75+
*
76+
* @return the first item emitted by the source [[Observable]]
77+
* @throws IllegalArgumentException
78+
* if source contains no elements
79+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki: first()</a>
80+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
81+
*/
82+
def first : T = {
83+
asJava.first : T
84+
}
85+
86+
/**
87+
* Returns the first item emitted by a specified [[Observable]], or
88+
* `IllegalArgumentException` if source contains no elements.
89+
*
90+
* @return the first item emitted by the source [[Observable]]
91+
* @throws IllegalArgumentException
92+
* if source contains no elements
93+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki: first()</a>
94+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
95+
* @see [[BlockingObservable.first]]
96+
*/
97+
def head : T = first
98+
5699
// last -> use toIterable.last
57100
// lastOrDefault -> use toIterable.lastOption
58101
// first -> use toIterable.head

0 commit comments

Comments
 (0)