Skip to content

Commit 08dfede

Browse files
Merge pull request #1124 from zsxwing/scala-lift
Add lift to rxscala
2 parents d62ddb7 + aac1903 commit 08dfede

File tree

6 files changed

+154
-12
lines changed

6 files changed

+154
-12
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,4 +645,67 @@ class RxScalaDemo extends JUnitSuite {
645645
val o : Observable[String] = List("alice", "bob", "carol").toObservable
646646
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
647647
}
648+
649+
@Test def liftExample1(): Unit = {
650+
// Add "No. " in front of each item
651+
val o = List(1, 2, 3).toObservable.lift {
652+
subscriber: Subscriber[String] =>
653+
Subscriber[Int](
654+
subscriber,
655+
(v: Int) => subscriber.onNext("No. " + v),
656+
e => subscriber.onError(e),
657+
() => subscriber.onCompleted
658+
)
659+
}.toBlockingObservable.toList
660+
println(o)
661+
}
662+
663+
@Test def liftExample2(): Unit = {
664+
// Split the input Strings with " "
665+
val splitStringsWithSpace = (subscriber: Subscriber[String]) => {
666+
Subscriber[String](
667+
subscriber,
668+
(v: String) => v.split(" ").foreach(subscriber.onNext(_)),
669+
e => subscriber.onError(e),
670+
() => subscriber.onCompleted
671+
)
672+
}
673+
674+
// Convert the input Strings to Chars
675+
val stringsToChars = (subscriber: Subscriber[Char]) => {
676+
Subscriber[String](
677+
subscriber,
678+
(v: String) => v.foreach(subscriber.onNext(_)),
679+
e => subscriber.onError(e),
680+
() => subscriber.onCompleted
681+
)
682+
}
683+
684+
// Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException
685+
def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => {
686+
var count = 0
687+
Subscriber[T](
688+
subscriber,
689+
(v: T) => {
690+
if (count >= n) subscriber.onNext(v)
691+
count += 1
692+
},
693+
e => subscriber.onError(e),
694+
() => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted
695+
)
696+
}
697+
698+
val o = List("RxJava – Reactive Extensions for the JVM").toObservable
699+
.lift(splitStringsWithSpace)
700+
.map(_.toLowerCase)
701+
.lift(stringsToChars)
702+
.filter(_.isLetter)
703+
.lift(skipWithException(100))
704+
try {
705+
o.toBlockingObservable.toList
706+
}
707+
catch {
708+
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
709+
}
710+
}
648711
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,12 @@ object JavaConversions {
5454
val asJavaObservable = observable
5555
}
5656
}
57+
58+
implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[T]): rx.Observable.Operator[R, T] = {
59+
new rx.Observable.Operator[R, T] {
60+
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
61+
toJavaSubscriber[T](operator(toScalaSubscriber[R](subscriber)))
62+
}
63+
}
64+
}
5765
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2057,7 +2057,28 @@ trait Observable[+T]
20572057
* @see [[Observable.first]]
20582058
*/
20592059
def head: Observable[T] = first
2060-
2060+
2061+
/**
2062+
* Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
2063+
* if the source Observable is empty.
2064+
*
2065+
* @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
2066+
* if the source Observable is empty.
2067+
*/
2068+
def tail: Observable[T] = {
2069+
lift {
2070+
(subscriber: Subscriber[T]) => {
2071+
var isFirst = true
2072+
Subscriber[T](
2073+
subscriber,
2074+
(v: T) => if(isFirst) isFirst = false else subscriber.onNext(v),
2075+
e => subscriber.onError(e),
2076+
() => if(isFirst) subscriber.onError(new UnsupportedOperationException("tail of empty Observable")) else subscriber.onCompleted
2077+
)
2078+
}
2079+
}
2080+
}
2081+
20612082
/**
20622083
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
20632084
* an `NoSuchElementException` if the source Observable is empty.
@@ -2537,6 +2558,23 @@ trait Observable[+T]
25372558
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap)
25382559
}
25392560

2561+
/**
2562+
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass
2563+
* the values of the current Observable through the function.
2564+
* <p>
2565+
* In other words, this allows chaining Observers together on an Observable for acting on the values within
2566+
* the Observable.
2567+
* {{{
2568+
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
2569+
* }}}
2570+
*
2571+
* @param operator
2572+
* @return an Observable that emits values that are the result of applying the bind function to the values
2573+
* of the current Observable
2574+
*/
2575+
def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = {
2576+
toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator)))
2577+
}
25402578
}
25412579

