Skip to content

Commit f2d3c4c

Browse files
committed
Fix that Subscriber.onStart isn't called
1 parent 5ee6073 commit f2d3c4c

File tree

3 files changed

+46
-2
lines changed

3 files changed

+46
-2
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,6 @@ class RxScalaDemo extends JUnitSuite {
539539
}
540540

541541
@Test def testSingleOption() {
542-
assertEquals(None, List(1, 2).toObservable.toBlocking.singleOption)
543542
assertEquals(Some(1), List(1).toObservable.toBlocking.singleOption)
544543
assertEquals(None, List().toObservable.toBlocking.singleOption)
545544
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
55
self =>
66

77
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] with SubscriberAdapter[T] {
8+
override def onStart(): Unit = self.onStart()
89
override def onNext(value: T): Unit = self.onNext(value)
910
override def onError(error: Throwable): Unit = self.onError(error)
1011
override def onCompleted(): Unit = self.onCompleted()
@@ -39,7 +40,7 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
3940
}
4041

4142
def onStart(): Unit = {
42-
asJavaSubscriber.onStart()
43+
// do nothing by default
4344
}
4445

4546
protected final def request(n: Long): Unit = {
@@ -65,6 +66,7 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
6566
override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
6667
override val asJavaSubscription: rx.Subscription = asJavaSubscriber
6768

69+
override def onStart(): Unit = asJavaSubscriber.onStart()
6870
override def onNext(value: T): Unit = asJavaSubscriber.onNext(value)
6971
override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error)
7072
override def onCompleted(): Unit = asJavaSubscriber.onCompleted()

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,47 @@ class SubscriberTests extends JUnitSuite {
8282
assertFalse("Subscriber did not call onError", didError)
8383
assertEquals(1, onNextValue)
8484
}
85+
86+
@Test def testOnStart(): Unit = {
87+
var called = false
88+
Observable.just(1).subscribe(new Subscriber[Int] {
89+
override def onStart(): Unit = {
90+
called = true
91+
}
92+
93+
override def onCompleted(): Unit = {
94+
}
95+
96+
override def onError(e: Throwable): Unit = {
97+
}
98+
99+
override def onNext(v: Int): Unit = {
100+
}
101+
})
102+
assertTrue("Subscriber.onStart should be called", called)
103+
}
104+
105+
@Test def testOnStart2(): Unit = {
106+
val items = scala.collection.mutable.ListBuffer[Int]()
107+
var calledOnCompleted = false
108+
Observable.just(1, 2, 3).subscribe(new Subscriber[Int] {
109+
override def onStart(): Unit = {
110+
request(1)
111+
}
112+
113+
override def onCompleted(): Unit = {
114+
calledOnCompleted = true
115+
}
116+
117+
override def onError(e: Throwable): Unit = {
118+
}
119+
120+
override def onNext(v: Int): Unit = {
121+
items += v
122+
request(1)
123+
}
124+
})
125+
assertEquals(List(1, 2, 3), items)
126+
assertTrue("Subscriber.onCompleted should be called", calledOnCompleted)
127+
}
85128
}

0 commit comments

Comments
 (0)