Skip to content

Commit b711f82

Browse files
committed
RxScala: Add backpressure support
1 parent 83cc1c1 commit b711f82

File tree

3 files changed

+129
-4
lines changed

3 files changed

+129
-4
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,51 @@ class RxScalaDemo extends JUnitSuite {
814814
println(s"RxScala appears ${count.toBlocking.single} times in http://rxscala.github.io/")
815815
}
816816

817+
@Test def createExampleWithBackpressure() {
818+
val o = Observable {
819+
subscriber: Subscriber[String] => {
820+
val worker = IOScheduler().createWorker
821+
var emitted = 0
822+
subscriber.setProducer(n => {
823+
worker.schedule {
824+
val intN = if (n >= 10) 10 else n.toInt
825+
(0 until intN)
826+
.takeWhile(_ => emitted < 10 && !subscriber.isUnsubscribed)
827+
.foreach {
828+
i =>
829+
emitted += 1
830+
subscriber.onNext(s"item ${emitted}")
831+
}
832+
if (emitted == 10 && !subscriber.isUnsubscribed) {
833+
subscriber.onCompleted()
834+
}
835+
}
836+
})
837+
}
838+
}
839+
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
840+
override def onStart() {
841+
println("Request a new one at the beginning")
842+
request(1)
843+
}
844+
845+
override def onNext(v: String) {
846+
println("Received " + v)
847+
println("Request a new one after receiving " + v)
848+
request(1)
849+
}
850+
851+
override def onError(e: Throwable) {
852+
e.printStackTrace()
853+
}
854+
855+
override def onCompleted() {
856+
println("Done")
857+
}
858+
})
859+
Thread.sleep(10000)
860+
}
861+
817862
def output(s: String): Unit = println(s)
818863

819864
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import rx.{Producer => JProducer}
19+
20+
21+
trait Producer {
22+
23+
// Java calls XXX, Scala receives XXX
24+
private[scala] val asJavaProducer: JProducer = new JProducer {
25+
def request(n: Long): Unit = {
26+
Producer.this.request(n)
27+
}
28+
}
29+
30+
def request(n: Long): Unit = {}
31+
}
32+
33+
object Producer {
34+
/**
35+
* Scala calls XXX; Java receives XXX.
36+
*/
37+
private[scala] def apply[T](producer: JProducer): Producer = {
38+
new Producer {
39+
override val asJavaProducer = producer
40+
41+
override def request(n: Long): Unit = {
42+
producer.request(n)
43+
}
44+
}
45+
}
46+
47+
def apply[T](producer: Long => Unit): Producer = {
48+
// Java calls XXX; Scala receives XXX.
49+
Producer(new JProducer {
50+
override def request(n: Long): Unit = {
51+
producer(n)
52+
}
53+
})
54+
}
55+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
44

55
self =>
66

7-
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
8-
def onNext(value: T): Unit = self.onNext(value)
9-
def onError(error: Throwable): Unit = self.onError(error)
10-
def onCompleted(): Unit = self.onCompleted()
7+
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] with SubscriberAdapter[T] {
8+
override def onStart(): Unit = self.onStart()
9+
override def onNext(value: T): Unit = self.onNext(value)
10+
override def onError(error: Throwable): Unit = self.onError(error)
11+
override def onCompleted(): Unit = self.onCompleted()
12+
override def requestMore(n: Long): Unit = request(n)
1113
}
1214

1315
private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
@@ -37,6 +39,24 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
3739
asJavaSubscriber.isUnsubscribed()
3840
}
3941

42+
def onStart(): Unit = {
43+
asJavaSubscriber.onStart()
44+
}
45+
46+
protected final def request(n: Long): Unit = {
47+
asJavaSubscriber match {
48+
case s: SubscriberAdapter[T] => s.requestMore(n)
49+
case _ => throw new rx.exceptions.MissingBackpressureException()
50+
}
51+
}
52+
53+
def setProducer(producer: Producer): Unit = {
54+
asJavaSubscriber.setProducer(producer.asJavaProducer)
55+
}
56+
57+
def setProducer(producer: Long => Unit): Unit = {
58+
asJavaSubscriber.setProducer(Producer(producer).asJavaProducer)
59+
}
4060
}
4161

4262
object Subscriber extends ObserverFactoryMethods[Subscriber] {
@@ -71,3 +91,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
7191
})
7292
}
7393
}
94+
95+
sealed trait SubscriberAdapter[T] extends rx.Subscriber[T] {
96+
// Add a method to expose the protected `request` method
97+
def requestMore(n: Long): Unit
98+
}

0 commit comments

Comments
 (0)