@@ -19,7 +19,8 @@ object RxImplicits {
1919 import java .{ lang => jlang }
2020 import language .implicitConversions
2121
22- import rx .Observable
22+ import rx .{ Observable , Observer , Subscription }
23+ import rx .Observable .OnSubscribeFunc
2324 import rx .observables .BlockingObservable
2425 import rx .util .functions ._
2526
@@ -56,7 +57,7 @@ object RxImplicits {
5657 }
5758
5859 /**
59- * Converts a function shaped ilke compareTo into the equivalent Rx Func2
60+ * Converts a function shaped like compareTo into the equivalent Rx Func2
6061 */
6162 implicit def convertComparisonFuncToRxFunc2 [A ](f : (A , A ) => Int ): Func2 [A , A , jlang.Integer ] =
6263 new Func2 [A , A , jlang.Integer ] {
@@ -100,13 +101,18 @@ object RxImplicits {
100101 def call (a : A , b : B , c : C , d : D ) = f(a, b, c, d)
101102 }
102103
104+ implicit def onSubscribeFunc [A ](f : (Observer [_ >: A ]) => Subscription ): OnSubscribeFunc [A ] =
105+ new OnSubscribeFunc [A ] {
106+ override def onSubscribe (a : Observer [_ >: A ]) = f(a)
107+ }
108+
103109 /**
104110 * This implicit class implements all of the methods necessary for including Observables in a
105111 * for-comprehension. Note that return type is always Observable, so that the ScalaObservable
106112 * type never escapes the for-comprehension
107113 */
108114 implicit class ScalaObservable [A ](wrapped : Observable [A ]) {
109- def map [B ](f : A => B ): Observable [B ] = wrapped.map(f)
115+ def map [B ](f : A => B ): Observable [B ] = wrapped.map[ B ] (f)
110116 def flatMap [B ](f : A => Observable [B ]): Observable [B ] = wrapped.mapMany(f)
111117 def foreach (f : A => Unit ): Unit = wrapped.toBlockingObservable.forEach(f)
112118 def withFilter (p : A => Boolean ): WithFilter = new WithFilter (p)
@@ -131,7 +137,9 @@ class UnitTestSuite extends JUnitSuite {
131137 import org .mockito .Mockito ._
132138 import org .mockito .{ MockitoAnnotations , Mock }
133139 import rx .{ Notification , Observer , Observable , Subscription }
140+ import rx .Observable .OnSubscribeFunc
134141 import rx .observables .GroupedObservable
142+ import rx .subscriptions .Subscriptions
135143 import collection .mutable .ArrayBuffer
136144 import collection .JavaConverters ._
137145
@@ -147,7 +155,7 @@ class UnitTestSuite extends JUnitSuite {
147155 class ObservableWithException (s : Subscription , values : String * ) extends Observable [String ] {
148156 var t : Thread = null
149157
150- override def subscribe (observer : Observer [String ]): Subscription = {
158+ override def subscribe (observer : Observer [_ >: String ]): Subscription = {
151159 println(" ObservableWithException subscribed to ..." )
152160 t = new Thread (new Runnable () {
153161 override def run () {
@@ -175,7 +183,6 @@ class UnitTestSuite extends JUnitSuite {
175183 }
176184
177185 // tests of static methods
178-
179186 @ Test def testSingle {
180187 assertEquals(1 , Observable .from(1 ).toBlockingObservable.single)
181188 }
@@ -208,6 +215,11 @@ class UnitTestSuite extends JUnitSuite {
208215 case ex : Throwable => fail(" Caught unexpected exception " + ex.getCause + " , expected IllegalStateException" )
209216 }
210217 }
218+
219+ @ Test def testCreateFromOnSubscribeFunc {
220+ val created = Observable .create((o : Observer [_ >: Integer ]) => Subscriptions .empty)
221+ // no assertions on subscription, just testing the implicit
222+ }
211223
212224 @ Test def testFromJavaInterop {
213225 val observable = Observable .from(List (1 , 2 , 3 ).asJava)
@@ -248,7 +260,7 @@ class UnitTestSuite extends JUnitSuite {
248260
249261 @ Test def testFlattenMerge {
250262 val observable = Observable .from(Observable .from(1 , 2 , 3 ))
251- val merged = Observable .merge(observable)
263+ val merged = Observable .merge[ Int ] (observable)
252264 assertSubscribeReceives(merged)(1 , 2 , 3 )
253265 }
254266
@@ -272,6 +284,18 @@ class UnitTestSuite extends JUnitSuite {
272284 assertSubscribeReceives(synchronized )(1 , 2 , 3 )
273285 }
274286
287+ @ Test def testZip2 () {
288+ val colors : Observable [String ] = Observable .from(" red" , " green" , " blue" )
289+ val names : Observable [String ] = Observable .from(" lion-o" , " cheetara" , " panthro" )
290+
291+ case class Character (color : String , name : String )
292+
293+ val cheetara = Character (" green" , " cheetara" )
294+ val panthro = Character (" blue" , " panthro" )
295+ val characters = Observable .zip[String , String , Character ](colors, names, Character .apply _)
296+ assertSubscribeReceives(characters)(cheetara, panthro)
297+ }
298+
275299 @ Test def testZip3 () {
276300 val numbers = Observable .from(1 , 2 , 3 )
277301 val colors = Observable .from(" red" , " green" , " blue" )
@@ -283,7 +307,7 @@ class UnitTestSuite extends JUnitSuite {
283307 val cheetara = Character (2 , " green" , " cheetara" )
284308 val panthro = Character (3 , " blue" , " panthro" )
285309
286- val characters = Observable .zip(numbers, colors, names, Character .apply _)
310+ val characters = Observable .zip[ Int , String , String , Character ] (numbers, colors, names, Character .apply _)
287311 assertSubscribeReceives(characters)(liono, cheetara, panthro)
288312 }
289313
@@ -299,7 +323,7 @@ class UnitTestSuite extends JUnitSuite {
299323 val cheetara = Character (2 , " green" , " cheetara" , false )
300324 val panthro = Character (3 , " blue" , " panthro" , false )
301325
302- val characters = Observable .zip(numbers, colors, names, isLeader, Character .apply _)
326+ val characters = Observable .zip[ Int , String , String , Boolean , Character ] (numbers, colors, names, isLeader, Character .apply _)
303327 assertSubscribeReceives(characters)(liono, cheetara, panthro)
304328 }
305329
@@ -338,7 +362,8 @@ class UnitTestSuite extends JUnitSuite {
338362 @ Test def testMap {
339363 val numbers = Observable .from(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 )
340364 val mappedNumbers = ArrayBuffer .empty[Int ]
341- numbers.map((x : Int ) => x * x).subscribe((squareVal : Int ) => {
365+ val mapped : Observable [Int ] = numbers map ((x : Int ) => x * x)
366+ mapped.subscribe((squareVal : Int ) => {
342367 mappedNumbers.append(squareVal)
343368 })
344369 assertEquals(List (1 , 4 , 9 , 16 , 25 , 36 , 49 , 64 , 81 ), mappedNumbers.toList)
@@ -458,18 +483,9 @@ class UnitTestSuite extends JUnitSuite {
458483 assertSubscribeReceives(skipped)(3 , 4 )
459484 }
460485
461- /**
462- * Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
463- * observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
464- * it should produce onNext(first), onNext(second), and 1 onCompleted
465- *
466- * Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
467- */
468486 @ Test def testTake {
469- import rx .operators ._
470-
471487 val observable = Observable .from(1 , 2 , 3 , 4 , 5 )
472- val took = Observable .create( OperationTake . take(observable, 2 ) )
488+ val took = observable. take(2 )
473489 assertSubscribeReceives(took)(1 , 2 )
474490 }
475491
@@ -479,11 +495,11 @@ class UnitTestSuite extends JUnitSuite {
479495 assertSubscribeReceives(took)(1 , 3 , 5 )
480496 }
481497
482- /* @Test def testTakeWhileWithIndex {
483- val observable = Observable.from(1, 3, 5, 6 , 7, 9, 11, 12, 13, 15, 17)
484- val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4 )
485- assertSubscribeReceives(took)(9, 11)
486- } */
498+ @ Test def testTakeWhileWithIndex {
499+ val observable = Observable .from(1 , 3 , 5 , 7 , 9 , 11 , 12 , 13 , 15 , 17 )
500+ val took = observable.takeWhileWithIndex((i : Int , idx : Int ) => isOdd(i) && idx < 8 )
501+ assertSubscribeReceives(took)(1 , 3 , 5 , 7 , 9 , 11 )
502+ }
487503
488504 @ Test def testTakeLast {
489505 val observable = Observable .from(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 )
@@ -559,7 +575,7 @@ class UnitTestSuite extends JUnitSuite {
559575
560576 @ Test def testFilterInForComprehension {
561577 val doubler = (i : Int ) => Observable .from(i, i)
562- val filteredObservable = for {
578+ val filteredObservable : Observable [ Int ] = for {
563579 i : Int <- Observable .from(1 , 2 , 3 , 4 )
564580 j : Int <- doubler(i) if isOdd(i)
565581 } yield j
0 commit comments