Skip to content

Commit 4f793bf

Browse files
Merge pull request #763 from chrisgrimm/master
Added support for custom functions in combineLatest.
2 parents 50096fc + 3097f3e commit 4f793bf

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,15 @@ class RxScalaDemo extends JUnitSuite {
249249
sequenced.subscribe(x => println(s"Emitted group: $x"))
250250
}
251251

252+
@Test def combineLatestExample() {
253+
val first_counter = Observable.interval(250 millis)
254+
val second_counter = Observable.interval(550 millis)
255+
val combined_counter = first_counter.combineLatest(second_counter,
256+
(x: Long, y: Long) => List(x,y)) take 10
257+
258+
combined_counter subscribe {x => println(s"Emitted group: $x")}
259+
}
260+
252261

253262
@Test def olympicsExample() {
254263
val medals = Olympics.mountainBikeMedals.publish

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,6 +1528,22 @@ trait Observable[+T]
15281528
toScalaObservable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, f))
15291529
}
15301530

1531+
1532+
/**
1533+
* Combines two observables, emitting some type `R` specified in the function f,
1534+
* each time an event is received from one of the source observables, where the aggregation
1535+
* is defined by the given function.
1536+
*
1537+
*@param that
1538+
* The second source observable.
1539+
*@param f
1540+
The function that is used combine the emissions of the two observables.
1541+
*@return An Observable that combines the source Observables according to the function f.
1542+
*/
1543+
def combineLatest[U,R](that: Observable[U], f: (T, U) => R): Observable[R] = {
1544+
toScalaObservable[R](rx.Observable.combineLatest[T, U, R](this.asJavaObservable, that.asJavaObservable, f))
1545+
}
1546+
15311547
/**
15321548
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
15331549
*

0 commit comments

Comments
 (0)