Skip to content

Commit 0586b8a

Browse files
Merge pull request #1398 from zsxwing/rxscala-currying
RxScala: Use currying to improve type inference
2 parents 13ed3ca + 02699f9 commit 0586b8a

File tree

4 files changed

+149
-107
lines changed

4 files changed

+149
-107
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -182,27 +182,72 @@ class RxScalaDemo extends JUnitSuite {
182182
).toObservable.flatten(2).toBlocking.foreach(println(_))
183183
}
184184

185-
@Test def rangeAndBufferExample() {
185+
@Test def tumblingBufferExample() {
186186
val o = Observable.from(1 to 18)
187-
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
187+
o.tumblingBuffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
188188
}
189189

190-
@Test def bufferExample() {
190+
@Test def tumblingBufferExample2() {
191191
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
192192
val boundary = Observable.interval(500 millis)
193-
o.buffer(boundary).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
193+
o.tumblingBuffer(boundary).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
194194
}
195195

196-
@Test def windowExample() {
197-
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
196+
@Test def slidingBufferExample() {
197+
val o = Observable.from(1 to 18).slidingBuffer(4, 2)
198+
o.subscribe(println(_))
199+
}
200+
201+
@Test def slidingBufferExample2() {
202+
val open = Observable.interval(300 millis)
203+
val closing = Observable.interval(600 millis)
204+
val o = Observable.interval(100 millis).take(20).slidingBuffer(open)(_ => closing)
205+
o.zipWithIndex.toBlocking.foreach {
206+
case (seq, i) => println(s"Observable#$i emits $seq")
207+
}
208+
}
209+
210+
@Test def slidingBufferExample3() {
211+
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
212+
o.slidingBuffer(500 millis, 200 millis).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
213+
}
214+
215+
@Test def tumblingExample() {
216+
(for ((o, i) <- Observable.from(1 to 18).tumbling(5).zipWithIndex; n <- o)
198217
yield s"Observable#$i emits $n"
199218
).subscribe(output(_))
200219
}
201220

202-
@Test def windowExample2() {
221+
@Test def tumblingExample2() {
203222
val windowObservable = Observable.interval(500 millis)
204223
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
205-
(for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o)
224+
(for ((o, i) <- o.tumbling(windowObservable).zipWithIndex; n <- o)
225+
yield s"Observable#$i emits $n"
226+
).toBlocking.foreach(println)
227+
}
228+
229+
@Test def slidingExample() {
230+
val o = Observable.from(1 to 18).sliding(4, 2)
231+
(for ((o, i) <- o.zipWithIndex;
232+
n <- o)
233+
yield s"Observable#$i emits $n"
234+
).toBlocking.foreach(println)
235+
}
236+
237+
@Test def slidingExample2() {
238+
val o = Observable.interval(100 millis).take(20).sliding(500 millis, 200 millis)
239+
(for ((o, i) <- o.zipWithIndex;
240+
n <- o)
241+
yield s"Observable#$i emits $n"
242+
).toBlocking.foreach(println)
243+
}
244+
245+
@Test def slidingExample3() {
246+
val open = Observable.interval(300 millis)
247+
val closing = Observable.interval(600 millis)
248+
val o = Observable.interval(100 millis).take(20).sliding(open)(_ => closing)
249+
(for ((o, i) <- o.zipWithIndex;
250+
n <- o)
206251
yield s"Observable#$i emits $n"
207252
).toBlocking.foreach(println)
208253
}
@@ -336,23 +381,22 @@ class RxScalaDemo extends JUnitSuite {
336381

337382
@Test def groupByUntilExample() {
338383
val numbers = Observable.interval(250 millis).take(14)
339-
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
384+
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
340385
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
341386
sequenced.subscribe(x => println(s"Emitted group: $x"))
342387
}
343388

344389
@Test def groupByUntilExample2() {
345390
val numbers = Observable.interval(250 millis).take(14)
346-
val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)})
391+
val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) }
347392
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
348393
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
349394
}
350395

