Skip to content

Commit 55a552a

Browse files
add parallel and example
1 parent 1495968 commit 55a552a

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,34 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
16741674
def toBlockingObservable: BlockingObservable[T] = {
16751675
new BlockingObservable[T](asJava.toBlockingObservable())
16761676
}
1677+
1678+
/**
1679+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and return an {@code Observable<R>} with the output.
1680+
*
1681+
* @param f
1682+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
1683+
* @return an Observable with the output of the function executed on a {@link Scheduler}
1684+
*/
1685+
def parallel[R](f: Observable[T] => Observable[R]): Observable[R] = {
1686+
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
1687+
(jo: rx.Observable[T]) => f(Observable[T](jo)).asJava.asInstanceOf[rx.Observable[R]]
1688+
Observable[R](asJava.asInstanceOf[rx.Observable[T]].parallel[R](fJava))
1689+
}
1690+
1691+
/**
1692+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Scheduler} and return an {@code Observable<R>} with the output.
1693+
*
1694+
* @param f
1695+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
1696+
* @param s
1697+
* a {@link Scheduler} to perform the work on.
1698+
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
1699+
*/
1700+
def parallel[R](f: Observable[T] => Observable[R], scheduler: Scheduler): Observable[R] = {
1701+
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
1702+
(jo: rx.Observable[T]) => f(Observable[T](jo)).asJava.asInstanceOf[rx.Observable[R]]
1703+
Observable[R](asJava.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler))
1704+
}
16771705

16781706
def withFilter(p: T => Boolean): WithFilter[T] = {
16791707
new WithFilter[T](p, asJava)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,28 @@ class RxScalaDemo extends JUnitSuite {
346346
assertEquals(10, Observable(-1, 0, 1).filter(condition).firstOrElse(10).toBlockingObservable.single)
347347
}
348348

349+
def square(x: Int): Int = {
350+
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}")
351+
Thread.sleep(100) // calculating a square is heavy work :)
352+
x*x
353+
}
354+
355+
def work(o1: Observable[Int]): Observable[String] = {
356+
println(s"map() is being called on thread ${Thread.currentThread().getId()}")
357+
o1.map(i => s"The square of $i is ${square(i)}")
358+
}
359+
360+
@Test def parallelExample() {
361+
val t0 = System.currentTimeMillis()
362+
Observable(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_))
363+
println(s"Work took ${System.currentTimeMillis()-t0} ms")
364+
}
365+
366+
@Test def exampleWithoutParallel() {
367+
val t0 = System.currentTimeMillis()
368+
work(Observable(1 to 10)).toBlockingObservable.foreach(println(_))
369+
println(s"Work took ${System.currentTimeMillis()-t0} ms")
370+
}
349371

350372
def output(s: String): Unit = println(s)
351373

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class CompletenessTest extends JUnitSuite {
4040
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
4141
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
4242
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
43+
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
44+
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
4345
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
4446
"reduce(R, Func2[R, _ >: T, R])" -> "fold(R)((R, T) => R)",
4547
"scan(Func2[T, T, T])" -> unnecessary,
@@ -169,7 +171,7 @@ class CompletenessTest extends JUnitSuite {
169171
def javaMethodSignatureToScala(s: String): String = {
170172
s.replaceAllLiterally("Long, TimeUnit", "Duration")
171173
.replaceAll("Action0", "() => Unit")
172-
// nested [] can't be parsed with regex, but we still do it, please forgive us ;-)
174+
// nested [] can't be parsed with regex, so these will have to be added manually
173175
.replaceAll("Action1\\[([^]]*)\\]", "$1 => Unit")
174176
.replaceAll("Action2\\[([^]]*), ([^]]*)\\]", "($1, $2) => Unit")
175177
.replaceAll("Func0\\[([^]]*)\\]", "() => $1")

0 commit comments

Comments
 (0)