@@ -32,7 +32,8 @@ import scala.concurrent.Future
32
32
*/
33
33
trait ObservableImplicits {
34
34
35
- implicit class BoxedPublisher [T ](publisher : Publisher [T ]) extends Observable [T ] {
35
+ implicit class BoxedPublisher [T ](pub : => Publisher [T ]) extends Observable [T ] {
36
+ val publisher = pub
36
37
private def sub (observer : Observer [_ >: T ]): Unit = publisher.subscribe(observer)
37
38
38
39
/**
@@ -44,7 +45,8 @@ trait ObservableImplicits {
44
45
override def subscribe (s : Subscriber [_ >: T ]): Unit = sub(BoxedSubscriber (s))
45
46
}
46
47
47
- implicit class BoxedSubscriber [T ](subscriber : Subscriber [_ >: T ]) extends Observer [T ] {
48
+ implicit class BoxedSubscriber [T ](sub : => Subscriber [_ >: T ]) extends Observer [T ] {
49
+ val subscriber = sub
48
50
49
51
override def onSubscribe (subscription : Subscription ): Unit = subscriber.onSubscribe(subscription)
50
52
@@ -55,7 +57,7 @@ trait ObservableImplicits {
55
57
override def onNext (result : T ): Unit = subscriber.onNext(result)
56
58
}
57
59
58
- implicit class BoxedSubscription (subscription : JSubscription ) extends Subscription {
60
+ implicit class BoxedSubscription (subscription : => JSubscription ) extends Subscription {
59
61
val cancelled = new AtomicBoolean (false )
60
62
override def request (n : Long ): Unit = subscription.request(n)
61
63
@@ -68,11 +70,13 @@ trait ObservableImplicits {
68
70
69
71
}
70
72
71
- implicit class ToObservableString (publisher : Publisher [java.lang.String ]) extends Observable [String ] {
73
+ implicit class ToObservableString (pub : => Publisher [java.lang.String ]) extends Observable [String ] {
74
+ val publisher = pub
72
75
override def subscribe (observer : Observer [_ >: String ]): Unit = publisher.toObservable().subscribe(observer)
73
76
}
74
77
75
- implicit class ToSingleObservablePublisher [T ](publisher : Publisher [T ]) extends SingleObservable [T ] {
78
+ implicit class ToSingleObservablePublisher [T ](pub : => Publisher [T ]) extends SingleObservable [T ] {
79
+ val publisher = pub
76
80
77
81
/**
78
82
* Converts the [[Observable ]] to a single result [[Observable ]].
@@ -125,27 +129,32 @@ trait ObservableImplicits {
125
129
}
126
130
}
127
131
128
- implicit class ToSingleObservableInt (publisher : Publisher [java.lang.Integer ]) extends SingleObservable [Int ] {
132
+ implicit class ToSingleObservableInt (pub : => Publisher [java.lang.Integer ]) extends SingleObservable [Int ] {
133
+ val publisher = pub
129
134
override def subscribe (observer : Observer [_ >: Int ]): Unit =
130
135
publisher.toObservable().map(_.intValue()).toSingle().subscribe(observer)
131
136
}
132
137
133
- implicit class ToSingleObservableLong (publisher : Publisher [java.lang.Long ]) extends SingleObservable [Long ] {
138
+ implicit class ToSingleObservableLong (pub : => Publisher [java.lang.Long ]) extends SingleObservable [Long ] {
139
+ val publisher = pub
134
140
override def subscribe (observer : Observer [_ >: Long ]): Unit =
135
141
publisher.toObservable().map(_.longValue()).toSingle().subscribe(observer)
136
142
}
137
143
138
- implicit class ToSingleObservableObjectId (publisher : Publisher [org.bson.types.ObjectId ])
144
+ implicit class ToSingleObservableObjectId (pub : => Publisher [org.bson.types.ObjectId ])
139
145
extends SingleObservable [ObjectId ] {
146
+ val publisher = pub
140
147
override def subscribe (observer : Observer [_ >: ObjectId ]): Unit = publisher.toSingle().subscribe(observer)
141
148
}
142
149
143
- implicit class ToSingleObservableGridFS (publisher : Publisher [com.mongodb.client.gridfs.model.GridFSFile ])
150
+ implicit class ToSingleObservableGridFS (pub : => Publisher [com.mongodb.client.gridfs.model.GridFSFile ])
144
151
extends SingleObservable [GridFSFile ] {
152
+ val publisher = pub
145
153
override def subscribe (observer : Observer [_ >: GridFSFile ]): Unit = publisher.toSingle().subscribe(observer)
146
154
}
147
155
148
- implicit class ToSingleObservableVoid (publisher : Publisher [Void ]) extends SingleObservable [Void ] {
156
+ implicit class ToSingleObservableVoid (pub : => Publisher [Void ]) extends SingleObservable [Void ] {
157
+ val publisher = pub
149
158
override def subscribe (observer : Observer [_ >: Void ]): Unit =
150
159
publisher
151
160
.toSingle()
@@ -161,7 +170,8 @@ trait ObservableImplicits {
161
170
})
162
171
}
163
172
164
- implicit class ObservableFuture [T ](observable : Observable [T ]) {
173
+ implicit class ObservableFuture [T ](obs : => Observable [T ]) {
174
+ val observable = obs
165
175
166
176
/**
167
177
* Collects the [[Observable ]] results and converts to a [[scala.concurrent.Future ]].
@@ -176,7 +186,8 @@ trait ObservableImplicits {
176
186
177
187
}
178
188
179
- implicit class SingleObservableFuture [T ](observable : SingleObservable [T ]) {
189
+ implicit class SingleObservableFuture [T ](obs : => SingleObservable [T ]) {
190
+ val observable = obs
180
191
181
192
/**
182
193
* Collects the [[Observable ]] results and converts to a [[scala.concurrent.Future ]].
0 commit comments