Skip to content

Commit a995623

Browse files
Added join
1 parent bd87341 commit a995623

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,48 @@ trait Observable[+T]
13481348
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
13491349
}
13501350

1351+
/**
1352+
* Correlates the items emitted by two Observables based on overlapping durations.
1353+
* <p>
1354+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/join_.png">
1355+
*
1356+
* @param inner
1357+
* the second Observable to join items from
1358+
* @param leftDurationSelector
1359+
* a function to select a duration for each item emitted by the source Observable,
1360+
* used to determine overlap
1361+
* @param rightDurationSelector
1362+
* a function to select a duration for each item emitted by the inner Observable,
1363+
* used to determine overlap
1364+
* @param resultSelector
1365+
* a function that computes an item to be emitted by the resulting Observable for any
1366+
* two overlapping items emitted by the two Observables
1367+
* @return
1368+
* an Observable that emits items correlating to items emitted by the source Observables
1369+
* that have overlapping durations
1370+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
1371+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
1372+
*/
1373+
def join[S, LeftClosing, RightClosing, R] (
1374+
inner: Observable[S],
1375+
leftDurationSelector: T=>Observable[LeftClosing],
1376+
rightDurationSelector: S=>Observable[RightClosing],
1377+
resultSelector: (T,S) => R
1378+
): Observable[R] = {
1379+
1380+
val outer : rx.Observable[_ <: T] = this.asJavaObservable
1381+
val left: Func1[_ >: T, _<: rx.Observable[_ <: LeftClosing]] = (t: T) => leftDurationSelector(t).asJavaObservable
1382+
val right: Func1[_ >: S, _<: rx.Observable[_ <: RightClosing]] = (s: S) => rightDurationSelector(s).asJavaObservable
1383+
1384+
val o1 = outer.asInstanceOf[rx.Observable[T]].join[S, LeftClosing, RightClosing, R](
1385+
inner.asJavaObservable.asInstanceOf[rx.Observable[S]],
1386+
left. asInstanceOf[Func1[T, rx.Observable[LeftClosing]]],
1387+
right.asInstanceOf[Func1[S, rx.Observable[RightClosing]]],
1388+
resultSelector.asInstanceOf[Func2[T,S,R]])
1389+
1390+
toScalaObservable[R](o1)
1391+
}
1392+
13511393
/**
13521394
* Given an Observable that emits Observables, creates a single Observable that
13531395
* emits the items emitted by the most recently published of those Observables.

0 commit comments

Comments
 (0)