Skip to content

Commit 2ba5fe8

Browse files
committed
Add flatten variant to RxScala
1 parent bd44425 commit 2ba5fe8

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class RxScalaDemo extends JUnitSuite {
130130
o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
131131
}
132132

133-
@Test def fattenSomeExample() {
133+
@Test def flattenSomeExample() {
134134
// To merge some observables which are all known already:
135135
List(
136136
Observable.interval(200 millis),
@@ -139,6 +139,24 @@ class RxScalaDemo extends JUnitSuite {
139139
).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_))
140140
}
141141

142+
@Test def flattenExample() {
143+
List(
144+
Observable.interval(200 millis).map(_ => 1).take(5),
145+
Observable.interval(200 millis).map(_ => 2).take(5),
146+
Observable.interval(200 millis).map(_ => 3).take(5),
147+
Observable.interval(200 millis).map(_ => 4).take(5)
148+
).toObservable.flatten.toBlocking.foreach(println(_))
149+
}
150+
151+
@Test def flattenExample2() {
152+
List(
153+
Observable.interval(200 millis).map(_ => 1).take(5),
154+
Observable.interval(200 millis).map(_ => 2).take(5),
155+
Observable.interval(200 millis).map(_ => 3).take(5),
156+
Observable.interval(200 millis).map(_ => 4).take(5)
157+
).toObservable.flatten(2).toBlocking.foreach(println(_))
158+
}
159+
142160
@Test def rangeAndBufferExample() {
143161
val o = Observable.from(1 to 18)
144162
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,28 @@ trait Observable[+T]
23292329
toScalaObservable[U](o5)
23302330
}
23312331

2332+
/**
2333+
* Flattens an Observable that emits Observables into a single Observable that emits the items emitted by
2334+
* those Observables, without any transformation, while limiting the maximum number of concurrent
2335+
* subscriptions to these Observables.
2336+
*
2337+
* <img width="640" height="370" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.oo.png">
2338+
*
2339+
* You can combine the items emitted by multiple Observables so that they appear as a single Observable, by
2340+
* using the `flatten` method.
2341+
*
2342+
* @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently
2343+
* @return an Observable that emits items that are the result of flattening the Observables emitted by the `source` Observable
2344+
* @throws IllegalArgumentException if `maxConcurrent` is less than or equal to 0
2345+
*/
2346+
def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
2347+
val o2: Observable[Observable[U]] = this
2348+
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
2349+
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
2350+
val o5 = rx.Observable.merge[U](o4, maxConcurrent)
2351+
toScalaObservable[U](o5)
2352+
}
2353+
23322354
/**
23332355
* This behaves like `flatten` except that if any of the merged Observables
23342356
* notify of an error via [[rx.lang.scala.Observer.onError onError]], this method will

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ class CompletenessTest extends JUnitSuite {
191191
"just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]",
192192
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
193193
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
194+
"merge(Observable[_ <: Observable[_ <: T]], Int)" -> "flatten(Int)(<:<[Observable[T], Observable[Observable[U]]])",
195+
"merge(Array[Observable[_ <: T]])" -> "[use `Observable.from(array).flatten`]",
196+
"merge(Array[Observable[_ <: T]], Scheduler)" -> "[use `Observable.from(array, scheduler).flatten`]",
197+
"merge(Iterable[_ <: Observable[_ <: T]])" -> "[use `Observable.from(iter).flatten`]",
198+
"merge(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `Observable.from(iter).flatten(n)`]",
199+
"merge(Iterable[_ <: Observable[_ <: T]], Int, Scheduler)" -> "[use `Observable.from(iter, scheduler).flatten(n)]",
200+
"merge(Iterable[_ <: Observable[_ <: T]], Scheduler)" -> "[use `Observable.from(iter, scheduler).flatten`]",
194201
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
195202
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
196203
"parallelMerge(Observable[Observable[T]], Int)" -> "parallelMerge(Int)(<:<[Observable[T], Observable[Observable[U]]])",

0 commit comments

Comments
 (0)