Skip to content

Commit b236f89

Browse files
add zipWithIndex, sum, product, and an explanation why no average
1 parent 1179bfd commit b236f89

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,20 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
198198
*/
199199
def zip[U](that: Observable[U]): Observable[(T, U)] = {
200200
Observable[(T, U)](JObservable.zip[T, U, (T, U)](this.asJava, that.asJava, (t: T, u: U) => (t, u)))
201-
}
201+
}
202202

203+
/**
204+
* Zips this Observable with its indices.
205+
*
206+
* @return An Observable emitting pairs consisting of all elements of this Observable paired with
207+
* their index. Indices start at 0.
208+
*/
209+
def zipWithIndex: Observable[(T, Int)] = {
210+
val fScala: (T, Integer) => (T, Int) = (elem: T, index: Integer) => (elem, index)
211+
val fJava : Func2[_ >: T, Integer, _ <: (T, Int)] = fScala
212+
Observable[(T, Int)](asJava.mapWithIndex[(T, Int)](fJava))
213+
}
214+
203215
/**
204216
* Creates an Observable which produces buffers of collected values.
205217
*
@@ -1384,7 +1396,27 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
13841396
def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = {
13851397
Observable[T](asJava.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
13861398
}
1399+
1400+
/**
1401+
* Returns an Observable that sums up the elements of this Observable.
1402+
*
1403+
* @return an Observable emitting the sum of all the elements of the source Observable
1404+
* as its single item.
1405+
*/
1406+
def sum[U >: T](implicit num: Numeric[U]): Observable[U] = {
1407+
fold(num.zero)(num.plus)
1408+
}
13871409

1410+
/**
1411+
* Returns an Observable that multiplies up the elements of this Observable.
1412+
*
1413+
* @return an Observable emitting the product of all the elements of the source Observable
1414+
* as its single item.
1415+
*/
1416+
def product[U >: T](implicit num: Numeric[U]): Observable[U] = {
1417+
fold(num.one)(num.times)
1418+
}
1419+
13881420
/**
13891421
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
13901422
* operators).

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,33 @@ class RxScalaDemo extends JUnitSuite {
233233
assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption)
234234
assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption)
235235
assertEquals(None, Observable() .toBlockingObservable.singleOption)
236-
}
236+
}
237+
238+
// We can't put a general average method into Observable.scala, because Scala's Numeric
239+
// does not have scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum)
240+
def doubleAverage(o: Observable[Double]): Observable[Double] = {
241+
for ((finalSum, finalCount) <- o.fold((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
242+
yield finalSum / finalCount
243+
}
244+
245+
@Test def averageExample() {
246+
println(doubleAverage(Observable()).toBlockingObservable.single)
247+
println(doubleAverage(Observable(0)).toBlockingObservable.single)
248+
println(doubleAverage(Observable(4.44)).toBlockingObservable.single)
249+
println(doubleAverage(Observable(1, 2, 3.5)).toBlockingObservable.single)
250+
}
251+
252+
@Test def testSum() {
253+
assertEquals(10, Observable(1, 2, 3, 4).sum.toBlockingObservable.single)
254+
assertEquals(6, Observable(4, 2).sum.toBlockingObservable.single)
255+
assertEquals(0, Observable[Int]().sum.toBlockingObservable.single)
256+
}
257+
258+
@Test def testProduct() {
259+
assertEquals(24, Observable(1, 2, 3, 4).product.toBlockingObservable.single)
260+
assertEquals(8, Observable(4, 2).product.toBlockingObservable.single)
261+
assertEquals(1, Observable[Int]().product.toBlockingObservable.single)
262+
}
237263

238264
def output(s: String): Unit = println(s)
239265

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ import java.lang.reflect.Modifier
1212
class CompletenessTest extends JUnitSuite {
1313

1414
val unnecessary = "[considered unnecessary in Scala land]"
15+
16+
val averageProblem = "[We can't have a general average method because Scala's Numeric does not have " +
17+
"scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum). " +
18+
"You can use fold instead to accumulate sum and numberOfElements and divide at the end.]"
1519

1620
val correspondence = defaultMethodCorrespondence ++ Map(
1721
// manually added entries for Java instance methods
@@ -44,6 +48,10 @@ class CompletenessTest extends JUnitSuite {
4448
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
4549

4650
// manually added entries for Java static methods
51+
"average(Observable[Integer])" -> averageProblem,
52+
"averageDoubles(Observable[Double])" -> averageProblem,
53+
"averageFloats(Observable[Float])" -> averageProblem,
54+
"averageLongs(Observable[Long])" -> averageProblem,
4755
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
4856
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
4957
"empty()" -> "apply(T*)",
@@ -55,6 +63,10 @@ class CompletenessTest extends JUnitSuite {
5563
"range(Int, Int)" -> "apply(Range)",
5664
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]",
5765
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use (first zip second) map (p => equality(p._1, p._2))]",
66+
"sum(Observable[Integer])" -> "sum(Numeric[U])",
67+
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
68+
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
69+
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
5870
"switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch",
5971
"synchronize(Observable[_ <: T])" -> "synchronize",
6072
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]"

0 commit comments

Comments
 (0)