Skip to content

Commit ec24d16

Browse files
committed
Add parallelMerge to RxScala
1 parent c8bf9e6 commit ec24d16

File tree

2 files changed

+57
-0
lines changed

2 files changed

+57
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,4 +1060,14 @@ class RxScalaDemo extends JUnitSuite {
10601060
}
10611061
o.unsubscribeOn(NewThreadScheduler()).subscribe(println(_))
10621062
}
1063+
1064+
@Test def parallelMergeExample() {
1065+
val o: Observable[Observable[Int]] = (1 to 100).toObservable.map(_ => (1 to 10).toObservable)
1066+
assertEquals(100, o.size.toBlockingObservable.single)
1067+
assertEquals(1000, o.flatten.size.toBlockingObservable.single)
1068+
1069+
val o2: Observable[Observable[Int]] = o.parallelMerge(10, ComputationScheduler())
1070+
assertEquals(10, o2.size.toBlockingObservable.single)
1071+
assertEquals(1000, o2.flatten.size.toBlockingObservable.single)
1072+
}
10631073
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2947,6 +2947,53 @@ trait Observable[+T]
29472947
toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler))
29482948
}
29492949

2950+
/**
2951+
* Converts an `Observable[Observable[T]]` into another `Observable[Observable[T]]` whose
2952+
* emitted Observables emit the same items, but the number of such Observables is restricted by `parallelObservables`.
2953+
*
2954+
* For example, if the original `Observable[Observable[T]]` emits 100 Observables and `parallelObservables` is 8,
2955+
* the items emitted by the 100 original Observables will be distributed among 8 Observables emitted by the resulting Observable.
2956+
*
2957+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/parallelMerge.png">
2958+
*
2959+
* This is a mechanism for efficiently processing `n` number of Observables on a smaller `m` number of resources (typically CPU cores).
2960+
*
2961+
* @param parallelObservables the number of Observables to merge into
2962+
* @return an Observable of Observables constrained in number by `parallelObservables`
2963+
*/
2964+
def parallelMerge[U](parallelObservables: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[Observable[U]] = {
2965+
val o2: Observable[Observable[U]] = this
2966+
val o3: Observable[rx.Observable[U]] = o2.map(_.asJavaObservable.asInstanceOf[rx.Observable[U]])
2967+
val o4: rx.Observable[rx.Observable[U]] = o3.asJavaObservable.asInstanceOf[rx.Observable[rx.Observable[U]]]
2968+
val o5: rx.Observable[rx.Observable[U]] = rx.Observable.parallelMerge[U](o4, parallelObservables)
2969+
toScalaObservable(o5).map(toScalaObservable[U](_))
2970+
}
2971+
2972+
/**
2973+
* Converts an `Observable[Observable[T]]` into another `Observable[Observable[T]]` whose
2974+
* emitted Observables emit the same items, but the number of such Observables is restricted by `parallelObservables`,
2975+
* and each runs on a defined Scheduler.
2976+
*
2977+
* For example, if the original Observable[Observable[T]]` emits 100 Observables and `parallelObservables` is 8,
2978+
* the items emitted by the 100 original Observables will be distributed among 8 Observables emitted by the resulting Observable.
2979+
*
2980+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/parallelMerge.png">
2981+
*
2982+
* This is a mechanism for efficiently processing n` number of Observables on a smaller `m`
2983+
* number of resources (typically CPU cores).
2984+
*
2985+
* @param parallelObservables the number of Observables to merge into
2986+
* @param scheduler the [[Scheduler]] to run each Observable on
2987+
* @return an Observable of Observables constrained in number by `parallelObservables`
2988+
*/
2989+
def parallelMerge[U](parallelObservables: Int, scheduler: Scheduler)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[Observable[U]] = {
2990+
val o2: Observable[Observable[U]] = this
2991+
val o3: Observable[rx.Observable[U]] = o2.map(_.asJavaObservable.asInstanceOf[rx.Observable[U]])
2992+
val o4: rx.Observable[rx.Observable[U]] = o3.asJavaObservable.asInstanceOf[rx.Observable[rx.Observable[U]]]
2993+
val o5: rx.Observable[rx.Observable[U]] = rx.Observable.parallelMerge[U](o4, parallelObservables, scheduler)
2994+
toScalaObservable(o5).map(toScalaObservable[U](_))
2995+
}
2996+
29502997
/** Tests whether a predicate holds for some of the elements of this `Observable`.
29512998
*
29522999
* @param p the predicate used to test elements.

0 commit comments

Comments
 (0)