@@ -1430,6 +1430,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
1430
1430
new BlockingObservable [T ](asJava.toBlockingObservable())
1431
1431
}
1432
1432
1433
+ def withFilter (p : T => Boolean ): WithFilter [T ] = {
1434
+ new WithFilter [T ](p, asJava)
1435
+ }
1436
+
1433
1437
}
1434
1438
1435
1439
object Observable {
@@ -1796,18 +1800,25 @@ object Observable {
1796
1800
}
1797
1801
}
1798
1802
1799
- // Cannot yet have inner class because of this error message:
1800
- // implementation restriction: nested class is not allowed in value class.
1801
- // This restriction is planned to be removed in subsequent releases.
1802
- class WithFilter [+ T ] private [scala] (p : T => Boolean , wrapped : rx.Observable [_ <: T ]) {
1803
+ // Cannot yet have inner class because of this error message:
1804
+ // " implementation restriction: nested class is not allowed in value class.
1805
+ // This restriction is planned to be removed in subsequent releases."
1806
+ class WithFilter [+ T ] private [scala] (p : T => Boolean , asJava : rx.Observable [_ <: T ]) {
1803
1807
import rx .lang .scala .internal .ImplicitFunctionConversions ._
1804
1808
1805
- def map [B ](f : T => B ): Observable [B ] = new Observable [B ](wrapped.filter(p).map(f))
1809
+ def map [B ](f : T => B ): Observable [B ] = {
1810
+ Observable [B ](asJava.filter(p).map[B ](f))
1811
+ }
1812
+
1806
1813
def flatMap [B ](f : T => Observable [B ]): Observable [B ] = {
1807
- ??? // TODO
1814
+ Observable [ B ](asJava.filter(p).flatMap[ B ](( x : T ) => f(x).asJava))
1808
1815
}
1809
- def foreach (f : T => Unit ): Unit = wrapped.filter(p).toBlockingObservable.forEach(f)
1810
- def withFilter (p : T => Boolean ): Observable [T ] = new Observable [T ](wrapped.filter(p))
1816
+
1817
+ def withFilter (q : T => Boolean ): Observable [T ] = {
1818
+ Observable [T ](asJava.filter((x : T ) => p(x) && q(x)))
1819
+ }
1820
+
1821
+ // there is no foreach here, that's only available on BlockingObservable
1811
1822
}
1812
1823
1813
1824
class UnitTestSuite extends JUnitSuite {
0 commit comments