Skip to content

Commit 3097f3e

Browse files
author
Christopher Grimm
committed
Added support for custom functions in combineLatest.
1 parent 83aa857 commit 3097f3e

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,15 @@ class RxScalaDemo extends JUnitSuite {
252252
sequenced subscribe {x => println(s"Emitted group: $x")}
253253
}
254254

255+
@Test def combineLatestExample() {
256+
val first_counter = Observable.interval(250 millis)
257+
val second_counter = Observable.interval(550 millis)
258+
val combined_counter = first_counter.combineLatest(second_counter,
259+
(x: Long, y: Long) => List(x,y)) take 10
260+
261+
combined_counter subscribe {x => println(s"Emitted group: $x")}
262+
}
263+
255264

256265
@Test def olympicsExample() {
257266
val medals = Olympics.mountainBikeMedals.publish

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,22 @@ trait Observable[+T]
14861486
toScalaObservable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, f))
14871487
}
14881488

1489+
1490+
/**
1491+
* Combines two observables, emitting some type `R` specified in the function f,
1492+
* each time an event is received from one of the source observables, where the aggregation
1493+
* is defined by the given function.
1494+
*
1495+
*@param that
1496+
* The second source observable.
1497+
*@param f
1498+
The function that is used combine the emissions of the two observables.
1499+
*@return An Observable that combines the source Observables according to the function f.
1500+
*/
1501+
def combineLatest[U,R](that: Observable[U], f: (T, U) => R): Observable[R] = {
1502+
toScalaObservable[R](rx.Observable.combineLatest[T, U, R](this.asJavaObservable, that.asJavaObservable, f))
1503+
}
1504+
14891505
/**
14901506
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
14911507
*

0 commit comments

Comments
 (0)