Skip to content

Commit 5efe114

Browse files
add multicast, publish, replay
1 parent ac26e42 commit 5efe114

File tree

5 files changed

+69
-44
lines changed

5 files changed

+69
-44
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package rx.lang.scala
2020
import org.scalatest.junit.JUnitSuite
2121
import scala.collection.Seq
2222
import rx.lang.scala.observables.BlockingObservable
23-
import rx.lang.scala.observables.ConnectableObservable
2423

2524

2625
/**
@@ -38,6 +37,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
3837
import rx.util.functions._
3938
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
4039
import rx.lang.scala.util._
40+
import rx.lang.scala.subjects.Subject
4141
import rx.lang.scala.ImplicitFunctionConversions._
4242

4343
/**
@@ -132,11 +132,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
132132
* into
133133
* @param <R>
134134
* result type
135-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
136-
* push results into the specified {@link Subject}
135+
* @return a pair of a start function and an {@link Observable} such that when the start function
136+
* is called, the Observable starts to push results into the specified {@link Subject}
137137
*/
138-
// public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) TODO
139-
138+
def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = {
139+
val javaCO = asJava.multicast[R](subject)
140+
(() => javaCO.connect(), Observable[R](javaCO))
141+
}
140142

141143
/**
142144
* Returns an Observable that first emits the items emitted by this, and then the items emitted
@@ -904,11 +906,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
904906
* <p>
905907
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.png">
906908
*
907-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
908-
* emit items to its {@link Observer}s
909+
* @return a pair of a start function and an {@link Observable} such that when the start function
910+
* is called, the Observable starts to emit items to its {@link Observer}s
909911
*/
910-
def replay(): ConnectableObservable[T] = {
911-
new ConnectableObservable[T](asJava.replay())
912+
def replay(): (() => Subscription, Observable[T]) = {
913+
val javaCO = asJava.replay()
914+
(() => javaCO.connect(), Observable[T](javaCO))
912915
}
913916

914917
/**
@@ -937,11 +940,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
937940
* <p>
938941
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
939942
*
940-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
941-
* emit items to its {@link Observer}s
943+
* @return a pair of a start function and an {@link Observable} such that when the start function
944+
* is called, the Observable starts to emit items to its {@link Observer}s
942945
*/
943-
def publish: ConnectableObservable[T] = {
944-
new ConnectableObservable[T](asJava.publish())
946+
def publish: (() => Subscription, Observable[T]) = {
947+
val javaCO = asJava.publish()
948+
(() => javaCO.connect(), Observable[T](javaCO))
945949
}
946950

947951
// There is no aggregate function with signature

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,44 @@ class RxScalaDemo extends JUnitSuite {
191191
waitFor(firstMedalOfEachCountry)
192192
}
193193

194+
@Test def exampleWithoutPublish() {
195+
val unshared = Observable(1 to 4)
196+
unshared.subscribe(n => println(s"subscriber 1 gets $n"))
197+
unshared.subscribe(n => println(s"subscriber 2 gets $n"))
198+
}
199+
200+
@Test def exampleWithPublish() {
201+
val unshared = Observable(1 to 4)
202+
val (startFunc, shared) = unshared.publish
203+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
204+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
205+
startFunc()
206+
}
207+
208+
def doLater(waitTime: Duration, action: () => Unit): Unit = {
209+
Observable.interval(waitTime).take(1).subscribe(_ => action())
210+
}
211+
212+
@Test def exampleWithoutReplay() {
213+
val numbers = Observable.interval(1000 millis).take(6)
214+
val (startFunc, sharedNumbers) = numbers.publish
215+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
216+
startFunc()
217+
// subscriber 2 misses 0, 1, 2!
218+
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
219+
waitFor(sharedNumbers)
220+
}
221+
222+
@Test def exampleWithReplay() {
223+
val numbers = Observable.interval(1000 millis).take(6)
224+
val (startFunc, sharedNumbers) = numbers.replay
225+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
226+
startFunc()
227+
// subscriber 2 subscribes later but still gets all numbers
228+
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
229+
waitFor(sharedNumbers)
230+
}
231+
194232
def output(s: String): Unit = println(s)
195233

196234
// blocks until obs has completed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,6 @@ rx.plugins.RxJavaErrorHandler
5353
rx.plugins.RxJavaObservableExecutionHook
5454
rx.plugins.RxJavaPlugins
5555
56-
rx.subjects.AsyncSubject
57-
rx.subjects.BehaviorSubject
58-
rx.subjects.PublishSubject
59-
rx.subjects.ReplaySubject
60-
rx.subjects.Subject
61-
6256
rx.subscriptions.BooleanSubscription
6357
rx.subscriptions.CompositeSubscription
6458
rx.subscriptions.Subscriptions
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package rx.lang.scala
2+
3+
package object subjects {
4+
5+
// in Java: public abstract class Subject<T,R> extends Observable<R> implements Observer<T>
6+
type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R]
7+
8+
// TODO (including static methods of these classes)
9+
// rx.subjects.AsyncSubject
10+
// rx.subjects.BehaviorSubject
11+
// rx.subjects.PublishSubject
12+
// rx.subjects.ReplaySubject
13+
14+
}

0 commit comments

Comments
 (0)