Skip to content

Commit dc7621c

Browse files
committed
Fix "window"; change "amb" parameter to varargs; add examples
1 parent 4d0b301 commit dc7621c

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ class RxScalaDemo extends JUnitSuite {
156156
).subscribe(output(_))
157157
}
158158

159+
@Test def windowExample2() {
160+
val windowObservable = Observable.interval(500 millis)
161+
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
162+
(for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o)
163+
yield s"Observable#$i emits $n"
164+
).toBlocking.foreach(println)
165+
}
166+
159167
@Test def testReduce() {
160168
assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single)
161169
}
@@ -731,6 +739,23 @@ class RxScalaDemo extends JUnitSuite {
731739
println(result)
732740
}
733741

742+
@Test def ambWithVarargsExample(): Unit = {
743+
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
744+
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
745+
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
746+
val result = Observable.amb(o1, o2, o3).toBlocking.toList
747+
println(result)
748+
}
749+
750+
@Test def ambWithSeqExample(): Unit = {
751+
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
752+
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
753+
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
754+
val o = Seq(o1, o2, o3)
755+
val result = Observable.amb(o: _*).toBlocking.toList
756+
println(result)
757+
}
758+
734759
@Test def delayExample(): Unit = {
735760
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds)
736761
val result = o.toBlockingObservable.toList

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -670,12 +670,8 @@ trait Observable[+T]
670670
val func = new Func0[rx.Observable[_ <: Any]]() {
671671
override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable
672672
}
673-
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func)
674-
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
675-
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
676-
toScalaObservable[T](x2)
677-
})
678-
o2
673+
val jo: rx.Observable[rx.Observable[T]] = asJavaObservable.asInstanceOf[rx.Observable[T]].window[Any](func)
674+
toScalaObservable(jo).map(toScalaObservable[T](_))
679675
}
680676

681677
/**
@@ -3948,8 +3944,8 @@ object Observable {
39483944
* @return an Observable that emits the same sequence of items as whichever of the source Observables
39493945
* first emitted an item
39503946
*/
3951-
def amb[T](sources: Iterable[Observable[T]]): Observable[T] = {
3952-
toScalaObservable[T](rx.Observable.amb(sources.map(_.asJavaObservable.asInstanceOf[rx.Observable[T]]).asJava))
3947+
def amb[T](sources: Observable[T]*): Observable[T] = {
3948+
toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava))
39533949
}
39543950
}
39553951

0 commit comments

Comments
 (0)