Skip to content

Commit bcf34f7

Browse files
committed
Add 'multicast' variants to RxScala
1 parent 024a427 commit bcf34f7

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,4 +708,19 @@ class RxScalaDemo extends JUnitSuite {
708708
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
709709
}
710710
}
711+
712+
@Test def multicastExample1(): Unit = {
713+
val unshared = Observable.from(1 to 4)
714+
val shared = unshared.multicast(Subject())
715+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
716+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
717+
shared.connect
718+
}
719+
720+
@Test def multicastExample2(): Unit = {
721+
val unshared = Observable.from(1 to 4)
722+
val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _))
723+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
724+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
725+
}
711726
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,21 @@ trait Observable[+T]
220220
new ConnectableObservable[R](asJavaObservable.multicast(s))
221221
}
222222

223+
/**
224+
* Returns an Observable that emits items produced by multicasting the source Observable within a selector function.
225+
*
226+
* @param subjectFactory the `Subject` factory
227+
* @param selector the selector function, which can use the multicasted source Observable subject to the policies
228+
* enforced by the created `Subject`
229+
* @return an Observable that emits the items produced by multicasting the source Observable within a selector function
230+
*/
231+
def multicast[R >: T, U](subjectFactory: () => rx.lang.scala.Subject[R], selector: Observable[R] => Observable[U]): Observable[U] = {
232+
val subjectFactoryJava: Func0[rx.subjects.Subject[_ >: T, _ <: R]] = () => subjectFactory().asJavaSubject
233+
val selectorJava: Func1[rx.Observable[R], rx.Observable[U]] =
234+
(jo: rx.Observable[R]) => selector(toScalaObservable[R](jo)).asJavaObservable.asInstanceOf[rx.Observable[U]]
235+
toScalaObservable[U](asJavaObservable.multicast[R, U](subjectFactoryJava, selectorJava))
236+
}
237+
223238
/**
224239
* Returns an Observable that first emits the items emitted by `this`, and then the items emitted
225240
* by `that`.

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class CompletenessTest extends JUnitSuite {
140140
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
141141
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
142142
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
143+
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
144+
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
143145
"range(Int, Int)" -> "apply(Range)",
144146
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]",
145147
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]",

0 commit comments

Comments
 (0)