25422580
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,14 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
4949
override def onCompleted(): Unit = c()
5050
})
5151
}
52+
53+
def apply[T](subscriber: Subscriber[_], onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = {
54+
val n = onNext; val e = onError; val c = onCompleted
55+
// Java calls XXX; Scala receives XXX.
56+
Subscriber(new rx.Subscriber[T](subscriber.asJavaSubscriber) {
57+
override def onNext(value: T): Unit = n(value)
58+
override def onError(error: Throwable): Unit = e(error)
59+
override def onCompleted(): Unit = c()
60+
})
61+
}
5262
}

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class CompletenessTest extends JUnitSuite {
7979
"firstOrDefault(T)" -> "firstOrElse(=> U)",
8080
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]",
8181
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
82+
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
8283
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
8384
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
8485
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
@@ -89,16 +90,28 @@ class CompletenessTest extends JUnitSuite {
8990
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
9091
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
9192
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
93+
"retry()" -> "retry()",
9294
"scan(Func2[T, T, T])" -> unnecessary,
9395
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
9496
"skip(Int)" -> "drop(Int)",
97+
"skip(Long, TimeUnit)" -> "drop(Duration)",
98+
"skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)",
9599
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
96100
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
97101
"startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]",
102+
"skipLast(Int)" -> "dropRight(Int)",
103+
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
104+
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
98105
"takeFirst()" -> "first",
99106
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
100107
"takeLast(Int)" -> "takeRight(Int)",
101108
"takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]",
109+
"timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])",
110+
"timeout(Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V])",
111+
"timeout(Long, TimeUnit, Observable[_ <: T])" -> "timeout(Duration, Observable[U])",
112+
"timeout(Long, TimeUnit, Observable[_ <: T], Scheduler)" -> "timeout(Duration, Observable[U], Scheduler)",
113+
"timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)",
114+
"timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)",
102115
"toList()" -> "toSeq",
103116
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
104117
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,26 @@ class ObservableTests extends JUnitSuite {
146146
assertEquals(l2, l1)
147147
}
148148

149-
/*
150-
@Test def testHead() {
151-
val observer = mock(classOf[Observer[Int]])
152-
val o = Observable().head
153-
val sub = o.subscribe(observer)
154-
155-
verify(observer, never).onNext(any(classOf[Int]))
156-
verify(observer, never).onCompleted()
157-
verify(observer, times(1)).onError(any(classOf[NoSuchElementException]))
158-
}
159-
*/
149+
@Test def testHead() {
150+
val o: Observable[String] = List("alice", "bob", "carol").toObservable.head
151+
assertEquals(List("alice"), o.toBlockingObservable.toList)
152+
}
153+
154+
@Test(expected = classOf[NoSuchElementException])
155+
def testHeadWithEmptyObservable() {
156+
val o: Observable[String] = List[String]().toObservable.head
157+
o.toBlockingObservable.toList
158+
}
160159

160+
@Test def testTail() {
161+
val o: Observable[String] = List("alice", "bob", "carol").toObservable.tail
162+
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
163+
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
164+
}
165+
166+
@Test(expected = classOf[UnsupportedOperationException])
167+
def testTailWithEmptyObservable() {
168+
val o: Observable[String] = List[String]().toObservable.tail
169+
o.toBlockingObservable.toList
170+
}
161171
}

0 commit comments

Comments
 (0)