Skip to content

Commit 38f96db

Browse files
committed
Add concatMap to RxScala
1 parent c5691fb commit 38f96db

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,4 +1169,11 @@ class RxScalaDemo extends JUnitSuite {
11691169
(n: Int, m: Int) => n * m
11701170
).toBlocking.foreach(println)
11711171
}
1172+
1173+
@Test def concatMapExample() {
1174+
val o = Observable.items(10, 100)
1175+
o.concatMap(n => Observable.interval(200 millis).map(_ * n).take(10))
1176+
.take(20)
1177+
.toBlocking.foreach(println)
1178+
}
11721179
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,25 @@ trait Observable[+T]
298298
toScalaObservable[U](o5)
299299
}
300300

301+
/**
302+
* Returns a new Observable that emits items resulting from applying a function that you supply to each item
303+
* emitted by the source Observable, where that function returns an Observable, and then emitting the items
304+
* that result from concatinating those resulting Observables.
305+
*
306+
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/concatMap.png">
307+
*
308+
* @param f a function that, when applied to an item emitted by the source Observable, returns an Observable
309+
* @return an Observable that emits the result of applying the transformation function to each item emitted
310+
* by the source Observable and concatinating the Observables obtained from this transformation
311+
*/
312+
def concatMap[R](f: T => Observable[R]): Observable[R] = {
313+
toScalaObservable[R](asJavaObservable.concatMap[R](new Func1[T, rx.Observable[_ <: R]] {
314+
def call(t1: T): rx.Observable[_ <: R] = {
315+
f(t1).asJavaObservable
316+
}
317+
}))
318+
}
319+
301320
/**
302321
* Wraps this Observable in another Observable that ensures that the resulting
303322
* Observable is chronologically well-behaved.

0 commit comments

Comments
 (0)