Skip to content

Commit 71acd87

Browse files
committed
Add lift to rxscala
1 parent 95e0636 commit 71acd87

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,4 +645,60 @@ class RxScalaDemo extends JUnitSuite {
645645
val o : Observable[String] = List("alice", "bob", "carol").toObservable
646646
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
647647
}
648+
649+
@Test def liftExample1(): Unit = {
650+
// Add "No. " in front of each item
651+
val o = List(1, 2, 3).toObservable.lift {
652+
subscriber: Subscriber[String] =>
653+
Subscriber(
654+
subscriber,
655+
v => subscriber.onNext("No. " + v),
656+
e => subscriber.onError(e),
657+
() => subscriber.onCompleted
658+
)
659+
}.toBlockingObservable.toList
660+
println(o)
661+
}
662+
663+
@Test def liftExample2(): Unit = {
664+
val o = Observable {
665+
subscriber: Subscriber[Int] => {
666+
for (i <- 1 to 10 if !subscriber.isUnsubscribed) {
667+
println("emit " + i)
668+
subscriber.onNext(i)
669+
}
670+
if (!subscriber.isUnsubscribed) {
671+
println("emit onCompleted")
672+
subscriber.onCompleted
673+
}
674+
}
675+
}
676+
// Take the first 5 items
677+
val take = 5
678+
val result = o.lift {
679+
subscriber: Subscriber[String] =>
680+
var index = 0
681+
Subscriber(
682+
subscriber,
683+
v => {
684+
if (index < take) {
685+
subscriber.onNext("No. " + v)
686+
}
687+
if (index == take - 1) {
688+
subscriber.onCompleted
689+
}
690+
index += 1
691+
},
692+
e => subscriber.onError(e),
693+
() => subscriber.onCompleted
694+
)
695+
}.toBlockingObservable.toList
696+
println(result)
697+
// emit 1
698+
// emit 2
699+
// emit 3
700+
// emit 4
701+
// emit 5
702+
// List(No. 1, No. 2, No. 3, No. 4, No. 5)
703+
}
648704
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2537,6 +2537,29 @@ trait Observable[+T]
25372537
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap)
25382538
}
25392539

2540+
/**
2541+
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass
2542+
* the values of the current Observable through the function.
2543+
* <p>
2544+
* In other words, this allows chaining Observers together on an Observable for acting on the values within
2545+
* the Observable.
2546+
* {{{
2547+
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
2548+
* }}}
2549+
*
2550+
* @param operator
2551+
* @return an Observable that emits values that are the result of applying the bind function to the values
2552+
* of the current Observable
2553+
*/
2554+
def lift[R](operator: Subscriber[_ >: R] => Subscriber[_ >: T]): Observable[R] = {
2555+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
2556+
val thatJava = thisJava.lift[R](new rx.Observable.Operator[R, T] {
2557+
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
2558+
toJavaSubscriber(operator.call(toScalaSubscriber(subscriber)))
2559+
}
2560+
})
2561+
toScalaObservable(thatJava)
2562+
}
25402563
}
25412564

25422565
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,14 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
4949
override def onCompleted(): Unit = c()
5050
})
5151
}
52+
53+
def apply[T](subscriber: Subscriber[_], onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = {
54+
val n = onNext; val e = onError; val c = onCompleted
55+
// Java calls XXX; Scala receives XXX.
56+
Subscriber(new rx.Subscriber[T](subscriber.asJavaSubscriber) {
57+
override def onNext(value: T): Unit = n(value)
58+
override def onError(error: Throwable): Unit = e(error)
59+
override def onCompleted(): Unit = c()
60+
})
61+
}
5262
}

0 commit comments

Comments
 (0)