Skip to content

Commit 9fc14a1

Browse files
add methods and examples
1 parent 2b7f062 commit 9fc14a1

File tree

4 files changed

+173
-7
lines changed

4 files changed

+173
-7
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ object ImplicitFunctionConversions {
7373
new Func2[A, B, jlang.Boolean] {
7474
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
7575
}
76+
77+
implicit def scalaFuncNToRxFuncN[R](f: Seq[java.lang.Object] => R): FuncN[R] =
78+
new FuncN[R] {
79+
def call(args: java.lang.Object*): R = f(args)
80+
}
7681

7782
/**
7883
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
199199
def zip[U](that: Observable[U]): Observable[(T, U)] = {
200200
Observable[(T, U)](JObservable.zip[T, U, (T, U)](this.asJava, that.asJava, (t: T, u: U) => (t, u)))
201201
}
202+
203+
// public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
202204

203205
/**
204206
* Zips this Observable with its indices.
@@ -1068,6 +1070,21 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
10681070
Observable[T](asJava.skip(n))
10691071
}
10701072

1073+
/**
1074+
* Returns an Observable that bypasses all items from the source Observable as long as the specified
1075+
* condition holds true. Emits all further source items as soon as the condition becomes false.
1076+
* <p>
1077+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipWhile.png">
1078+
*
1079+
* @param predicate
1080+
* A function to test each item emitted from the source Observable for a condition.
1081+
* @return an Observable that emits all items from the source Observable as soon as the condition
1082+
* becomes false.
1083+
*/
1084+
def dropWhile(predicate: T => Boolean): Observable[T] = {
1085+
Observable[T](asJava.skipWhile(predicate))
1086+
}
1087+
10711088
/**
10721089
* Returns an Observable that emits only the first <code>num</code> items emitted by the source
10731090
* Observable.
@@ -1436,6 +1453,56 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
14361453
def product[U >: T](implicit num: Numeric[U]): Observable[U] = {
14371454
fold(num.one)(num.times)
14381455
}
1456+
1457+
/**
1458+
* TODO doc&test
1459+
*/
1460+
def firstOrElse[U >: T](default: => U): Observable[U] = {
1461+
this.materialize.take(1).map((n: Notification[T]) => {
1462+
if (n.getKind() == rx.Notification.Kind.OnNext)
1463+
n.getValue
1464+
else
1465+
default
1466+
})
1467+
}
1468+
1469+
// TODO which of these two find variants do we want?
1470+
1471+
/**
1472+
* Finds the first element of the list satisfying a predicate, if any.
1473+
* @param p
1474+
* the predicate used to test elements.
1475+
* @return an Observable emitting an Option containing the first element in the source
1476+
* Observable that satisfies p, or None if none exists or onError was called.
1477+
*/
1478+
def find(p: T => Boolean): Observable[Option[T]] = {
1479+
this.filter(p).materialize.take(1).map((n: Notification[T]) => {
1480+
if (n.getKind() == rx.Notification.Kind.OnNext)
1481+
Some(n.getValue())
1482+
else
1483+
None
1484+
})
1485+
}
1486+
1487+
/**
1488+
* Finds the first element of the list satisfying a predicate, if any.
1489+
* @param p
1490+
* the predicate used to test elements.
1491+
* @return an Observable emitting an Option containing the first element in the source
1492+
* Observable that satisfies p, or None if none exists.
1493+
*/
1494+
private def findWhichTransmitsError(p: T => Boolean): Observable[Option[T]] = {
1495+
val o: Observable[Notification[Option[T]]] =
1496+
this.filter(p).materialize.take(1).map((n: Notification[T]) => {
1497+
if (n.getKind() == rx.Notification.Kind.OnCompleted)
1498+
Notification(None)
1499+
else if (n.getKind() == rx.Notification.Kind.OnNext)
1500+
Notification(Some(n.getValue()))
1501+
else
1502+
Notification(n.getThrowable())
1503+
})
1504+
o.dematerialize
1505+
}
14391506

14401507
/**
14411508
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
@@ -1461,6 +1528,7 @@ object Observable {
14611528
import rx.{Observable => JObservable}
14621529
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
14631530
import rx.lang.scala.util._
1531+
import rx.util.functions._
14641532
import rx.lang.scala.ImplicitFunctionConversions._
14651533

14661534
private[scala]
@@ -1676,19 +1744,19 @@ object Observable {
16761744
def never: Observable[Nothing] = {
16771745
Observable[Nothing](JObservable.never())
16781746
}
1679-
1747+
16801748
/*
16811749
def apply[T](f: Future[T]): Observable[T] = {
16821750
??? // TODO convert Scala Future to Java Future
16831751
}
16841752
*/
1685-
1753+
16861754
/*
16871755
def apply[T](f: Future[T], scheduler: Scheduler): Observable[T] = {
16881756
??? // TODO convert Scala Future to Java Future
16891757
}
16901758
*/
1691-
1759+
16921760
/*
16931761
def apply[T](f: Future[T], duration: Duration): Observable[T] = {
16941762
??? // TODO convert Scala Future to Java Future
@@ -1700,7 +1768,7 @@ object Observable {
17001768
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
17011769
* <p>
17021770
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
1703-
*
1771+
*
17041772
* @param o1
17051773
* The first source observable.
17061774
* @param o2
@@ -1712,14 +1780,57 @@ object Observable {
17121780
// public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction)
17131781
// TODO do we want this as an instance method?
17141782
// TODO then decide about combineLatest with > 2 Observables
1715-
1783+
17161784
// TODO what about these two?
17171785
// public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction)
17181786
// public static <R> Observable<R> zip(Collection<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
17191787

1788+
/**
1789+
* Given a Seq of N observables, returns an observable that emits Seqs of N elements each.
1790+
* The first emitted Seq will contain the first element of each source observable,
1791+
* the second Seq the second element of each source observable, and so on.
1792+
*
1793+
* @param observables
1794+
* A Seq of source Observables
1795+
* @return an Observable that emits the zipped Seqs
1796+
*/
1797+
def zip[T](observables: Seq[Observable[T]]): Observable[Seq[T]] = {
1798+
val f: FuncN[Seq[T]] = (args: Seq[java.lang.Object]) => {
1799+
val asSeq: Seq[Object] = args.toSeq
1800+
asSeq.asInstanceOf[Seq[T]]
1801+
}
1802+
val list = observables.map(_.asJava).asJava
1803+
val o = rx.Observable.zip(list, f)
1804+
Observable[Seq[T]](o)
1805+
}
1806+
1807+
/**
1808+
* Given an Observable emitting N source observables, returns an observable that emits Seqs of N elements each.
1809+
* The first emitted Seq will contain the first element of each source observable,
1810+
* the second Seq the second element of each source observable, and so on.
1811+
*
1812+
* @param observables
1813+
* An Observable emitting N source Observables
1814+
* @return an Observable that emits the zipped Seqs
1815+
*/
1816+
def zip[T](observables: Observable[Observable[T]]): Observable[Seq[T]] = {
1817+
val f: FuncN[Seq[T]] = (args: Seq[java.lang.Object]) => {
1818+
val asSeq: Seq[Object] = args.toSeq
1819+
asSeq.asInstanceOf[Seq[T]]
1820+
}
1821+
val list = observables.map(_.asJava).asJava
1822+
val o = rx.Observable.zip(list, f)
1823+
Observable[Seq[T]](o)
1824+
}
1825+
17201826
def interval(duration: Duration): Observable[Long] = {
17211827
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit))).map(_.longValue())
17221828
}
1829+
1830+
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
1831+
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit, scheduler))).map(_.longValue())
1832+
}
1833+
17231834
}
17241835

