Skip to content

Commit 4e31815

Browse files
add combineLatest, apply(java Future), remove docu without impl
1 parent b4019b9 commit 4e31815

File tree

2 files changed

+35
-115
lines changed

2 files changed

+35
-115
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 25 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717

1818
package rx.lang.scala
1919

20-
import org.scalatest.junit.JUnitSuite
21-
import scala.collection.Seq
22-
import rx.lang.scala.observables.BlockingObservable
23-
24-
2520
/**
2621
* The Observable interface that implements the Reactive Pattern.
2722
*/
@@ -38,6 +33,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
3833
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
3934
import rx.lang.scala.util._
4035
import rx.lang.scala.subjects.Subject
36+
import rx.lang.scala.observables.BlockingObservable
4137
import rx.lang.scala.ImplicitFunctionConversions._
4238

4339
/**
@@ -1149,17 +1145,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
11491145
* @return an Observable that emits items from the source Observable so long as the predicate
11501146
* continues to return <code>true</code> for each item, then completes
11511147
*/
1152-
// TODO: if we have zipWithIndex, takeWhileWithIndex is not needed any more
11531148
def takeWhileWithIndex(predicate: (T, Integer) => Boolean): Observable[T] = {
11541149
Observable[T](asJava.takeWhileWithIndex(predicate))
11551150
}
11561151

1157-
/* TODO zipWithIndex once it's in RxJava
1158-
def zipWithIndex: Observable[(T, Int)] = {
1159-
???
1160-
}
1161-
*/
1162-
11631152
/**
11641153
* Returns an Observable that emits only the last <code>count</code> items emitted by the source
11651154
* Observable.
@@ -1339,6 +1328,20 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
13391328
Observable[U](o5)
13401329
}
13411330

1331+
/**
1332+
* Combines two observables, emitting a pair of the latest values of each of
1333+
* the source observables each time an event is received from one of the source observables, where the
1334+
* aggregation is defined by the given function.
1335+
*
1336+
* @param that
1337+
* The second source observable.
1338+
* @return An Observable that combines the source Observables
1339+
*/
1340+
def combineLatest[U](that: Observable[U]): Observable[(T, U)] = {
1341+
val f: Func2[_ >: T, _ >: U, _ <: (T, U)] = (t: T, u: U) => (t, u)
1342+
Observable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJava, that.asJava, f))
1343+
}
1344+
13421345
/**
13431346
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
13441347
* <p>
@@ -1761,7 +1764,6 @@ object Observable {
17611764
import scala.collection.JavaConverters._
17621765
import scala.collection.immutable.Range
17631766
import scala.concurrent.duration.Duration
1764-
import scala.concurrent.Future
17651767
import rx.{Observable => JObservable}
17661768
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
17671769
import rx.lang.scala.util._
@@ -1896,75 +1898,6 @@ object Observable {
18961898
def just[T](value: T): Observable[T] = {
18971899
Observable[T](JObservable.just(value))
18981900
}
1899-
1900-
/**
1901-
* This behaves like {@link #merge(java.util.List)} except that if any of the merged Observables
1902-
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
1903-
* refrain from propagating that error notification until all of the merged Observables have
1904-
* finished emitting items.
1905-
* <p>
1906-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeDelayError.png">
1907-
* <p>
1908-
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its
1909-
* Observers once.
1910-
* <p>
1911-
* This method allows an Observer to receive all successfully emitted items from all of the
1912-
* source Observables without being interrupted by an error notification from one of them.
1913-
*
1914-
* @param source
1915-
* a list of Observables
1916-
* @return an Observable that emits items that are the result of flattening the items emitted by
1917-
* the {@code source} list of Observables
1918-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
1919-
*/
1920-
// public static <T> Observable<T> mergeDelayError(List<? extends Observable<? extends T>> source)
1921-
// TODO decide if instance method mergeWithDelayError (?)
1922-
1923-
/**
1924-
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables
1925-
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
1926-
* refrain from propagating that error notification until all of the merged Observables have
1927-
* finished emitting items.
1928-
* <p>
1929-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeDelayError.png">
1930-
* <p>
1931-
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its
1932-
* Observers once.
1933-
* <p>
1934-
* This method allows an Observer to receive all successfully emitted items from all of the
1935-
* source Observables without being interrupted by an error notification from one of them.
1936-
*
1937-
* @param source
1938-
* an Observable that emits Observables
1939-
* @return an Observable that emits items that are the result of flattening the items emitted by
1940-
* the Observables emitted by the {@code source} Observable
1941-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
1942-
*/
1943-
// public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source)
1944-
// TODO decide if instance method mergeWithDelayError (?)
1945-
1946-
/**
1947-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1948-
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
1949-
* refrain from propagating that error notification until all of the merged Observables have
1950-
* finished emitting items.
1951-
* <p>
1952-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeDelayError.png">
1953-
* <p>
1954-
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its
1955-
* Observers once.
1956-
* <p>
1957-
* This method allows an Observer to receive all successfully emitted items from all of the
1958-
* source Observables without being interrupted by an error notification from one of them.
1959-
*
1960-
* @param source
1961-
* a series of Observables
1962-
* @return an Observable that emits items that are the result of flattening the items emitted by
1963-
* the {@code source} Observables
1964-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
1965-
*/
1966-
// public static <T> Observable<T> mergeDelayError(Observable<? extends T>... source)
1967-
// TODO decide if instance method mergeWithDelayError (?)
19681901

