@@ -19,7 +19,8 @@ object RxImplicits {
1919 import java .{ lang => jlang }
2020 import language .implicitConversions
2121
22- import rx .Observable
22+ import rx .{ Observable , Observer , Subscription }
23+ import rx .Observable .OnSubscribeFunc
2324 import rx .observables .BlockingObservable
2425 import rx .util .functions ._
2526
@@ -56,7 +57,7 @@ object RxImplicits {
5657 }
5758
5859 /**
59- * Converts a function shaped ilke compareTo into the equivalent Rx Func2
60+ * Converts a function shaped like compareTo into the equivalent Rx Func2
6061 */
6162 implicit def convertComparisonFuncToRxFunc2 [A ](f : (A , A ) => Int ): Func2 [A , A , jlang.Integer ] =
6263 new Func2 [A , A , jlang.Integer ] {
@@ -100,6 +101,11 @@ object RxImplicits {
100101 def call (a : A , b : B , c : C , d : D ) = f(a, b, c, d)
101102 }
102103
104+ implicit def onSubscribeFunc [A ](f : (Observer [_ >: A ]) => Subscription ): OnSubscribeFunc [A ] =
105+ new OnSubscribeFunc [A ] {
106+ override def call (a : Observer [_ >: A ]) = f(a)
107+ }
108+
103109 /**
104110 * This implicit class implements all of the methods necessary for including Observables in a
105111 * for-comprehension. Note that return type is always Observable, so that the ScalaObservable
@@ -131,7 +137,9 @@ class UnitTestSuite extends JUnitSuite {
131137 import org .mockito .Mockito ._
132138 import org .mockito .{ MockitoAnnotations , Mock }
133139 import rx .{ Notification , Observer , Observable , Subscription }
140+ import rx .Observable .OnSubscribeFunc
134141 import rx .observables .GroupedObservable
142+ import rx .subscriptions .Subscriptions
135143 import collection .mutable .ArrayBuffer
136144 import collection .JavaConverters ._
137145
@@ -175,7 +183,6 @@ class UnitTestSuite extends JUnitSuite {
175183 }
176184
177185 // tests of static methods
178-
179186 @ Test def testSingle {
180187 assertEquals(1 , Observable .from(1 ).toBlockingObservable.single)
181188 }
@@ -208,6 +215,11 @@ class UnitTestSuite extends JUnitSuite {
208215 case ex : Throwable => fail(" Caught unexpected exception " + ex.getCause + " , expected IllegalStateException" )
209216 }
210217 }
218+
219+ @ Test def testCreateFromOnSubscribeFunc {
220+ val created = Observable .create((o : Observer [_ >: Integer ]) => Subscriptions .empty)
221+ // no assertions on subscription, just testing the implicit
222+ }
211223
212224 @ Test def testFromJavaInterop {
213225 val observable = Observable .from(List (1 , 2 , 3 ).asJava)
0 commit comments