17251836
// Cannot yet have inner class because of this error message:
@@ -1776,6 +1887,14 @@ class UnitTestSuite extends JUnitSuite {
17761887
assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3))
17771888
}
17781889

1890+
@Test def testFind() {
1891+
assertEquals(Some(3), Observable(1, 3, 5).find(_ >= 2).toBlockingObservable.single)
1892+
assertEquals(Some(1), Observable(1, 3, 5).find(_ => true).toBlockingObservable.single)
1893+
assertEquals(None, Observable(1, 3, 5).find(_ > 10).toBlockingObservable.single)
1894+
assertEquals(None, Observable(new Exception()).find((i: Int) => i > 10).toBlockingObservable.single)
1895+
assertEquals(None, Observable().find((i: Int) => i > 10).toBlockingObservable.single)
1896+
}
1897+
17791898
@Test def testTest() = {
17801899
val a: Observable[Int] = Observable()
17811900
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.junit.{Before, Test, Ignore}
2323
import org.junit.Assert._
2424
import rx.lang.scala.concurrency.NewThreadScheduler
2525

26-
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
26+
//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
2727
class RxScalaDemo extends JUnitSuite {
2828

2929
@Test def intervalExample() {
@@ -310,6 +310,36 @@ class RxScalaDemo extends JUnitSuite {
310310
assertEquals(1, Observable[Int]().product.toBlockingObservable.single)
311311
}
312312

313+
@Test def mapWithIndexExample() {
314+
// We don't need mapWithIndex because we already have zipWithIndex, which we can easily
315+
// combine with map:
316+
Observable("a", "b", "c").zipWithIndex.map(pair => pair._1 + " has index " + pair._2)
317+
.toBlockingObservable.foreach(println(_))
318+
319+
// Or even nicer with for-comprehension syntax:
320+
(for ((letter, index) <- Observable("a", "b", "c").zipWithIndex) yield letter + " has index " + index)
321+
.toBlockingObservable.foreach(println(_))
322+
}
323+
324+
// source Observables are in a List:
325+
@Test def zipManySeqExample() {
326+
val observables = List(Observable(1, 2), Observable(10, 20), Observable(100, 200))
327+
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
328+
.toBlockingObservable.foreach(println(_))
329+
}
330+
331+
// source Observables are in an Observable:
332+
@Test def zipManyObservableExample() {
333+
val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200))
334+
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
335+
.toBlockingObservable.foreach(println(_))
336+
}
337+
338+
@Test def takeFirstWithCondition() {
339+
val condition: Int => Boolean = _ >= 3
340+
assertEquals(3, Observable(1, 2, 3, 4).dropWhile(!condition(_)).take(1).toBlockingObservable.single)
341+
}
342+
313343
def output(s: String): Unit = println(s)
314344

315345
// blocks until obs has completed

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class CompletenessTest extends JUnitSuite {
1616
val averageProblem = "[We can't have a general average method because Scala's Numeric does not have " +
1717
"scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum). " +
1818
"You can use fold instead to accumulate sum and numberOfElements and divide at the end.]"
19+
20+
val commentForFirst = "[use take(1)]"
21+
val commentForFirstWithPredicate = "[use .dropWhile(!condition).take(1) or use find(condition)]"
1922

2023
val correspondence = defaultMethodCorrespondence ++ Map(
2124
// manually added entries for Java instance methods
@@ -25,8 +28,13 @@ class CompletenessTest extends JUnitSuite {
2528
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
2629
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
2730
"dematerialize()" -> "dematerialize(<:<[T, Notification[U]])",
31+
"first()" -> commentForFirst,
32+
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
33+
//"firstOrDefault(T)" ->
34+
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use find and map instead]", // TODO maybe firstOrElse method?
2835
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K)",
2936
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
37+
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine zipWithIndex with map or with a for comprehension]",
3038
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
3139
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
3240
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
@@ -39,6 +47,8 @@ class CompletenessTest extends JUnitSuite {
3947
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
4048
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
4149
"startWith(Iterable[T])" -> "[unnecessary because we can just use ++ instead]",
50+
"takeFirst()" -> commentForFirst,
51+
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
4252
"takeLast(Int)" -> "takeRight(Int)",
4353
"toList()" -> "toSeq",
4454
"toSortedList()" -> unnecessary,
@@ -70,7 +80,9 @@ class CompletenessTest extends JUnitSuite {
7080
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
7181
"synchronize(Observable[T])" -> "synchronize",
7282
"switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch",
73-
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]"
83+
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]",
84+
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use zip in companion object and map]",
85+
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use zip in companion object and map]"
7486
) ++ List.iterate("T", 9)(s => s + ", T").map(
7587
// all 9 overloads of startWith:
7688
"startWith(" + _ + ")" -> "[unnecessary because we can just use ++ instead]"

0 commit comments

Comments
 (0)