Skip to content

Commit 58c26d5

Browse files
committed
Use currying to improve type inference
1 parent 9c1ca02 commit 58c26d5

File tree

4 files changed

+87
-83
lines changed

4 files changed

+87
-83
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,19 @@ class RxScalaDemo extends JUnitSuite {
187187
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
188188
}
189189

190-
@Test def bufferExample() {
190+
@Test def tumblingBufferExample() {
191191
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
192192
val boundary = Observable.interval(500 millis)
193-
o.buffer(boundary).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
193+
o.tumblingBuffer(boundary).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
194+
}
195+
196+
@Test def slidingBufferExample() {
197+
val open = Observable.interval(300 millis)
198+
val closing = Observable.interval(600 millis)
199+
val o = Observable.interval(100 millis).take(20).slidingBuffer(open)(_ => closing)
200+
o.zipWithIndex.toBlocking.foreach {
201+
case (seq, i) => println(s"Observable#$i emits $seq")
202+
}
194203
}
195204

196205
@Test def windowExample() {
@@ -199,10 +208,20 @@ class RxScalaDemo extends JUnitSuite {
199208
).subscribe(output(_))
200209
}
201210

202-
@Test def windowExample2() {
211+
@Test def tumblingExample() {
203212
val windowObservable = Observable.interval(500 millis)
204213
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
205-
(for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o)
214+
(for ((o, i) <- o.tumbling(windowObservable).zipWithIndex; n <- o)
215+
yield s"Observable#$i emits $n"
216+
).toBlocking.foreach(println)
217+
}
218+
219+
@Test def slidingExample() {
220+
val open = Observable.interval(300 millis)
221+
val closing = Observable.interval(600 millis)
222+
val o = Observable.interval(100 millis).take(20).sliding(open)(_ => closing)
223+
(for ((o, i) <- o.zipWithIndex;
224+
n <- o)
206225
yield s"Observable#$i emits $n"
207226
).toBlocking.foreach(println)
208227
}
@@ -336,23 +355,22 @@ class RxScalaDemo extends JUnitSuite {
336355

337356
@Test def groupByUntilExample() {
338357
val numbers = Observable.interval(250 millis).take(14)
339-
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
358+
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
340359
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
341360
sequenced.subscribe(x => println(s"Emitted group: $x"))
342361
}
343362

344363
@Test def groupByUntilExample2() {
345364
val numbers = Observable.interval(250 millis).take(14)
346-
val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)})
365+
val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) }
347366
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
348367
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
349368
}
350369

351370
@Test def combineLatestExample() {
352371
val firstCounter = Observable.interval(250 millis)
353372
val secondCounter = Observable.interval(550 millis)
354-
val combinedCounter = firstCounter.combineLatest(secondCounter,
355-
(x: Long, y: Long) => List(x,y)) take 10
373+
val combinedCounter = firstCounter.combineLatestWith(secondCounter)(List(_, _)) take 10
356374

357375
combinedCounter subscribe {x => println(s"Emitted group: $x")}
358376
waitFor(combinedCounter)
@@ -363,7 +381,7 @@ class RxScalaDemo extends JUnitSuite {
363381
val secondCounter = Observable.interval(550 millis)
364382
val thirdCounter = Observable.interval(850 millis)
365383
val sources = Seq(firstCounter, secondCounter, thirdCounter)
366-
val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10)
384+
val combinedCounter = Observable.combineLatest(sources)(_.toList).take(10)
367385

368386
combinedCounter subscribe {x => println(s"Emitted group: $x")}
369387
waitFor(combinedCounter)
@@ -564,6 +582,12 @@ class RxScalaDemo extends JUnitSuite {
564582
o1.zip(iter).toBlocking.foreach(println(_))
565583
}
566584

585+
@Test def zipWithExample() {
586+
val xs = Observable.items(1, 3, 5, 7)
587+
val ys = Observable.items(2, 4, 6, 8)
588+
xs.zipWith(ys)(_ * _).subscribe(println(_))
589+
}
590+
567591
@Test def takeFirstWithCondition() {
568592
val condition: Int => Boolean = _ >= 3
569593
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlocking.single)
@@ -1070,15 +1094,15 @@ class RxScalaDemo extends JUnitSuite {
10701094

10711095
@Test def multicastExample1(): Unit = {
10721096
val unshared = Observable.from(1 to 4)
1073-
val shared = unshared.multicast(Subject())
1097+
val shared = unshared.multicast(Subject[Int]())
10741098
shared.subscribe(n => println(s"subscriber 1 gets $n"))
10751099
shared.subscribe(n => println(s"subscriber 2 gets $n"))
10761100
shared.connect
10771101
}
10781102

10791103
@Test def multicastExample2(): Unit = {
10801104
val unshared = Observable.from(1 to 4)
1081-
val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _))
1105+
val shared = unshared.multicast(() => Subject[Int]())(o => o.map("No. " + _))
10821106
shared.subscribe(n => println(s"subscriber 1 gets $n"))
10831107
shared.subscribe(n => println(s"subscriber 2 gets $n"))
10841108
}
@@ -1298,10 +1322,7 @@ class RxScalaDemo extends JUnitSuite {
12981322

12991323
@Test def flatMapExample5() {
13001324
val o = Observable.items(1, 10, 100, 1000)
1301-
o.flatMap(
1302-
(n: Int) => Observable.interval(200 millis).take(5),
1303-
(n: Int, m: Long) => n * m
1304-
).toBlocking.foreach(println)
1325+
o.flatMapWith(_ => Observable.interval(200 millis).take(5))(_ * _).toBlocking.foreach(println)
13051326
}
13061327

13071328
@Test def flatMapIterableExample() {
@@ -1312,10 +1333,7 @@ class RxScalaDemo extends JUnitSuite {
13121333

13131334
@Test def flatMapIterableExample2() {
13141335
val o = Observable.items(1, 10, 100, 1000)
1315-
o.flatMapIterable(
1316-
(n: Int) => (1 to 5),
1317-
(n: Int, m: Int) => n * m
1318-
).toBlocking.foreach(println)
1336+
o.flatMapIterableWith(_=> (1 to 5))(_ * _).toBlocking.foreach(println)
13191337
}
13201338

13211339
@Test def concatMapExample() {
@@ -1369,20 +1387,14 @@ class RxScalaDemo extends JUnitSuite {
13691387
@Test def joinExample() {
13701388
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
13711389
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1372-
val o = o1.join(o2,
1373-
(_: String) => Observable.timer(300 millis),
1374-
(_: String) => Observable.timer(200 millis),
1375-
(t1: String, t2: String) => (t1, t2))
1390+
val o = o1.join(o2)(_ => Observable.timer(300 millis), _ => Observable.timer(200 millis), (_, _))
13761391
o.take(10).toBlocking.foreach(println)
13771392
}
13781393

13791394
@Test def groupJoinExample() {
13801395
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
13811396
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1382-
val o = o1.groupJoin(o2,
1383-
(_: String) => Observable.timer(300 millis),
1384-
(_: String) => Observable.timer(200 millis),
1385-
(t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
1397+
val o = o1.groupJoin(o2)(_ => Observable.timer(300 millis), _ => Observable.timer(200 millis), (t1, t2) => (t1, t2.toSeq.toBlocking.single))
13861398
o.take(3).toBlocking.foreach(println)
13871399
}
13881400

0 commit comments

Comments
 (0)