@@ -158,47 +158,53 @@ class RxScalaDemo extends JUnitSuite {
158158 waitFor(o)
159159 }
160160
161- def sampleAllUntilComplete [T ](o : Observable [T ], period : Duration ): Observable [T ] = {
162- for ((element, tick) <- o zip Observable .interval(period)) yield element
161+ @ Test def testGroupByThenFlatMap () {
162+ val m = Observable (1 , 2 , 3 , 4 )
163+ val g = m.groupBy(i => i % 2 )
164+ val t = g.flatMap((p : (Int , Observable [Int ])) => p._2)
165+ assertEquals(List (1 , 2 , 3 , 4 ), t.toBlockingObservable.toList)
163166 }
164167
165- @ Ignore // TODO
166- @ Test def groupByExample () {
167- import Olympics ._
168- // `: _*` converts list to varargs
169- val medals = Observable [Medal ](Olympics .mountainBikeMedals : _* )
170-
171- // 1 second = 4 years :D
172- val medalsByYear = sampleAllUntilComplete(medals.groupBy(_.year), 1 seconds)
173-
174- /*
175- val t = (for ((year, medals) <- medalsByYear) yield medals).flatMap(ms => ms)
176- t.subscribe(println(_))
177- */
178-
179- val timedMedals = for ((year, medals) <- medalsByYear; medal <- medals) yield medal
180-
181- timedMedals.subscribe(println(_)) // doesn't print ???
182-
183- Thread .sleep(5000 )
184-
185- /*
186- medalsByYear.subscribe(p => println(p._1))
187-
188- //waitFor(medalsByYear)
168+ @ Test def testGroupByThenFlatMapByForComprehension () {
169+ val m = Observable (1 , 2 , 3 , 4 )
170+ val g = m.groupBy(i => i % 2 )
171+ val t = for ((i, o) <- g; n <- o) yield n
172+ assertEquals(List (1 , 2 , 3 , 4 ), t.toBlockingObservable.toList)
173+ }
174+
175+ @ Test def testGroupByThenFlatMapByForComprehensionWithTiming () {
176+ val m = Observable .interval(100 millis).take(4 )
177+ val g = m.groupBy(i => i % 2 )
178+ val t = for ((i, o) <- g; n <- o) yield n
179+ assertEquals(List (0 , 1 , 2 , 3 ), t.toBlockingObservable.toList)
180+ }
181+
182+ @ Test def groupByExampleTest () {
183+ val medalsByCountry = Olympics .mountainBikeMedals.groupBy(medal => medal.country)
189184
190- val byCountry = medals.groupBy(_.country)
185+ val firstMedalOfEachCountry =
186+ medalsByCountry.flatMap((p : (String , Observable [Olympics .Medal ])) => p._2.take(1 ))
187+
188+ firstMedalOfEachCountry.subscribe(medal => {
189+ println(s " ${medal.country} wins its first medal in ${medal.year}" )
190+ })
191191
192- def score(medals: Observable[Medal]) = medals.fold((0, 0, 0))((s, m) => (s, m.medal) match {
193- case ((gold, silver, bronze), "Gold") => (gold+1, silver, bronze)
194- case ((gold, silver, bronze), "Silver") => (gold, silver+1, bronze)
195- case ((gold, silver, bronze), "Bronze") => (gold, silver, bronze+1)
196- })
192+ // waitFor(firstMedalOfEachCountry)
193+ Thread .sleep(20000 )
194+ }
195+
196+ @ Ignore // TODO this test one does not terminate!
197+ @ Test def groupByExample () {
198+ val medalsByCountry = Olympics .mountainBikeMedals.groupBy(medal => medal.country)
197199
198- val scores = for ((country, medals) <- byCountry) yield (country, score(medals))
200+ val firstMedalOfEachCountry =
201+ for ((country, medals) <- medalsByCountry; firstMedal <- medals.take(1 )) yield firstMedal
202+
203+ firstMedalOfEachCountry.subscribe(medal => {
204+ println(s " ${medal.country} wins its first medal in ${medal.year}" )
205+ })
199206
200- println(scores.toBlockingObservable.toList)
201- */
207+ waitFor(firstMedalOfEachCountry)
202208 }
203209
204210 def output (s : String ): Unit = println(s)
0 commit comments