19691902
/**
19701903
* Returns an Observable that never sends any items or notifications to an {@link Observer}.
@@ -1979,45 +1912,23 @@ object Observable {
19791912
Observable[Nothing](JObservable.never())
19801913
}
19811914

1982-
/*
1915+
// TODO also support Scala Futures, but think well before. Do we want to Future and Observable
1916+
// to share a common base interface?
1917+
1918+
// private because it's not RxScala's responsability to provide this alias
1919+
private type Future[+T] = java.util.concurrent.Future[_ <: T]
1920+
19831921
def apply[T](f: Future[T]): Observable[T] = {
1984-
??? // TODO convert Scala Future to Java Future
1922+
Observable[T](rx.Observable.from(f))
19851923
}
1986-
*/
19871924

1988-
/*
19891925
def apply[T](f: Future[T], scheduler: Scheduler): Observable[T] = {
1990-
??? // TODO convert Scala Future to Java Future
1926+
Observable[T](rx.Observable.from(f, scheduler))
19911927
}
1992-
*/
19931928

1994-
/*
19951929
def apply[T](f: Future[T], duration: Duration): Observable[T] = {
1996-
??? // TODO convert Scala Future to Java Future
1930+
Observable[T](rx.Observable.from(f, duration.length, duration.unit))
19971931
}
1998-
*/
1999-
2000-
/**
2001-
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
2002-
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
2003-
* <p>
2004-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
2005-
*
2006-
* @param o1
2007-
* The first source observable.
2008-
* @param o2
2009-
* The second source observable.
2010-
* @param combineFunction
2011-
* The aggregation function used to combine the source observable values.
2012-
* @return An Observable that combines the source Observables with the given combine function
2013-
*/
2014-
// public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction)
2015-
// TODO do we want this as an instance method?
2016-
// TODO then decide about combineLatest with > 2 Observables
2017-
2018-
// TODO what about these two?
2019-
// public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction)
2020-
// public static <R> Observable<R> zip(Collection<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
20211932

20221933
/**
20231934
* Given a Seq of N observables, returns an observable that emits Seqs of N elements each.
@@ -2088,7 +1999,7 @@ class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <:
20881999
// there is no foreach here, that's only available on BlockingObservable
20892000
}
20902001

2091-
class UnitTestSuite extends JUnitSuite {
2002+
class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
20922003
import scala.concurrent.duration._
20932004
import org.junit.{Before, Test, Ignore}
20942005
import org.junit.Assert._

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,16 @@ class CompletenessTest extends JUnitSuite {
6666
"averageFloats(Observable[Float])" -> averageProblem,
6767
"averageLongs(Observable[Long])" -> averageProblem,
6868
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
69+
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
6970
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
7071
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
7172
"empty()" -> "apply(T*)",
7273
"error(Throwable)" -> "apply(Throwable)",
7374
"from(Array[T])" -> "apply(T*)",
7475
"from(Iterable[_ <: T])" -> "apply(T*)",
76+
"from(Future[_ <: T])" -> "apply(Future[T])",
77+
"from(Future[_ <: T], Long, TimeUnit)" -> "apply(Future[T], Duration)",
78+
"from(Future[_ <: T], Scheduler)" -> "apply(Future[T], Scheduler)",
7579
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
7680
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
7781
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
@@ -109,7 +113,12 @@ class CompletenessTest extends JUnitSuite {
109113
).drop(2).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
110114
// mergeDelayError 3-9:
111115
"mergeDelayError(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).flattenDelayError instead]"
112-
).drop(2).toMap
116+
).drop(2).toMap ++ (3 to 9).map(i => {
117+
// combineLatest 3-9:
118+
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
119+
val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("")
120+
("combineLatest(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", "[If C# doesn't need it, Scala doesn't need it either ;-)]")
121+
}).toMap
113122

114123
def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2")
115124

0 commit comments

Comments
 (0)