Skip to content

Commit 1c76782

Browse files
Merge pull request #1160 from zsxwing/replay-multicast
Add `replay` and `multicast` variants to RxScala
2 parents eab66ba + 7d0105d commit 1c76782

File tree

3 files changed

+363
-33
lines changed

3 files changed

+363
-33
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,14 +343,54 @@ class RxScalaDemo extends JUnitSuite {
343343

344344
@Test def exampleWithReplay() {
345345
val numbers = Observable.interval(1000 millis).take(6)
346-
val (startFunc, sharedNumbers) = numbers.replay
346+
val sharedNumbers = numbers.replay
347347
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
348-
startFunc()
348+
sharedNumbers.connect
349349
// subscriber 2 subscribes later but still gets all numbers
350350
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
351351
waitFor(sharedNumbers)
352352
}
353353

354+
@Test def exampleWithReplay2() {
355+
val numbers = Observable.interval(100 millis).take(10)
356+
val sharedNumbers = numbers.replay(3)
357+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
358+
sharedNumbers.connect
359+
// subscriber 2 subscribes later but only gets the 3 buffered numbers and the following numbers
360+
Thread.sleep(700)
361+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
362+
waitFor(sharedNumbers)
363+
}
364+
365+
@Test def exampleWithReplay3() {
366+
val numbers = Observable.interval(100 millis).take(10)
367+
val sharedNumbers = numbers.replay(300 millis)
368+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
369+
sharedNumbers.connect
370+
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
371+
Thread.sleep(700)
372+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
373+
waitFor(sharedNumbers)
374+
}
375+
376+
@Test def exampleWithReplay4() {
377+
val numbers = Observable.interval(100 millis).take(10)
378+
val sharedNumbers = numbers.replay(2, 300 millis)
379+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
380+
sharedNumbers.connect
381+
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
382+
Thread.sleep(700)
383+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
384+
waitFor(sharedNumbers)
385+
}
386+
387+
@Test def exampleWithReplay5() {
388+
val numbers = Observable.interval(100 millis).take(10)
389+
val sharedNumbers = numbers.replay[Long, Long]((o: Observable[Long]) => o.map(_ * 2))
390+
sharedNumbers.subscribe(n => println(s"subscriber gets $n"))
391+
waitFor(sharedNumbers)
392+
}
393+
354394
@Test def testSingleOption() {
355395
assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption)
356396
assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption)
@@ -720,7 +760,7 @@ class RxScalaDemo extends JUnitSuite {
720760
}
721761

722762
@Test def repeatExample1(): Unit = {
723-
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6)
763+
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat.take(6)
724764
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
725765
}
726766

@@ -802,6 +842,21 @@ class RxScalaDemo extends JUnitSuite {
802842
}
803843
}
804844

845+
@Test def multicastExample1(): Unit = {
846+
val unshared = Observable.from(1 to 4)
847+
val shared = unshared.multicast(Subject())
848+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
849+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
850+
shared.connect
851+
}
852+
853+
@Test def multicastExample2(): Unit = {
854+
val unshared = Observable.from(1 to 4)
855+
val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _))
856+
shared.subscribe(n => println(s"subscriber 1 gets $n"))
857+
shared.subscribe(n => println(s"subscriber 2 gets $n"))
858+
}
859+
805860
@Test def startWithExample(): Unit = {
806861
val o1 = List(3, 4).toObservable
807862
val o2 = 1 +: 2 +: o1

0 commit comments

Comments
 (0)