Skip to content

Commit f92eb0e

Browse files
add Subscriber and Observable creation from Subscriber[T] => Unit
1 parent ed983ee commit f92eb0e

File tree

6 files changed

+153
-24
lines changed

6 files changed

+153
-24
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,15 +463,43 @@ class RxScalaDemo extends JUnitSuite {
463463
List("a", "b").toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
464464
}
465465

466-
@Test def createExample() {
466+
def calculateElement(index: Int): String = {
467+
println("omg I'm calculating so hard")
468+
index match {
469+
case 0 => "a"
470+
case 1 => "b"
471+
case _ => throw new IllegalArgumentException
472+
}
473+
}
474+
475+
/**
476+
* This is a bad way of using Observable.create, because even if the consumer unsubscribes,
477+
* all elements are calculated.
478+
*/
479+
@Test def createExampleBad() {
467480
val o = Observable.create[String](observer => {
468-
// this is bad because you cannot unsubscribe!
469-
observer.onNext("a")
470-
observer.onNext("b")
481+
observer.onNext(calculateElement(0))
482+
observer.onNext(calculateElement(1))
471483
observer.onCompleted()
472484
Subscription {}
473485
})
474-
o.subscribe(println(_))
486+
o.take(1).subscribe(println(_))
487+
}
488+
489+
/**
490+
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
491+
* calculated.
492+
*/
493+
@Test def createExampleGood() {
494+
val o = Observable[String](subscriber => {
495+
var i = 0
496+
while (i < 2 && !subscriber.isUnsubscribed) {
497+
subscriber.onNext(calculateElement(i))
498+
i += 1
499+
}
500+
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
501+
})
502+
o.take(1).subscribe(println(_))
475503
}
476504

477505
def output(s: String): Unit = println(s)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ package rx.lang.scala
1717

1818
import java.lang.Exception
1919
import java.{ lang => jlang }
20-
2120
import scala.language.implicitConversions
2221
import scala.collection.Seq
23-
2422
import rx.functions._
2523
import rx.lang.scala.JavaConversions._
2624

@@ -56,6 +54,13 @@ object ImplicitFunctionConversions {
5654
}
5755
}
5856

57+
implicit def scalaAction1ToOnSubscribe[T](f: Subscriber[T] => Unit) =
58+
new rx.Observable.OnSubscribe[T] {
59+
def call(s: rx.Subscriber[_ >: T]): Unit = {
60+
f(s)
61+
}
62+
}
63+
5964
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
6065
new Func0[B] {
6166
def call(): B = param

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ object JavaConversions {
3232

3333
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)
3434

35+
implicit def toJavaSubscriber[T](s: Subscriber[T]): rx.Subscriber[_ >: T] = s.asJavaSubscriber
36+
37+
implicit def toScalaSubscriber[T](s: rx.Subscriber[_ >: T]): Subscriber[T] = Subscriber(s)
38+
3539
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
3640
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
3741

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2272,9 +2272,6 @@ object Observable {
22722272
* should invoke the Observer's [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onError onError]], and [[rx.lang.scala.Observer.onCompleted onCompleted]] methods
22732273
* appropriately.
22742274
*
2275-
* A well-formed Observable must invoke either the Observer's `onCompleted` method
2276-
* exactly once or its `onError` method exactly once.
2277-
*
22782275
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a>
22792276
* for detailed information.
22802277
*
@@ -2296,6 +2293,41 @@ object Observable {
22962293
}))
22972294
}
22982295

