Skip to content

Commit 48275f0

Browse files
committed
Add unsubscribeOn to RxScala
1 parent 5eea817 commit 48275f0

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,4 +1048,16 @@ class RxScalaDemo extends JUnitSuite {
10481048
val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice
10491049
o.subscribe(n => println(s"subscriber gets $n"))
10501050
}
1051+
1052+
@Test def unsubscribeOnExample() {
1053+
val o = Observable[String] {
1054+
subscriber =>
1055+
subscriber.add(Subscription {
1056+
println("unsubscribe on " + Thread.currentThread().getName())
1057+
})
1058+
subscriber.onNext("RxScala")
1059+
subscriber.onCompleted()
1060+
}
1061+
o.unsubscribeOn(NewThreadScheduler()).subscribe(println(_))
1062+
}
10511063
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,17 @@ trait Observable[+T]
949949
toScalaObservable[T](asJavaObservable.subscribeOn(scheduler))
950950
}
951951

952+
/**
953+
* Asynchronously unsubscribes on the specified [[Scheduler]].
954+
*
955+
* @param scheduler the [[Scheduler]] to perform subscription and unsubscription actions on
956+
* @return the source Observable modified so that its unsubscriptions happen on the specified [[Scheduler]]
957+
* @since 0.17
958+
*/
959+
def unsubscribeOn(scheduler: Scheduler): Observable[T] = {
960+
toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler))
961+
}
962+
952963
/**
953964
* Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]].
954965
*

0 commit comments

Comments
 (0)