Skip to content

Commit de2d45e

Browse files
Merge pull request #1609 from zsxwing/rxscala-backpressure
RxScala: Add backpressure support
2 parents 63ae596 + 4f1af99 commit de2d45e

File tree

3 files changed

+126
-4
lines changed

3 files changed

+126
-4
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,48 @@ class RxScalaDemo extends JUnitSuite {
814814
println(s"RxScala appears ${count.toBlocking.single} times in http://rxscala.github.io/")
815815
}
816816

817+
@Test def createExampleWithBackpressure() {
818+
val o = Observable {
819+
subscriber: Subscriber[String] => {
820+
var emitted = 0
821+
subscriber.setProducer(n => {
822+
val intN = if (n >= 10) 10 else n.toInt
823+
(0 until intN)
824+
.takeWhile(_ => emitted < 10 && !subscriber.isUnsubscribed)
825+
.foreach {
826+
i =>
827+
emitted += 1
828+
subscriber.onNext(s"item ${emitted}")
829+
}
830+
if (emitted == 10 && !subscriber.isUnsubscribed) {
831+
subscriber.onCompleted()
832+
}
833+
})
834+
}
835+
}.subscribeOn(IOScheduler()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
836+
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
837+
override def onStart() {
838+
println("Request a new one at the beginning")
839+
request(1)
840+
}
841+
842+
override def onNext(v: String) {
843+
println("Received " + v)
844+
println("Request a new one after receiving " + v)
845+
request(1)
846+
}
847+
848+
override def onError(e: Throwable) {
849+
e.printStackTrace()
850+
}
851+
852+
override def onCompleted() {
853+
println("Done")
854+
}
855+
})
856+
Thread.sleep(10000)
857+
}
858+
817859
def output(s: String): Unit = println(s)
818860

819861
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import rx.{Producer => JProducer}
19+
20+
21+
trait Producer {
22+
23+
// Java calls XXX, Scala receives XXX
24+
private[scala] val asJavaProducer: JProducer = new JProducer {
25+
def request(n: Long): Unit = {
26+
Producer.this.request(n)
27+
}
28+
}
29+
30+
def request(n: Long): Unit = {}
31+
}
32+
33+
object Producer {
34+
/**
35+
* Scala calls XXX; Java receives XXX.
36+
*/
37+
private[scala] def apply[T](producer: JProducer): Producer = {
38+
new Producer {
39+
override val asJavaProducer = producer
40+
41+
override def request(n: Long): Unit = {
42+
producer.request(n)
43+
}
44+
}
45+
}
46+
47+
def apply[T](producer: Long => Unit): Producer = {
48+
// Java calls XXX; Scala receives XXX.
49+
Producer(new JProducer {
50+
override def request(n: Long): Unit = {
51+
producer(n)
52+
}
53+
})
54+
}
55+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
44

55
self =>
66

7-
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
8-
def onNext(value: T): Unit = self.onNext(value)
9-
def onError(error: Throwable): Unit = self.onError(error)
10-
def onCompleted(): Unit = self.onCompleted()
7+
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] with SubscriberAdapter[T] {
8+
override def onStart(): Unit = self.onStart()
9+
override def onNext(value: T): Unit = self.onNext(value)
10+
override def onError(error: Throwable): Unit = self.onError(error)
11+
override def onCompleted(): Unit = self.onCompleted()
12+
override def requestMore(n: Long): Unit = request(n)
1113
}
1214

1315
private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
@@ -37,6 +39,24 @@ trait Subscriber[-T] extends Observer[T] with Subscription {
3739
asJavaSubscriber.isUnsubscribed()
3840
}
3941

42+
def onStart(): Unit = {
43+
asJavaSubscriber.onStart()
44+
}
45+
46+
protected final def request(n: Long): Unit = {
47+
asJavaSubscriber match {
48+
case s: SubscriberAdapter[T] => s.requestMore(n)
49+
case _ => throw new rx.exceptions.MissingBackpressureException()
50+
}
51+
}
52+
53+
def setProducer(producer: Producer): Unit = {
54+
asJavaSubscriber.setProducer(producer.asJavaProducer)
55+
}
56+
57+
def setProducer(producer: Long => Unit): Unit = {
58+
asJavaSubscriber.setProducer(Producer(producer).asJavaProducer)
59+
}
4060
}
4161

4262
object Subscriber extends ObserverFactoryMethods[Subscriber] {
@@ -71,3 +91,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
7191
})
7292
}
7393
}
94+
95+
sealed trait SubscriberAdapter[T] extends rx.Subscriber[T] {
96+
// Add a method to expose the protected `request` method
97+
def requestMore(n: Long): Unit
98+
}

0 commit comments

Comments
 (0)