File tree Expand file tree Collapse file tree 2 files changed +6
-8
lines changed
language-adaptors/rxjava-scala/src
examples/scala/rx/lang/scala/examples Expand file tree Collapse file tree 2 files changed +6
-8
lines changed Original file line number Diff line number Diff line change @@ -312,9 +312,9 @@ class RxScalaDemo extends JUnitSuite {
312
312
313
313
@ Test def exampleWithReplay () {
314
314
val numbers = Observable .interval(1000 millis).take(6 )
315
- val (startFunc, sharedNumbers) = numbers.replay
315
+ val sharedNumbers = numbers.replay
316
316
sharedNumbers.subscribe(n => println(s " subscriber 1 gets $n" ))
317
- startFunc()
317
+ sharedNumbers.connect
318
318
// subscriber 2 subscribes later but still gets all numbers
319
319
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s " subscriber 2 gets $n" )) })
320
320
waitFor(sharedNumbers)
Original file line number Diff line number Diff line change @@ -215,10 +215,9 @@ trait Observable[+T]
215
215
* @return a pair of a start function and an [[rx.lang.scala.Observable ]] such that when the start function
216
216
* is called, the Observable starts to push results into the specified Subject
217
217
*/
218
- def multicast [R >: T ](subject : rx.lang.scala.Subject [R ]): (() => Subscription , Observable [R ]) = {
218
+ def multicast [R >: T ](subject : rx.lang.scala.Subject [R ]): ConnectableObservable [R ] = {
219
219
val s : rx.subjects.Subject [_ >: T , _< : R ] = subject.asJavaSubject
220
- val javaCO : rx.observables.ConnectableObservable [R ] = asJavaObservable.multicast(s)
221
- (() => javaCO.connect(), toScalaObservable(javaCO))
220
+ new ConnectableObservable [R ](asJavaObservable.multicast(s))
222
221
}
223
222
224
223
/**
@@ -1032,9 +1031,8 @@ trait Observable[+T]
1032
1031
* @return a pair of a start function and an [[rx.lang.scala.Observable ]] such that when the start function
1033
1032
* is called, the Observable starts to emit items to its [[rx.lang.scala.Observer ]]s
1034
1033
*/
1035
- def replay : (() => Subscription , Observable [T ]) = {
1036
- val javaCO = asJavaObservable.replay()
1037
- (() => javaCO.connect(), toScalaObservable[T ](javaCO))
1034
+ def replay : ConnectableObservable [T ] = {
1035
+ new ConnectableObservable [T ](asJavaObservable.replay())
1038
1036
}
1039
1037
1040
1038
/**
You can’t perform that action at this time.
0 commit comments