Skip to content

Commit a20bed8

Browse files
committed
Fix the bug that the inner Subscriber can not be unsubscribed and elimilate scaladoc warnings
1 parent 8cf821a commit a20bed8

File tree

4 files changed

+76
-13
lines changed

4 files changed

+76
-13
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ object Notification {
137137
* @param notification
138138
* The [[rx.lang.scala.Notification]] to be deconstructed
139139
* @return
140-
* The [[java.lang.Throwable]] value contained in this notification.
140+
* The `java.lang.Throwable` value contained in this notification.
141141
*/
142142
def unapply[U](notification: Notification[U]): Option[Throwable] = notification match {
143143
case onError: OnError[U] => Some(onError.error)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,28 @@ import collection.JavaConversions._
4848
* the observer
4949
* @define subscribeObserverParamScheduler
5050
* the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable
51+
*
52+
* @define subscribeSubscriberMain
53+
* Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]].
54+
*
55+
* A typical implementation of `subscribe` does the following:
56+
*
57+
* It stores a reference to the Observer in a collection object, such as a `List[T]` object.
58+
*
59+
* It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to
60+
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
61+
* sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
62+
*
63+
* An [[Observable]] instance is responsible for accepting all subscriptions
64+
* and notifying all [[Subscriber]]s. Unless the documentation for a particular
65+
* [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no
66+
* assumptions about the order in which multiple [[Subscriber]]s will receive their notifications.
67+
*
68+
* @define subscribeSubscriberParamObserver
69+
* the [[Subscriber]]
70+
* @define subscribeSubscriberParamScheduler
71+
* the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable
72+
*
5173
* @define subscribeAllReturn
5274
* a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items
5375
* before the Observable has finished sending them
@@ -125,6 +147,39 @@ trait Observable[+T]
125147
*/
126148
def apply(observer: Observer[T]): Subscription = subscribe(observer)
127149

150+
/**
151+
* $subscribeSubscriberMain
152+
*
153+
* @param subscriber $subscribeSubscriberParamObserver
154+
* @param scheduler $subscribeSubscriberParamScheduler
155+
* @return $subscribeAllReturn
156+
*/
157+
def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = {
158+
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
159+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
160+
thisJava.subscribe(subscriber.asJavaSubscriber, scheduler)
161+
}
162+
163+
/**
164+
* $subscribeSubscriberMain
165+
*
166+
* @param subscriber $subscribeSubscriberParamObserver
167+
* @return $subscribeAllReturn
168+
*/
169+
def subscribe(subscriber: Subscriber[T]): Subscription = {
170+
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
171+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
172+
thisJava.subscribe(subscriber.asJavaSubscriber)
173+
}
174+
175+
/**
176+
* $subscribeSubscriberMain
177+
*
178+
* @param subscriber $subscribeSubscriberParamObserver
179+
* @return $subscribeAllReturn
180+
*/
181+
def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber)
182+
128183
/**
129184
* $subscribeCallbacksMainNoNotifications
130185
*
@@ -2405,8 +2460,7 @@ trait Observable[+T]
24052460

24062461
/**
24072462
* Perform work in parallel by sharding an `Observable[T]` on a
2408-
* [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation computation]]
2409-
* [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output.
2463+
* [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output.
24102464
*
24112465
* @param f
24122466
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
@@ -2636,12 +2690,10 @@ trait Observable[+T]
26362690
* those emitted by the source Observable
26372691
* @throws IndexOutOfBoundsException
26382692
* if index is greater than or equal to the number of items emitted by the source
2639-
* Observable
2640-
* @throws IndexOutOfBoundsException
2641-
* if index is less than 0
2693+
* Observable, or index is less than 0
26422694
* @see `Observable.elementAt`
2643-
* @deprecated("Use `elementAt`", "0.18.0")
26442695
*/
2696+
@deprecated("Use `elementAt`", "0.18.0")
26452697
def apply(index: Int): Observable[T] = elementAt(index)
26462698

26472699
/**
@@ -2656,9 +2708,7 @@ trait Observable[+T]
26562708
* those emitted by the source Observable
26572709
* @throws IndexOutOfBoundsException
26582710
* if index is greater than or equal to the number of items emitted by the source
2659-
* Observable
2660-
* @throws IndexOutOfBoundsException
2661-
* if index is less than 0
2711+
* Observable, or index is less than 0
26622712
*/
26632713
def elementAt(index: Int): Observable[T] = {
26642714
toScalaObservable[T](asJavaObservable.elementAt(index))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
5050

5151
/**
5252
* Adds a subscription to the group,
53-
* or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed.
53+
* or unsubscribes immediately is the [[rx.lang.scala.subscriptions.CompositeSubscription]] is unsubscribed.
5454
* @param subscription the subscription to be added.
55-
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
55+
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
5656
*/
5757
def +=(subscription: Subscription): this.type = {
5858
asJavaSubscription.add(subscription.asJavaSubscription)
@@ -62,7 +62,7 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
6262
/**
6363
* Removes and unsubscribes a subscription to the group,
6464
* @param subscription the subscription be removed.
65-
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
65+
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
6666
*/
6767
def -=(subscription: Subscription): this.type = {
6868
asJavaSubscription.remove(subscription.asJavaSubscription)

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package rx.lang.scala
1717

1818
import org.junit.Test
1919
import org.junit.Assert.assertNotNull
20+
import org.junit.Assert.assertTrue
2021
import org.scalatest.junit.JUnitSuite
2122

2223
class SubscriberTests extends JUnitSuite {
@@ -29,4 +30,16 @@ class SubscriberTests extends JUnitSuite {
2930
assertNotNull(subscriber.asJavaSubscriber)
3031
}
3132

33+
@Test def testUnsubscribeForSubscriber() {
34+
var innerSubscriber: Subscriber[Int] = null
35+
val o = Observable[Int](subscriber => {
36+
Observable[Int](subscriber => {
37+
innerSubscriber = subscriber
38+
}).subscribe(subscriber)
39+
})
40+
o.subscribe().unsubscribe()
41+
// If we unsubscribe outside, the inner Subscriber should also be unsubscribed
42+
assertTrue(innerSubscriber.isUnsubscribed)
43+
}
44+
3245
}

0 commit comments

Comments
 (0)