@@ -19,6 +19,7 @@ import java.io.IOException
1919import java .util .concurrent .CountDownLatch
2020import java .util .concurrent .TimeUnit
2121
22+ import scala .collection .mutable
2223import scala .concurrent .duration .Duration
2324import scala .concurrent .duration .DurationInt
2425import scala .concurrent .duration .DurationLong
@@ -47,6 +48,29 @@ import rx.lang.scala.schedulers._
4748@ Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
4849class RxScalaDemo extends JUnitSuite {
4950
51+ @ Test def subscribeExample () {
52+ val o = Observable .items(1 , 2 , 3 )
53+
54+ // Generally, we have two methods, `subscribe` and `foreach`, to listen to the messages from an Observable.
55+ // `foreach` is just an alias to `subscribe`.
56+ o.subscribe(
57+ n => println(n),
58+ e => e.printStackTrace(),
59+ () => println(" done" )
60+ )
61+
62+ o.foreach(
63+ n => println(n),
64+ e => e.printStackTrace(),
65+ () => println(" done" )
66+ )
67+
68+ // For-comprehension is also an alternative, if you are only interested in `onNext`
69+ for (i <- o) {
70+ println(i)
71+ }
72+ }
73+
5074 @ Test def intervalExample () {
5175 val o = Observable .interval(200 millis).take(5 )
5276 o.subscribe(n => println(" n = " + n))
@@ -130,7 +154,7 @@ class RxScalaDemo extends JUnitSuite {
130154 o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
131155 }
132156
133- @ Test def fattenSomeExample () {
157+ @ Test def flattenSomeExample () {
134158 // To merge some observables which are all known already:
135159 List (
136160 Observable .interval(200 millis),
@@ -139,6 +163,24 @@ class RxScalaDemo extends JUnitSuite {
139163 ).toObservable.flatten.take(12 ).toBlockingObservable.foreach(println(_))
140164 }
141165
166+ @ Test def flattenExample () {
167+ List (
168+ Observable .interval(200 millis).map(_ => 1 ).take(5 ),
169+ Observable .interval(200 millis).map(_ => 2 ).take(5 ),
170+ Observable .interval(200 millis).map(_ => 3 ).take(5 ),
171+ Observable .interval(200 millis).map(_ => 4 ).take(5 )
172+ ).toObservable.flatten.toBlocking.foreach(println(_))
173+ }
174+
175+ @ Test def flattenExample2 () {
176+ List (
177+ Observable .interval(200 millis).map(_ => 1 ).take(5 ),
178+ Observable .interval(200 millis).map(_ => 2 ).take(5 ),
179+ Observable .interval(200 millis).map(_ => 3 ).take(5 ),
180+ Observable .interval(200 millis).map(_ => 4 ).take(5 )
181+ ).toObservable.flatten(2 ).toBlocking.foreach(println(_))
182+ }
183+
142184 @ Test def rangeAndBufferExample () {
143185 val o = Observable .from(1 to 18 )
144186 o.buffer(5 ).subscribe((l : Seq [Int ]) => println(l.mkString(" [" , " , " , " ]" )))
@@ -276,6 +318,13 @@ class RxScalaDemo extends JUnitSuite {
276318 sequenced.subscribe(x => println(s " Emitted group: $x" ))
277319 }
278320
321+ @ Test def groupByUntilExample2 () {
322+ val numbers = Observable .interval(250 millis).take(14 )
323+ val grouped = numbers.groupByUntil[Long , Long ](x => x % 2 , x => x * 10 , {case (key, obs) => Observable .interval(2 seconds)})
324+ val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
325+ sequenced.toBlocking.foreach(x => println(s " Emitted group: $x" ))
326+ }
327+
279328 @ Test def combineLatestExample () {
280329 val firstCounter = Observable .interval(250 millis)
281330 val secondCounter = Observable .interval(550 millis)
@@ -286,6 +335,17 @@ class RxScalaDemo extends JUnitSuite {
286335 waitFor(combinedCounter)
287336 }
288337
338+ @ Test def combineLatestExample2 () {
339+ val firstCounter = Observable .interval(250 millis)
340+ val secondCounter = Observable .interval(550 millis)
341+ val thirdCounter = Observable .interval(850 millis)
342+ val sources = Seq (firstCounter, secondCounter, thirdCounter)
343+ val combinedCounter = Observable .combineLatest(sources, (items : Seq [Long ]) => items.toList).take(10 )
344+
345+ combinedCounter subscribe {x => println(s " Emitted group: $x" )}
346+ waitFor(combinedCounter)
347+ }
348+
289349 @ Test def olympicsExampleWithoutPublish () {
290350 val medals = Olympics .mountainBikeMedals.doOnEach(_ => println(" onNext" ))
291351 medals.subscribe(println(_)) // triggers an execution of medals Observable
@@ -829,6 +889,40 @@ class RxScalaDemo extends JUnitSuite {
829889 println(m.toBlockingObservable.single)
830890 }
831891
892+ @ Test def toMultimapExample1 (): Unit = {
893+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
894+ val keySelector = (s : String ) => s.head
895+ val m = o.toMultimap(keySelector)
896+ println(m.toBlocking.single)
897+ }
898+
899+ @ Test def toMultimapExample2 (): Unit = {
900+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
901+ val keySelector = (s : String ) => s.head
902+ val valueSelector = (s : String ) => s.tail
903+ val m = o.toMultimap(keySelector, valueSelector)
904+ println(m.toBlocking.single)
905+ }
906+
907+ @ Test def toMultimapExample3 (): Unit = {
908+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
909+ val keySelector = (s : String ) => s.head
910+ val valueSelector = (s : String ) => s.tail
911+ val mapFactory = () => mutable.Map ('d' -> mutable.Buffer (" oug" ))
912+ val m = o.toMultimap(keySelector, valueSelector, mapFactory)
913+ println(m.toBlocking.single.mapValues(_.toList))
914+ }
915+
916+ @ Test def toMultimapExample4 (): Unit = {
917+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
918+ val keySelector = (s : String ) => s.head
919+ val valueSelector = (s : String ) => s.tail
920+ val mapFactory = () => mutable.Map ('d' -> mutable.ListBuffer (" oug" ))
921+ val bufferFactory = (k : Char ) => mutable.ListBuffer [String ]()
922+ val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory)
923+ println(m.toBlocking.single)
924+ }
925+
832926 @ Test def containsExample (): Unit = {
833927 val o1 = List (1 , 2 , 3 ).toObservable.contains(2 )
834928 assertTrue(o1.toBlockingObservable.single)
@@ -857,6 +951,28 @@ class RxScalaDemo extends JUnitSuite {
857951 assertEquals(List (" alice" , " bob" , " carol" ), o.retry(3 ).toBlockingObservable.toList)
858952 }
859953
954+ @ Test def retryExample3 (): Unit = {
955+ var isFirst = true
956+ val o = Observable {
957+ (subscriber : Subscriber [String ]) =>
958+ if (isFirst) {
959+ subscriber.onNext(" alice" )
960+ subscriber.onError(new IOException (" Oops" ))
961+ isFirst = false
962+ }
963+ else {
964+ subscriber.onNext(" bob" )
965+ subscriber.onError(new RuntimeException (" Oops" ))
966+ }
967+ }
968+ o.retry {
969+ (times, e) => e match {
970+ case e : IOException => times <= 3
971+ case _ => false
972+ }
973+ }.subscribe(s => println(s), e => e.printStackTrace())
974+ }
975+
860976 @ Test def liftExample1 (): Unit = {
861977 // Add "No. " in front of each item
862978 val o = List (1 , 2 , 3 ).toObservable.lift {
@@ -1176,4 +1292,82 @@ class RxScalaDemo extends JUnitSuite {
11761292 .take(20 )
11771293 .toBlocking.foreach(println)
11781294 }
1295+
1296+ @ Test def onErrorResumeNextExample () {
1297+ val o = Observable {
1298+ (subscriber : Subscriber [Int ]) =>
1299+ subscriber.onNext(1 )
1300+ subscriber.onNext(2 )
1301+ subscriber.onError(new IOException (" Oops" ))
1302+ subscriber.onNext(3 )
1303+ subscriber.onNext(4 )
1304+ }
1305+ o.onErrorResumeNext(_ => Observable .items(10 , 11 , 12 )).subscribe(println(_))
1306+ }
1307+
1308+ @ Test def onErrorFlatMapExample () {
1309+ val o = Observable {
1310+ (subscriber : Subscriber [Int ]) =>
1311+ subscriber.onNext(1 )
1312+ subscriber.onNext(2 )
1313+ subscriber.onError(new IOException (" Oops" ))
1314+ subscriber.onNext(3 )
1315+ subscriber.onNext(4 )
1316+ }
1317+ o.onErrorFlatMap((_, _) => Observable .items(10 , 11 , 12 )).subscribe(println(_))
1318+ }
1319+
1320+ @ Test def onErrorFlatMapExample2 () {
1321+ val o = Observable .items(4 , 2 , 0 ).map(16 / _).onErrorFlatMap {
1322+ (e, op) => op match {
1323+ case Some (v) if v == 0 => Observable .items(Int .MinValue )
1324+ case _ => Observable .empty
1325+ }
1326+ }
1327+ o.subscribe(println(_))
1328+ }
1329+
1330+ @ Test def switchMapExample () {
1331+ val o = Observable .interval(300 millis).take(5 ).switchMap[String ] {
1332+ n => Observable .interval(50 millis).take(10 ).map(i => s " Seq ${n}: ${i}" )
1333+ }
1334+ o.toBlocking.foreach(println)
1335+ }
1336+
1337+ @ Test def joinExample () {
1338+ val o1 = Observable .interval(500 millis).map(n => " 1: " + n)
1339+ val o2 = Observable .interval(100 millis).map(n => " 2: " + n)
1340+ val o = o1.join(o2,
1341+ (_ : String ) => Observable .timer(300 millis),
1342+ (_ : String ) => Observable .timer(200 millis),
1343+ (t1 : String , t2 : String ) => (t1, t2))
1344+ o.take(10 ).toBlocking.foreach(println)
1345+ }
1346+
1347+ @ Test def groupJoinExample () {
1348+ val o1 = Observable .interval(500 millis).map(n => " 1: " + n)
1349+ val o2 = Observable .interval(100 millis).map(n => " 2: " + n)
1350+ val o = o1.groupJoin(o2,
1351+ (_ : String ) => Observable .timer(300 millis),
1352+ (_ : String ) => Observable .timer(200 millis),
1353+ (t1 : String , t2 : Observable [String ]) => (t1, t2.toSeq.toBlocking.single))
1354+ o.take(3 ).toBlocking.foreach(println)
1355+ }
1356+
1357+ @ Test def pivotExample () {
1358+ val o1 = (1 to 20 ).toObservable.groupBy(i => if (i <= 10 ) " x" else " y" ).map {
1359+ case (t : String , o : Observable [Int ]) => (t, o.groupBy(i => i % 2 == 0 ))
1360+ }
1361+ println(" o1:" )
1362+ (for ((k1, o) <- o1;
1363+ (k2, vs) <- o;
1364+ v <- vs
1365+ ) yield (k1, k2, v)).subscribe(println(_))
1366+ val o2 = o1.pivot
1367+ println(" o2:" )
1368+ (for ((k1, o) <- o2;
1369+ (k2, vs) <- o;
1370+ v <- vs
1371+ ) yield (k1, k2, v)).subscribe(println(_))
1372+ }
11791373}
0 commit comments