Skip to content

Commit 5805deb

Browse files
Double negation Observer.
1 parent 25ce82f commit 5805deb

File tree

3 files changed

+32
-67
lines changed

3 files changed

+32
-67
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc
2222
/**
2323
* The Observable interface that implements the Reactive Pattern.
2424
*
25-
* @param asJavaObservable the underlying Java observable
26-
*
2725
* @define subscribeObserverMain
2826
* Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
2927
* items and notifications from the Observable.
@@ -1855,7 +1853,7 @@ object Observable {
18551853
/**
18561854
* Creates a new Scala Observable from a given Java Observable.
18571855
*/
1858-
def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
1856+
private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
18591857
new Observable[T]{
18601858
def asJavaObservable = observable
18611859
}
@@ -1878,13 +1876,13 @@ object Observable {
18781876
*
18791877
*
18801878
* @tparam T
1881-
* the type of the items that this Observable emits
1879+
* the type of the items that this Observable emits.
18821880
* @param func
18831881
* a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
18841882
* as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
1885-
* canceling the subscription
1886-
* @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given
1887-
* function
1883+
* canceling the subscription.
1884+
* @return
1885+
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
18881886
*/
18891887
def apply[T](func: Observer[T] => Subscription): Observable[T] = {
18901888
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
@@ -1903,15 +1901,17 @@ object Observable {
19031901
}
19041902

19051903
/**
1906-
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1904+
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1905+
* method when the Observer subscribes to it.
19071906
*
19081907
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/error.png">
19091908
*
19101909
* @param exception
19111910
* the particular error to report
19121911
* @tparam T
19131912
* the type of the items (ostensibly) emitted by the Observable
1914-
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1913+
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1914+
* method when the Observer subscribes to it
19151915
*/
19161916
def apply[T](exception: Throwable): Observable[T] = {
19171917
Observable[T](rx.Observable.error(exception))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 18 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ import rx.joins.ObserverBase
2626
*/
2727
trait Observer[-T] {
2828

29-
def asJavaObserver: rx.Observer[_ >: T]
29+
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30+
protected def onCompletedCore(): Unit = onCompleted()
31+
protected def onErrorCore(error: Throwable): Unit = onError(error)
32+
protected def onNextCore(value: T): Unit = onNext(value)
33+
}
3034

3135
/**
3236
* Provides the Observer with new data.
@@ -35,78 +39,35 @@ trait Observer[-T] {
3539
*
3640
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
3741
*/
38-
def onNext(value: T): Unit = asJavaObserver.onNext(value)
42+
def onNext(value: T): Unit
3943

4044
/**
4145
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
4246
*
4347
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
4448
*/
45-
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
49+
def onError(error: Throwable): Unit
4650

4751
/**
4852
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
4953
*
5054
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
5155
*/
52-
def onCompleted(): Unit = asJavaObserver.onCompleted()
56+
def onCompleted(): Unit
5357

5458
}
5559

5660
object Observer {
57-
58-
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
59-
new Observer[T]() {
60-
def asJavaObserver: rx.Observer[_ >: T] = observer
61-
}
62-
}
63-
6461
/**
65-
* Creates an [[rx.lang.scala.Observer]]
66-
* @param onNext the onNext action
67-
* @param onError the onError action
68-
* @param onCompleted the onCompleted action
62+
* Assume that the underlying rx.Observer does not need to be wrapped
6963
*/
70-
def apply[T](onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observer[T] = {
71-
Observer(new ObserverBase[T] () {
72-
protected def onCompletedCore(): Unit = onCompleted()
73-
protected def onErrorCore(error: Throwable): Unit = onError(error)
74-
protected def onNextCore(value: T): Unit = onNext(value)
75-
})
76-
}
77-
78-
/**
79-
* Creates an [[rx.lang.scala.Observer]]
80-
* @param onNext the onNext action
81-
* @param onError the onError action
82-
*/
83-
def apply[T](onNext: T => Unit, onError: Throwable => Unit): Observer[T] = {
84-
Observer(new ObserverBase[T] () {
85-
protected def onCompletedCore(): Unit = {}
86-
protected def onErrorCore(error: Throwable): Unit = onError(error)
87-
protected def onNextCore(value: T): Unit = onNext(value)
88-
})
89-
}
90-
91-
def apply[T](onNext: T => Unit, onCompleted: () => Unit): Observer[T] = {
92-
Observer(new ObserverBase[T] () {
93-
protected def onCompletedCore(): Unit = onCompleted()
94-
protected def onErrorCore(error: Throwable): Unit = {}
95-
protected def onNextCore(value: T): Unit = onNext(value)
96-
})
97-
}
98-
99-
/**
100-
* Creates an [[rx.lang.scala.Observer]]
101-
* @param onNext the onNext action
102-
*/
103-
def apply[T](onNext: T => Unit): Observer[T] = {
104-
Observer(new ObserverBase[T] () {
105-
protected def onCompletedCore(): Unit = {}
106-
protected def onErrorCore(error: Throwable): Unit = {}
107-
protected def onNextCore(value: T): Unit = onNext(value)
108-
})
109-
}
110-
}
111-
64+
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
65+
new Observer[T]() {
66+
override def asJavaObserver: rx.Observer[_ >: T] = observer
11267

68+
def onCompleted(): Unit = asJavaObserver.onCompleted()
69+
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
70+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
71+
}
72+
}
73+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ package rx.lang.scala
2121
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
2222
val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
2323
def asJavaObservable: rx.Observable[_ <: R] = asJavaSubject
24-
def asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
24+
override def asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
25+
26+
def onCompleted(): Unit = asJavaObserver.onCompleted()
27+
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
28+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
2529
}
2630

0 commit comments

Comments
 (0)