351396
@Test def combineLatestExample() {
352397
val firstCounter = Observable.interval(250 millis)
353398
val secondCounter = Observable.interval(550 millis)
354-
val combinedCounter = firstCounter.combineLatest(secondCounter,
355-
(x: Long, y: Long) => List(x,y)) take 10
399+
val combinedCounter = firstCounter.combineLatestWith(secondCounter)(List(_, _)) take 10
356400

357401
combinedCounter subscribe {x => println(s"Emitted group: $x")}
358402
waitFor(combinedCounter)
@@ -363,7 +407,7 @@ class RxScalaDemo extends JUnitSuite {
363407
val secondCounter = Observable.interval(550 millis)
364408
val thirdCounter = Observable.interval(850 millis)
365409
val sources = Seq(firstCounter, secondCounter, thirdCounter)
366-
val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10)
410+
val combinedCounter = Observable.combineLatest(sources)(_.toList).take(10)
367411

368412
combinedCounter subscribe {x => println(s"Emitted group: $x")}
369413
waitFor(combinedCounter)
@@ -564,6 +608,12 @@ class RxScalaDemo extends JUnitSuite {
564608
o1.zip(iter).toBlocking.foreach(println(_))
565609
}
566610

611+
@Test def zipWithExample() {
612+
val xs = Observable.items(1, 3, 5, 7)
613+
val ys = Observable.items(2, 4, 6, 8)
614+
xs.zipWith(ys)(_ * _).subscribe(println(_))
615+
}
616+
567617
@Test def takeFirstWithCondition() {
568618
val condition: Int => Boolean = _ >= 3
569619
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlocking.single)
@@ -1070,15 +1120,15 @@ class RxScalaDemo extends JUnitSuite {
10701120

10711121
@Test def multicastExample1(): Unit = {
10721122
val unshared = Observable.from(1 to 4)
1073-
val shared = unshared.multicast(Subject())
1123+
val shared = unshared.multicast(Subject[Int]())
10741124
shared.subscribe(n => println(s"subscriber 1 gets $n"))
10751125
shared.subscribe(n => println(s"subscriber 2 gets $n"))
10761126
shared.connect
10771127
}
10781128

10791129
@Test def multicastExample2(): Unit = {
10801130
val unshared = Observable.from(1 to 4)
1081-
val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _))
1131+
val shared = unshared.multicast(() => Subject[Int]())(o => o.map("No. " + _))
10821132
shared.subscribe(n => println(s"subscriber 1 gets $n"))
10831133
shared.subscribe(n => println(s"subscriber 2 gets $n"))
10841134
}
@@ -1298,10 +1348,7 @@ class RxScalaDemo extends JUnitSuite {
12981348

12991349
@Test def flatMapExample5() {
13001350
val o = Observable.items(1, 10, 100, 1000)
1301-
o.flatMap(
1302-
(n: Int) => Observable.interval(200 millis).take(5),
1303-
(n: Int, m: Long) => n * m
1304-
).toBlocking.foreach(println)
1351+
o.flatMapWith(_ => Observable.interval(200 millis).take(5))(_ * _).toBlocking.foreach(println)
13051352
}
13061353

13071354
@Test def flatMapIterableExample() {
@@ -1312,10 +1359,7 @@ class RxScalaDemo extends JUnitSuite {
13121359

13131360
@Test def flatMapIterableExample2() {
13141361
val o = Observable.items(1, 10, 100, 1000)
1315-
o.flatMapIterable(
1316-
(n: Int) => (1 to 5),
1317-
(n: Int, m: Int) => n * m
1318-
).toBlocking.foreach(println)
1362+
o.flatMapIterableWith(_=> (1 to 5))(_ * _).toBlocking.foreach(println)
13191363
}
13201364

13211365
@Test def concatMapExample() {
@@ -1369,20 +1413,14 @@ class RxScalaDemo extends JUnitSuite {
13691413
@Test def joinExample() {
13701414
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
13711415
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1372-
val o = o1.join(o2,
1373-
(_: String) => Observable.timer(300 millis),
1374-
(_: String) => Observable.timer(200 millis),
1375-
(t1: String, t2: String) => (t1, t2))
1416+
val o = o1.join(o2)(_ => Observable.timer(300 millis), _ => Observable.timer(200 millis), (_, _))
13761417
o.take(10).toBlocking.foreach(println)
13771418
}
13781419

13791420
@Test def groupJoinExample() {
13801421
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
13811422
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1382-
val o = o1.groupJoin(o2,
1383-
(_: String) => Observable.timer(300 millis),
1384-
(_: String) => Observable.timer(200 millis),
1385-
(t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
1423+
val o = o1.groupJoin(o2)(_ => Observable.timer(300 millis), _ => Observable.timer(200 millis), (t1, t2) => (t1, t2.toSeq.toBlocking.single))
13861424
o.take(3).toBlocking.foreach(println)
13871425
}
13881426

0 commit comments

Comments
 (0)