File tree Expand file tree Collapse file tree 1 file changed +1
-4
lines changed
language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples Expand file tree Collapse file tree 1 file changed +1
-4
lines changed Original file line number Diff line number Diff line change @@ -817,10 +817,8 @@ class RxScalaDemo extends JUnitSuite {
817
817
@ Test def createExampleWithBackpressure () {
818
818
val o = Observable {
819
819
subscriber : Subscriber [String ] => {
820
- val worker = IOScheduler ().createWorker
821
820
var emitted = 0
822
821
subscriber.setProducer(n => {
823
- worker.schedule {
824
822
val intN = if (n >= 10 ) 10 else n.toInt
825
823
(0 until intN)
826
824
.takeWhile(_ => emitted < 10 && ! subscriber.isUnsubscribed)
@@ -832,10 +830,9 @@ class RxScalaDemo extends JUnitSuite {
832
830
if (emitted == 10 && ! subscriber.isUnsubscribed) {
833
831
subscriber.onCompleted()
834
832
}
835
- }
836
833
})
837
834
}
838
- }
835
+ }.subscribeOn( IOScheduler ()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
839
836
o.observeOn(ComputationScheduler ()).subscribe(new Subscriber [String ] {
840
837
override def onStart () {
841
838
println(" Request a new one at the beginning" )
You can’t perform that action at this time.
0 commit comments