Skip to content

Commit 2b7f062

Browse files
add merge operation and examples
and try to make Olympics groupBy work with timing, but did not work due to problems with RxJava groupBy, see pull #289
1 parent a078522 commit 2b7f062

File tree

4 files changed

+80
-6
lines changed

4 files changed

+80
-6
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,6 +1239,26 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12391239
Observable[U](rx.Observable.merge(thisJava, thatJava))
12401240
}
12411241

1242+
/**
1243+
* Flattens the sequence of Observables emitted by {@code this} into one Observable, without any
1244+
* transformation.
1245+
* <p>
1246+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
1247+
* <p>
1248+
* You can combine the items emitted by multiple Observables so that they act like a single
1249+
* Observable, by using the {@code merge} method.
1250+
*
1251+
* @return an Observable that emits items that are the result of flattening the items emitted
1252+
* by the Observables emitted by {@code this}
1253+
*/
1254+
def merge[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
1255+
val o2: Observable[Observable[U]] = this
1256+
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava)
1257+
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava
1258+
val o5 = rx.Observable.merge[U](o4)
1259+
Observable[U](o5)
1260+
}
1261+
12421262
/**
12431263
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
12441264
* <p>

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ object Olympics {
5151
def fourYearsEmpty: Observable[Medal] = {
5252
// TODO this should return an observable which emits nothing during fourYears and then completes
5353
// Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests
54+
// And this https://github.com/Netflix/RxJava/pull/289#issuecomment-24738668 also causes problems
5455
// So we don't use this:
5556
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
5657
// But we just return empty, which completes immediately

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,33 @@ class RxScalaDemo extends JUnitSuite {
9595
println((before ++ source).toBlockingObservable.toList)
9696
}
9797

98-
@Test def mergeExample() {
98+
@Test def mergeTwoExample() {
9999
val slowNumbers = Observable.interval(400 millis).take(5).map("slow " + _)
100100
val fastNumbers = Observable.interval(200 millis).take(10).map("fast " + _)
101101
val o = (slowNumbers merge fastNumbers)
102102
o.subscribe(output(_))
103103
waitFor(o)
104104
}
105105

106+
def myInterval(period: Long): Observable[String] = {
107+
Observable.interval(period.millis).map(n => s"Obs-$period emits $n")
108+
}
109+
110+
@Test def mergeManyExample() {
111+
val o = Observable.interval(500 millis).map(n => myInterval((n+1)*100))
112+
val stopper = Observable.interval(5 seconds)
113+
o.merge.takeUntil(stopper).toBlockingObservable.foreach(println(_))
114+
}
115+
116+
@Test def mergeSomeExample() {
117+
// To merge some observables which are all known already:
118+
Observable(
119+
Observable.interval(200 millis),
120+
Observable.interval(400 millis),
121+
Observable.interval(800 millis)
122+
).merge.take(12).toBlockingObservable.foreach(println(_))
123+
}
124+
106125
@Test def rangeAndBufferExample() {
107126
val o = Observable(1 to 18)
108127
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
@@ -178,6 +197,29 @@ class RxScalaDemo extends JUnitSuite {
178197
assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList)
179198
}
180199

200+
@Test def timingTest() {
201+
val firstOnly = false
202+
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)
203+
204+
(for ((modulo, numbers) <- numbersByModulo3) yield {
205+
println("Observable for modulo" + modulo + " started")
206+
207+
if (firstOnly) numbers.take(1) else numbers
208+
}).merge.toBlockingObservable.foreach(println(_))
209+
}
210+
211+
@Test def timingTest1() {
212+
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)
213+
214+
val t0 = System.currentTimeMillis
215+
216+
(for ((modulo, numbers) <- numbersByModulo3) yield {
217+
println("Observable for modulo" + modulo + " started at t = " + (System.currentTimeMillis - t0))
218+
numbers.take(1) // <- TODO very unexpected
219+
//numbers
220+
}).merge.toBlockingObservable.foreach(println(_))
221+
}
222+
181223
@Test def groupByExample() {
182224
val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country)
183225

@@ -191,6 +233,13 @@ class RxScalaDemo extends JUnitSuite {
191233
waitFor(firstMedalOfEachCountry)
192234
}
193235

236+
@Test def olympicsExample() {
237+
val (go, medals) = Olympics.mountainBikeMedals.publish
238+
medals.subscribe(println(_))
239+
go()
240+
waitFor(medals)
241+
}
242+
194243
@Test def exampleWithoutPublish() {
195244
val unshared = Observable(1 to 4)
196245
unshared.subscribe(n => println(s"subscriber 1 gets $n"))
@@ -260,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
260309
assertEquals(8, Observable(4, 2).product.toBlockingObservable.single)
261310
assertEquals(1, Observable[Int]().product.toBlockingObservable.single)
262311
}
263-
312+
264313
def output(s: String): Unit = println(s)
265314

266315
// blocks until obs has completed

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ class CompletenessTest extends JUnitSuite {
5858
"error(Throwable)" -> "apply(Throwable)",
5959
"from(Array[T])" -> "apply(T*)",
6060
"from(Iterable[_ <: T])" -> "apply(T*)",
61-
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[T])",
61+
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
62+
"merge(Observable[_ <: Observable[_ <: T]])" -> "merge(<:<[Observable[T], Observable[Observable[U]]])",
6263
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[T])",
6364
"range(Int, Int)" -> "apply(Range)",
6465
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]",
@@ -67,8 +68,8 @@ class CompletenessTest extends JUnitSuite {
6768
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
6869
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
6970
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
71+
"synchronize(Observable[T])" -> "synchronize",
7072
"switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch",
71-
"synchronize(Observable[_ <: T])" -> "synchronize",
7273
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]"
7374
) ++ List.iterate("T", 9)(s => s + ", T").map(
7475
// all 9 overloads of startWith:
@@ -84,8 +85,11 @@ class CompletenessTest extends JUnitSuite {
8485
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
8586
val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("")
8687
("zip(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", unnecessary)
87-
}).toMap
88-
88+
}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
89+
// merge 3-9:
90+
"merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).merge instead]"
91+
).drop(2).toMap
92+
8993
def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2")
9094

9195
def methodMembersToMethodStrings(members: Iterable[Symbol]): Iterable[String] = {

0 commit comments

Comments
 (0)