1
+ /**
2
+ * Copyright 2013 Netflix, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+ package rx .lang .scala
17
+
18
+ object RxImplicits {
19
+ import java .{ lang => jlang }
20
+ import language .implicitConversions
21
+
22
+ import rx .{ Observable , Observer , Subscription }
23
+ import rx .Observable .OnSubscribeFunc
24
+ import rx .observables .BlockingObservable
25
+ import rx .util .functions ._
26
+
27
+ /**
28
+ * Converts 0-arg function to Rx Action0
29
+ */
30
+ implicit def scalaFunction0ProducingUnitToAction0 (f : (() => Unit )): Action0 =
31
+ new Action0 {
32
+ def call (): Unit = f()
33
+ }
34
+
35
+ /**
36
+ * Converts 1-arg function to Rx Action1
37
+ */
38
+ implicit def scalaFunction1ProducingUnitToAction1 [A ](f : (A => Unit )): Action1 [A ] =
39
+ new Action1 [A ] {
40
+ def call (a : A ): Unit = f(a)
41
+ }
42
+
43
+ /**
44
+ * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
45
+ */
46
+ implicit def scalaBooleanFunction1ToRxBooleanFunc1 [A ](f : (A => Boolean )): Func1 [A , jlang.Boolean ] =
47
+ new Func1 [A , jlang.Boolean ] {
48
+ def call (a : A ): jlang.Boolean = f(a).booleanValue
49
+ }
50
+
51
+ /**
52
+ * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
53
+ */
54
+ implicit def convertTakeWhileFuncToRxFunc2 [A ](f : (A , Int ) => Boolean ): Func2 [A , jlang.Integer , jlang.Boolean ] =
55
+ new Func2 [A , jlang.Integer , jlang.Boolean ] {
56
+ def call (a : A , b : jlang.Integer ): jlang.Boolean = f(a, b).booleanValue
57
+ }
58
+
59
+ /**
60
+ * Converts a function shaped like compareTo into the equivalent Rx Func2
61
+ */
62
+ implicit def convertComparisonFuncToRxFunc2 [A ](f : (A , A ) => Int ): Func2 [A , A , jlang.Integer ] =
63
+ new Func2 [A , A , jlang.Integer ] {
64
+ def call (a1 : A , a2 : A ): jlang.Integer = f(a1, a2).intValue
65
+ }
66
+
67
+ /*
68
+ * This implicit allows Scala code to use any exception type and still work
69
+ * with invariant Func1 interface
70
+ */
71
+ implicit def exceptionFunction1ToRxExceptionFunc1 [A <: Exception , B ](f : (A => B )): Func1 [Exception , B ] =
72
+ new Func1 [Exception , B ] {
73
+ def call (ex : Exception ): B = f(ex.asInstanceOf [A ])
74
+ }
75
+
76
+ /**
77
+ * The following implicits convert functions of different arities into the Rx equivalents
78
+ */
79
+ implicit def scalaFunction0ToRxFunc0 [A ](f : () => A ): Func0 [A ] =
80
+ new Func0 [A ] {
81
+ def call (): A = f()
82
+ }
83
+
84
+ implicit def scalaFunction1ToRxFunc1 [A , B ](f : (A => B )): Func1 [A , B ] =
85
+ new Func1 [A , B ] {
86
+ def call (a : A ): B = f(a)
87
+ }
88
+
89
+ implicit def scalaFunction2ToRxFunc2 [A , B , C ](f : (A , B ) => C ): Func2 [A , B , C ] =
90
+ new Func2 [A , B , C ] {
91
+ def call (a : A , b : B ) = f(a, b)
92
+ }
93
+
94
+ implicit def scalaFunction3ToRxFunc3 [A , B , C , D ](f : (A , B , C ) => D ): Func3 [A , B , C , D ] =
95
+ new Func3 [A , B , C , D ] {
96
+ def call (a : A , b : B , c : C ) = f(a, b, c)
97
+ }
98
+
99
+ implicit def scalaFunction4ToRxFunc4 [A , B , C , D , E ](f : (A , B , C , D ) => E ): Func4 [A , B , C , D , E ] =
100
+ new Func4 [A , B , C , D , E ] {
101
+ def call (a : A , b : B , c : C , d : D ) = f(a, b, c, d)
102
+ }
103
+
104
+ implicit def onSubscribeFunc [A ](f : (Observer [_ >: A ]) => Subscription ): OnSubscribeFunc [A ] =
105
+ new OnSubscribeFunc [A ] {
106
+ override def onSubscribe (a : Observer [_ >: A ]) = f(a)
107
+ }
108
+
109
+ /**
110
+ * This implicit class implements all of the methods necessary for including Observables in a
111
+ * for-comprehension. Note that return type is always Observable, so that the ScalaObservable
112
+ * type never escapes the for-comprehension
113
+ */
114
+ implicit class ScalaObservable [A ](wrapped : Observable [A ]) {
115
+ def map [B ](f : A => B ): Observable [B ] = wrapped.map[B ](f)
116
+ def flatMap [B ](f : A => Observable [B ]): Observable [B ] = wrapped.mapMany(f)
117
+ def foreach (f : A => Unit ): Unit = wrapped.toBlockingObservable.forEach(f)
118
+ def withFilter (p : A => Boolean ): WithFilter = new WithFilter (p)
119
+
120
+ class WithFilter (p : A => Boolean ) {
121
+ def map [B ](f : A => B ): Observable [B ] = wrapped.filter(p).map(f)
122
+ def flatMap [B ](f : A => Observable [B ]): Observable [B ] = wrapped.filter(p).flatMap(f)
123
+ def foreach (f : A => Unit ): Unit = wrapped.filter(p).toBlockingObservable.forEach(f)
124
+ def withFilter (p : A => Boolean ): Observable [A ] = wrapped.filter(p)
125
+ }
126
+ }
127
+ }
0 commit comments