Skip to content

Commit 2901c06

Browse files
add operation merge (as instance method)
1 parent 61c2218 commit 2901c06

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scalaimpl/Observable.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,6 +1243,25 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12431243
// TODO naming: follow C# (switch) or Java (switchOnNext)?
12441244
// public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences)
12451245

1246+
/**
1247+
* Flattens two Observables into one Observable, without any transformation.
1248+
* <p>
1249+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
1250+
* <p>
1251+
* You can combine items emitted by two Observables so that they act like a single
1252+
* Observable by using the {@code merge} method.
1253+
*
1254+
* @param that
1255+
* an Observable to be merged
1256+
* @return an Observable that emits items from {@code this} and {@code that} until
1257+
* {@code this} or {@code that} emits {@code onError} or {@code onComplete}.
1258+
*/
1259+
def merge[U >: T](that: Observable[U]): Observable[U] = {
1260+
val thisJava: rx.Observable[_ <: U] = this.asJava
1261+
val thatJava: rx.Observable[_ <: U] = that.asJava
1262+
new Observable[U](rx.Observable.merge(thisJava, thatJava))
1263+
}
1264+
12461265
/**
12471266
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
12481267
* operators).
@@ -1410,7 +1429,7 @@ object Observable {
14101429
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge</a>
14111430
*/
14121431
// public static <T> Observable<T> merge(List<? extends Observable<? extends T>> source)
1413-
// TODO decide if instance method mergeWith (?)
1432+
// TODO do we need this or is the binary instance method merge sufficient?
14141433

14151434
/**
14161435
* Flattens a sequence of Observables emitted by an Observable into one Observable, without any
@@ -1428,7 +1447,6 @@ object Observable {
14281447
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
14291448
*/
14301449
// public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source)
1431-
// TODO decide if instance method mergeWith (?)
14321450

14331451
/**
14341452
* Flattens a series of Observables into one Observable, without any transformation.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scalatests/RxScalaDemo.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ class RxScalaDemo extends JUnitSuite {
8888
val slowNumbers = Observable.interval(400 millis).take(5).map("slow " + _)
8989
val fastNumbers = Observable.interval(200 millis).take(10).map("fast " + _)
9090

91-
// TODO
91+
(slowNumbers merge fastNumbers).subscribe(output(_))
92+
sleep(2500)
9293
}
9394

9495
}

0 commit comments

Comments
 (0)