2296+
/*
2297+
Note: It's dangerous to have two overloads where one takes an `Observer[T] => Subscription`
2298+
function and the other takes a `Subscriber[T] => Unit` function, because expressions like
2299+
`o => Subscription{}` have both of these types.
2300+
So we call the old create method "create", and the new create method "apply".
2301+
Try it out yourself here:
2302+
def foo[T]: Unit = {
2303+
val fMeant: Observer[T] => Subscription = o => Subscription{}
2304+
val fWrong: Subscriber[T] => Unit = o => Subscription{}
2305+
}
2306+
*/
2307+
2308+
/**
2309+
* Returns an Observable that will execute the specified function when a someone subscribes to it.
2310+
*
2311+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/create.png">
2312+
*
2313+
* Write the function you pass so that it behaves as an Observable: It should invoke the
2314+
* Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
2315+
*
2316+
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
2317+
* information.
2318+
*
2319+
* @tparam T
2320+
* the type of the items that this Observable emits
2321+
* @param f
2322+
* a function that accepts a `Subscriber[T]`, and invokes its `onNext`,
2323+
* `onError`, and `onCompleted` methods as appropriate
2324+
* @return an Observable that, when someone subscribes to it, will execute the specified
2325+
* function
2326+
*/
2327+
def apply[T](f: Subscriber[T] => Unit): Observable[T] = {
2328+
toScalaObservable(rx.Observable.create(f))
2329+
}
2330+
22992331
/**
23002332
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
23012333
* method when the Observer subscribes to it.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ trait Observer[-T] {
5656

5757
}
5858

59-
object Observer {
59+
object Observer extends ObserverFactoryMethods[Observer] {
6060

6161
/**
6262
* Scala calls XXX; Java receives XXX.
@@ -72,17 +72,25 @@ object Observer {
7272

7373
}
7474

75-
def apply[T]( ): Observer[T] = apply[T]((v:T)=>(), (e: Throwable)=>(), ()=>())
76-
def apply[T](onNext: T=>Unit ): Observer[T] = apply[T](onNext, (e: Throwable)=>(), ()=>())
77-
def apply[T](onNext: T=>Unit, onError: Throwable=>Unit ): Observer[T] = apply[T](onNext, onError, ()=>())
78-
def apply[T](onNext: T=>Unit, onCompleted: ()=>Unit): Observer[T] = apply[T](onNext, (e: Throwable)=>(), onCompleted)
79-
def apply[T](onNext: T=>Unit, onError: Throwable=>Unit, onCompleted: ()=>Unit): Observer[T] = {
80-
val n = onNext; val e = onError; val c = onCompleted
81-
// Java calls XXX; Scala receives XXX.
82-
Observer(new rx.Observer[T] {
83-
override def onNext(value: T): Unit = n(value)
84-
override def onError(error: Throwable): Unit = e(error)
85-
override def onCompleted(): Unit = c()
86-
})
75+
def apply[T](onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observer[T] = {
76+
val n = onNext; val e = onError; val c = onCompleted
77+
// Java calls XXX; Scala receives XXX.
78+
Observer(new rx.Observer[T] {
79+
override def onNext(value: T): Unit = n(value)
80+
override def onError(error: Throwable): Unit = e(error)
81+
override def onCompleted(): Unit = c()
82+
})
8783
}
88-
}
84+
}
85+
86+
87+
private [scala] trait ObserverFactoryMethods[P[_] <: Observer[_]] {
88+
89+
def apply[T](onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): P[T]
90+
91+
def apply[T]( ): P[T] = apply[T]((v:T)=>(), (e: Throwable)=>(), ()=>())
92+
def apply[T](onNext: T=>Unit ): P[T] = apply[T](onNext, (e: Throwable)=>(), ()=>())
93+
def apply[T](onNext: T=>Unit, onError: Throwable=>Unit ): P[T] = apply[T](onNext, onError, ()=>())
94+
def apply[T](onNext: T=>Unit, onCompleted: ()=>Unit): P[T] = apply[T](onNext, (e: Throwable)=>(), onCompleted)
95+
}
96+
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package rx.lang.scala
2+
3+
trait Subscriber[-T] extends Observer[T] with Subscription {
4+
5+
self =>
6+
7+
private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
8+
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber
9+
10+
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
11+
def onNext(value: T): Unit = self.onNext(value)
12+
def onError(error: Throwable): Unit = self.onError(error)
13+
def onCompleted(): Unit = self.onCompleted()
14+
}
15+
16+
/**
17+
* Used to register an unsubscribe callback.
18+
*/
19+
final def add(s: Subscription): Unit = {
20+
asJavaSubscriber.add(s.asJavaSubscription)
21+
}
22+
23+
override final def unsubscribe(): Unit = {
24+
asJavaSubscriber.unsubscribe()
25+
}
26+
27+
override final def isUnsubscribed: Boolean = {
28+
asJavaSubscriber.isUnsubscribed()
29+
}
30+
31+
}
32+
33+
object Subscriber extends ObserverFactoryMethods[Subscriber] {
34+
35+
private[scala] def apply[T](subscriber: rx.Subscriber[T]): Subscriber[T] = new Subscriber[T] {
36+
override val asJavaSubscriber = subscriber
37+
38+
override def onNext(value: T): Unit = asJavaSubscriber.onNext(value)
39+
override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error)
40+
override def onCompleted(): Unit = asJavaSubscriber.onCompleted()
41+
}
42+
43+
def apply[T](onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = {
44+
val n = onNext; val e = onError; val c = onCompleted
45+
// Java calls XXX; Scala receives XXX.
46+
Subscriber(new rx.Subscriber[T] {
47+
override def onNext(value: T): Unit = n(value)
48+
override def onError(error: Throwable): Unit = e(error)
49+
override def onCompleted(): Unit = c()
50+
})
51+
}
52+
}

0 commit comments

Comments
 (0)