Skip to content

Commit 7d0105d

Browse files
committed
Merge branch 'master' into replay-multicast
Conflicts: language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
2 parents 7253be3 + 3f03935 commit 7d0105d

File tree

40 files changed

+1826
-534
lines changed

40 files changed

+1826
-534
lines changed

CHANGES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# RxJava Releases #
22

3+
### Version 0.18.3 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.18.3%22)) ###
4+
5+
* [Pull 1161] (https://github.com/Netflix/RxJava/pull/1161) Removed use of deprecated API from tests & operators
6+
* [Pull 1162] (https://github.com/Netflix/RxJava/pull/1162) fix to remove drift from schedulePeriodic
7+
* [Pull 1159] (https://github.com/Netflix/RxJava/pull/1159) Rxscala improvement
8+
* [Pull 1164] (https://github.com/Netflix/RxJava/pull/1164) JMH Perf Tests for Schedulers.computation
9+
* [Pull 1158] (https://github.com/Netflix/RxJava/pull/1158) Scheduler correctness improvements.
10+
311
### Version 0.18.2 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.18.2%22)) ###
412

513

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.lang.scala.examples
1717

1818
import java.io.IOException
19+
import java.util.concurrent.CountDownLatch
20+
import java.util.concurrent.TimeUnit
1921

2022
import scala.concurrent.duration.Duration
2123
import scala.concurrent.duration.DurationInt
@@ -142,6 +144,12 @@ class RxScalaDemo extends JUnitSuite {
142144
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
143145
}
144146

147+
@Test def bufferExample() {
148+
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
149+
val boundary = Observable.interval(500 millis)
150+
o.buffer(boundary).toBlockingObservable.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
151+
}
152+
145153
@Test def windowExample() {
146154
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
147155
yield s"Observable#$i emits $n"
@@ -682,6 +690,21 @@ class RxScalaDemo extends JUnitSuite {
682690
println(result)
683691
}
684692

693+
@Test def delayExample3(): Unit = {
694+
val o = List(100, 500, 200).toObservable.delay(
695+
(i: Int) => Observable.items(i).delay(i millis)
696+
)
697+
o.toBlockingObservable.foreach(println(_))
698+
}
699+
700+
@Test def delayExample4(): Unit = {
701+
val o = List(100, 500, 200).toObservable.delay(
702+
() => Observable.interval(500 millis).take(1),
703+
(i: Int) => Observable.items(i).delay(i millis)
704+
)
705+
o.toBlockingObservable.foreach(println(_))
706+
}
707+
685708
@Test def delaySubscriptionExample(): Unit = {
686709
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
687710
val result = o.toBlockingObservable.toList
@@ -844,4 +867,96 @@ class RxScalaDemo extends JUnitSuite {
844867
val o = List(1, 2).toObservable :+ 3 :+ 4
845868
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
846869
}
870+
871+
@Test def sequenceEqualExampe(): Unit = {
872+
val o1 = List(1, 2, 3).toObservable
873+
val o2 = List(1, 2, 3).toObservable
874+
val o3 = List(1, 2).toObservable
875+
val o4 = List(1.0, 2.0, 3.0).toObservable
876+
assertTrue(o1.sequenceEqual(o2).toBlockingObservable.single)
877+
assertFalse(o1.sequenceEqual(o3).toBlockingObservable.single)
878+
assertTrue(o1.sequenceEqual(o4).toBlockingObservable.single)
879+
}
880+
881+
@Test def takeExample(): Unit = {
882+
val o = (1 to 20).toObservable
883+
.zip(Observable.interval(300 millis))
884+
.map(_._1)
885+
.take(2 seconds)
886+
println(o.toBlockingObservable.toList)
887+
}
888+
889+
@Test def takeRightExample(): Unit = {
890+
val o = (1 to 6).toObservable.takeRight(3)
891+
assertEquals(List(4, 5, 6), o.toBlockingObservable.toList)
892+
}
893+
894+
@Test def takeRightExample2(): Unit = {
895+
val o = (1 to 10).toObservable
896+
.zip(Observable.interval(100 millis))
897+
.map(_._1)
898+
.takeRight(300 millis)
899+
println(o.toBlockingObservable.toList)
900+
}
901+
902+
@Test def takeRightExample3(): Unit = {
903+
val o = (1 to 10).toObservable
904+
.zip(Observable.interval(100 millis))
905+
.map(_._1)
906+
.takeRight(2, 300 millis)
907+
println(o.toBlockingObservable.toList)
908+
}
909+
910+
@Test def timeIntervalExample(): Unit = {
911+
val o = (1 to 10).toObservable
912+
.zip(Observable.interval(100 millis))
913+
.map(_._1)
914+
.timeInterval
915+
println(o.toBlockingObservable.toList)
916+
}
917+
918+
@Test def schedulerExample1(): Unit = {
919+
val latch = new CountDownLatch(1)
920+
val worker = IOScheduler().createWorker
921+
worker.schedule {
922+
println("Hello from Scheduler")
923+
latch.countDown()
924+
}
925+
latch.await(5, TimeUnit.SECONDS)
926+
}
927+
928+
@Test def schedulerExample2(): Unit = {
929+
val latch = new CountDownLatch(1)
930+
val worker = IOScheduler().createWorker
931+
worker.schedule(1 seconds) {
932+
println("Hello from Scheduler after 1 second")
933+
latch.countDown()
934+
}
935+
latch.await(5, TimeUnit.SECONDS)
936+
}
937+
938+
@Test def schedulerExample3(): Unit = {
939+
val worker = IOScheduler().createWorker
940+
var no = 1
941+
val subscription = worker.schedulePeriodically(initialDelay = 1 seconds, period = 100 millis) {
942+
println(s"Hello(${no}) from Scheduler")
943+
no += 1
944+
}
945+
TimeUnit.SECONDS.sleep(2)
946+
subscription.unsubscribe()
947+
}
948+
949+
@Test def schedulerExample4(): Unit = {
950+
val worker = IOScheduler().createWorker
951+
var no = 1
952+
def hello: Unit = {
953+
println(s"Hello(${no}) from Scheduler")
954+
no += 1
955+
worker.schedule(100 millis)(hello)
956+
}
957+
val subscription = worker.schedule(1 seconds)(hello)
958+
TimeUnit.SECONDS.sleep(2)
959+
subscription.unsubscribe()
960+
}
961+
847962
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ object Notification {
137137
* @param notification
138138
* The [[rx.lang.scala.Notification]] to be deconstructed
139139
* @return
140-
* The [[java.lang.Throwable]] value contained in this notification.
140+
* The `java.lang.Throwable` value contained in this notification.
141141
*/
142142
def unapply[U](notification: Notification[U]): Option[Throwable] = notification match {
143143
case onError: OnError[U] => Some(onError.error)

0 commit comments

Comments
 (0)