@@ -32,6 +32,15 @@ import org.scalatest.junit.JUnitSuite
3232import rx .lang .scala ._
3333import rx .lang .scala .schedulers ._
3434
35+ /**
36+ * Demo how the different operators can be used. In Eclipse, you can right-click
37+ * a test and choose "Run As" > "Scala JUnit Test".
38+ *
39+ * For each operator added to Observable.java, we add a little usage demo here.
40+ * It does not need to test the functionality (that's already done by the tests in
41+ * RxJava core), but it should demonstrate how it can be used, to make sure that
42+ * the method signature makes sense.
43+ */
3544@ Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
3645class RxScalaDemo extends JUnitSuite {
3746
@@ -78,13 +87,9 @@ class RxScalaDemo extends JUnitSuite {
7887 val first = Observable .from(List (10 , 11 , 12 ))
7988 val second = Observable .from(List (10 , 11 , 12 ))
8089
81- val b1 = (first zip second) map (p => p._1 == p._2) forall (b => b)
82-
83- val equality = (a : Any , b : Any ) => a == b
84- val b2 = (first zip second) map (p => equality(p._1, p._2)) forall (b => b)
90+ val b = (first zip second) forall { case (a, b) => a == b }
8591
86- assertTrue(b1.toBlockingObservable.single)
87- assertTrue(b2.toBlockingObservable.single)
92+ assertTrue(b.toBlockingObservable.single)
8893 }
8994
9095 @ Test def testObservableComparisonWithForComprehension () {
@@ -93,7 +98,7 @@ class RxScalaDemo extends JUnitSuite {
9398
9499 val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
95100
96- val b1 = booleans.forall(_ == true ) // without `== true`, b1 is assigned the forall function
101+ val b1 = booleans.forall(identity)
97102
98103 assertTrue(b1.toBlockingObservable.single)
99104 }
@@ -216,18 +221,21 @@ class RxScalaDemo extends JUnitSuite {
216221 }).flatten.toBlockingObservable.foreach(println(_))
217222 }
218223
219- @ Ignore // TODO something's bad here
220224 @ Test def timingTest1 () {
221225 val numbersByModulo3 = Observable .interval(1000 millis).take(9 ).groupBy(_ % 3 )
222226
223227 val t0 = System .currentTimeMillis
224228
225229 (for ((modulo, numbers) <- numbersByModulo3) yield {
226230 println(" Observable for modulo" + modulo + " started at t = " + (System .currentTimeMillis - t0))
227- numbers.take(1 ) // <- TODO very unexpected
228- // numbers
231+ numbers.map(n => s " ${n} is in the modulo- $modulo group " )
229232 }).flatten.toBlockingObservable.foreach(println(_))
230233 }
234+
235+ @ Test def testOlympicYearTicks () {
236+ Olympics .yearTicks.subscribe(println(_))
237+ waitFor(Olympics .yearTicks)
238+ }
231239
232240 @ Test def groupByExample () {
233241 val medalsByCountry = Olympics .mountainBikeMedals.groupBy(medal => medal.country)
@@ -238,8 +246,10 @@ class RxScalaDemo extends JUnitSuite {
238246 firstMedalOfEachCountry.subscribe(medal => {
239247 println(s " ${medal.country} wins its first medal in ${medal.year}" )
240248 })
249+
250+ Olympics .yearTicks.subscribe(year => println(s " \n Year $year starts. " ))
241251
242- waitFor(firstMedalOfEachCountry )
252+ waitFor(Olympics .yearTicks )
243253 }
244254
245255 @ Test def groupByUntilExample () {
@@ -250,30 +260,36 @@ class RxScalaDemo extends JUnitSuite {
250260 }
251261
252262 @ Test def combineLatestExample () {
253- val first_counter = Observable .interval(250 millis)
254- val second_counter = Observable .interval(550 millis)
255- val combined_counter = first_counter .combineLatest(second_counter ,
263+ val firstCounter = Observable .interval(250 millis)
264+ val secondCounter = Observable .interval(550 millis)
265+ val combinedCounter = firstCounter .combineLatest(secondCounter ,
256266 (x : Long , y : Long ) => List (x,y)) take 10
257267
258- combined_counter subscribe {x => println(s " Emitted group: $x" )}
268+ combinedCounter subscribe {x => println(s " Emitted group: $x" )}
269+ waitFor(combinedCounter)
259270 }
260271
272+ @ Test def olympicsExampleWithoutPublish () {
273+ val medals = Olympics .mountainBikeMedals.doOnEach(_ => println(" onNext" ))
274+ medals.subscribe(println(_)) // triggers an execution of medals Observable
275+ waitFor(medals) // triggers another execution of medals Observable
276+ }
261277
262- @ Test def olympicsExample () {
263- val medals = Olympics .mountainBikeMedals.publish
264- medals.subscribe(println(_))
278+ @ Test def olympicsExampleWithPublish () {
279+ val medals = Olympics .mountainBikeMedals.doOnEach(_ => println( " onNext " )). publish
280+ medals.subscribe(println(_)) // triggers an execution of medals Observable
265281 medals.connect
266- // waitFor(medals)
282+ waitFor(medals) // triggers another execution of medals Observable
267283 }
268284
269285 @ Test def exampleWithoutPublish () {
270- val unshared = List (1 to 4 ).toObservable
286+ val unshared = Observable .from (1 to 4 )
271287 unshared.subscribe(n => println(s " subscriber 1 gets $n" ))
272288 unshared.subscribe(n => println(s " subscriber 2 gets $n" ))
273289 }
274290
275291 @ Test def exampleWithPublish () {
276- val unshared = List (1 to 4 ).toObservable
292+ val unshared = Observable .from (1 to 4 )
277293 val shared = unshared.publish
278294 shared.subscribe(n => println(s " subscriber 1 gets $n" ))
279295 shared.subscribe(n => println(s " subscriber 2 gets $n" ))
@@ -402,7 +418,7 @@ class RxScalaDemo extends JUnitSuite {
402418 }
403419
404420 @ Test def timestampExample () {
405- val timestamped = Observable .interval(100 millis).take(3 ).timestamp.toBlockingObservable
421+ val timestamped = Observable .interval(100 millis).take(6 ).timestamp.toBlockingObservable
406422 for ((millis, value) <- timestamped if value > 0 ) {
407423 println(value + " at t = " + millis)
408424 }
@@ -441,35 +457,57 @@ class RxScalaDemo extends JUnitSuite {
441457 val oc3 : rx.Notification [_ <: Int ] = oc2.asJavaNotification
442458 val oc4 : rx.Notification [_ <: Any ] = oc2.asJavaNotification
443459 }
444-
445- @ Test def elementAtReplacement () {
446- assertEquals(" b" , List (" a" , " b" , " c" ).toObservable.drop(1 ).first.toBlockingObservable.single)
447- }
448-
449- @ Test def elementAtOrDefaultReplacement () {
450- assertEquals(" b" , List (" a" , " b" , " c" ).toObservable.drop(1 ).firstOrElse(" !" ).toBlockingObservable.single)
451- assertEquals(" !!" , List (" a" , " b" , " c" ).toObservable.drop(10 ).firstOrElse(" !!" ).toBlockingObservable.single)
452- }
453460
454461 @ Test def takeWhileWithIndexAlternative {
455462 val condition = true
456463 List (" a" , " b" ).toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
457464 }
458465
459- @ Test def createExample () {
466+ def calculateElement (index : Int ): String = {
467+ println(" omg I'm calculating so hard" )
468+ index match {
469+ case 0 => " a"
470+ case 1 => " b"
471+ case _ => throw new IllegalArgumentException
472+ }
473+ }
474+
475+ /**
476+ * This is a bad way of using Observable.create, because even if the consumer unsubscribes,
477+ * all elements are calculated.
478+ */
479+ @ Test def createExampleBad () {
460480 val o = Observable .create[String ](observer => {
461- // this is bad because you cannot unsubscribe!
462- observer.onNext(" a" )
463- observer.onNext(" b" )
481+ observer.onNext(calculateElement(0 ))
482+ observer.onNext(calculateElement(1 ))
464483 observer.onCompleted()
465484 Subscription {}
466485 })
467- o.subscribe(println(_))
486+ o.take(1 ).subscribe(println(_))
487+ }
488+
489+ /**
490+ * This is the good way of doing it: If the consumer unsubscribes, no more elements are
491+ * calculated.
492+ */
493+ @ Test def createExampleGood () {
494+ val o = Observable [String ](subscriber => {
495+ var i = 0
496+ while (i < 2 && ! subscriber.isUnsubscribed) {
497+ subscriber.onNext(calculateElement(i))
498+ i += 1
499+ }
500+ if (! subscriber.isUnsubscribed) subscriber.onCompleted()
501+ })
502+ o.take(1 ).subscribe(println(_))
468503 }
469504
470505 def output (s : String ): Unit = println(s)
471506
472- // blocks until obs has completed
507+ /** Subscribes to obs and waits until obs has completed. Note that if you subscribe to
508+ * obs yourself and also call waitFor(obs), all side-effects of subscribing to obs
509+ * will happen twice.
510+ */
473511 def waitFor [T ](obs : Observable [T ]): Unit = {
474512 obs.toBlockingObservable.toIterable.last
475513 }
0 commit comments