Skip to content

Commit f699cd4

Browse files
Made subjects safe
1 parent 5805deb commit f699cd4

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ object Observer {
6363
*/
6464
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
6565
new Observer[T]() {
66-
override def asJavaObserver: rx.Observer[_ >: T] = observer
66+
67+
override def asJavaObserver = observer
6768

6869
def onCompleted(): Unit = asJavaObserver.onCompleted()
6970
def onError(error: Throwable): Unit = asJavaObserver.onError(error)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,26 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import rx.joins.ObserverBase
19+
1820
/**
1921
* A Subject is an Observable and an Observer at the same time.
2022
*/
2123
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
22-
val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
24+
val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
25+
2326
def asJavaObservable: rx.Observable[_ <: R] = asJavaSubject
24-
override def asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
27+
28+
// temporary hack to workaround bugs in rx Subjects
29+
override def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30+
protected def onErrorCore(error: Throwable) = asJavaSubject.onError(error)
31+
protected def onCompletedCore() = asJavaSubject.onCompleted()
32+
protected def onNextCore(value: T) = asJavaSubject.onNext(value)
33+
}
2534

2635
def onCompleted(): Unit = asJavaObserver.onCompleted()
2736
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
2837
def onNext(value: T): Unit = asJavaObserver.onNext(value)
38+
2939
}
3040

0 commit comments

Comments
 (0)