Skip to content

Commit 59ebdb2

Browse files
committed
Add Operator conversion, tail and update liftExample2
1 parent 71acd87 commit 59ebdb2

File tree

4 files changed

+100
-60
lines changed

4 files changed

+100
-60
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -650,9 +650,9 @@ class RxScalaDemo extends JUnitSuite {
650650
// Add "No. " in front of each item
651651
val o = List(1, 2, 3).toObservable.lift {
652652
subscriber: Subscriber[String] =>
653-
Subscriber(
653+
Subscriber[Int](
654654
subscriber,
655-
v => subscriber.onNext("No. " + v),
655+
(v: Int) => subscriber.onNext("No. " + v),
656656
e => subscriber.onError(e),
657657
() => subscriber.onCompleted
658658
)
@@ -661,44 +661,51 @@ class RxScalaDemo extends JUnitSuite {
661661
}
662662

663663
@Test def liftExample2(): Unit = {
664-
val o = Observable {
665-
subscriber: Subscriber[Int] => {
666-
for (i <- 1 to 10 if !subscriber.isUnsubscribed) {
667-
println("emit " + i)
668-
subscriber.onNext(i)
669-
}
670-
if (!subscriber.isUnsubscribed) {
671-
println("emit onCompleted")
672-
subscriber.onCompleted
673-
}
674-
}
664+
// Split the input Strings with " "
665+
val splitStringsWithSpace = (subscriber: Subscriber[String]) => {
666+
Subscriber[String](
667+
subscriber,
668+
(v: String) => v.split(" ").foreach(subscriber.onNext(_)),
669+
e => subscriber.onError(e),
670+
() => subscriber.onCompleted
671+
)
672+
}
673+
674+
// Convert the input Strings to Chars
675+
val stringsToChars = (subscriber: Subscriber[Char]) => {
676+
Subscriber[String](
677+
subscriber,
678+
(v: String) => v.foreach(subscriber.onNext(_)),
679+
e => subscriber.onError(e),
680+
() => subscriber.onCompleted
681+
)
682+
}
683+
684+
// Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException
685+
def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => {
686+
var count = 0
687+
Subscriber[T](
688+
subscriber,
689+
(v: T) => {
690+
if (count >= n) subscriber.onNext(v)
691+
count += 1
692+
},
693+
e => subscriber.onError(e),
694+
() => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted
695+
)
696+
}
697+
698+
val o = List("RxJava – Reactive Extensions for the JVM").toObservable
699+
.lift(splitStringsWithSpace)
700+
.map(_.toLowerCase)
701+
.lift(stringsToChars)
702+
.filter(_.isLetter)
703+
.lift(skipWithException(100))
704+
try {
705+
o.toBlockingObservable.toList
706+
}
707+
catch {
708+
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
675709
}
676-
// Take the first 5 items
677-
val take = 5
678-
val result = o.lift {
679-
subscriber: Subscriber[String] =>
680-
var index = 0
681-
Subscriber(
682-
subscriber,
683-
v => {
684-
if (index < take) {
685-
subscriber.onNext("No. " + v)
686-
}
687-
if (index == take - 1) {
688-
subscriber.onCompleted
689-
}
690-
index += 1
691-
},
692-
e => subscriber.onError(e),
693-
() => subscriber.onCompleted
694-
)
695-
}.toBlockingObservable.toList
696-
println(result)
697-
// emit 1
698-
// emit 2
699-
// emit 3
700-
// emit 4
701-
// emit 5
702-
// List(No. 1, No. 2, No. 3, No. 4, No. 5)
703710
}
704711
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,12 @@ object JavaConversions {
5454
val asJavaObservable = observable
5555
}
5656
}
57+
58+
implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[_ >: T]): rx.Observable.Operator[R, T] = {
59+
new rx.Observable.Operator[R, T] {
60+
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
61+
toJavaSubscriber[T](operator(toScalaSubscriber[R](subscriber)))
62+
}
63+
}
64+
}
5765
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2057,7 +2057,28 @@ trait Observable[+T]
20572057
* @see [[Observable.first]]
20582058
*/
20592059
def head: Observable[T] = first
2060-
2060+
2061+
/**
2062+
* Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
2063+
* if the source Observable is empty.
2064+
*
2065+
* @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
2066+
* if the source Observable is empty.
2067+
*/
2068+
def tail: Observable[T] = {
2069+
lift {
2070+
(subscriber: Subscriber[T]) => {
2071+
var isFirst = true
2072+
Subscriber[T](
2073+
subscriber,
2074+
(v: T) => if(isFirst) isFirst = false else subscriber.onNext(v),
2075+
e => subscriber.onError(e),
2076+
() => if(isFirst) subscriber.onError(new UnsupportedOperationException("tail of empty Observable")) else subscriber.onCompleted
2077+
)
2078+
}
2079+
}
2080+
}
2081+
20612082
/**
20622083
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
20632084
* an `NoSuchElementException` if the source Observable is empty.
@@ -2551,14 +2572,8 @@ trait Observable[+T]
25512572
* @return an Observable that emits values that are the result of applying the bind function to the values
25522573
* of the current Observable
25532574
*/
2554-
def lift[R](operator: Subscriber[_ >: R] => Subscriber[_ >: T]): Observable[R] = {
2555-
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
2556-
val thatJava = thisJava.lift[R](new rx.Observable.Operator[R, T] {
2557-
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
2558-
toJavaSubscriber(operator.call(toScalaSubscriber(subscriber)))
2559-
}
2560-
})
2561-
toScalaObservable(thatJava)
2575+
def lift[R](operator: Subscriber[R] => Subscriber[_ >: T]): Observable[R] = {
2576+
toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator)))
25622577
}
25632578
}
25642579

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,26 @@ class ObservableTests extends JUnitSuite {
146146
assertEquals(l2, l1)
147147
}
148148

149-
/*
150-
@Test def testHead() {
151-
val observer = mock(classOf[Observer[Int]])
152-
val o = Observable().head
153-
val sub = o.subscribe(observer)
154-
155-
verify(observer, never).onNext(any(classOf[Int]))
156-
verify(observer, never).onCompleted()
157-
verify(observer, times(1)).onError(any(classOf[NoSuchElementException]))
158-
}
159-
*/
149+
@Test def testHead() {
150+
val o: Observable[String] = List("alice", "bob", "carol").toObservable.head
151+
assertEquals(List("alice"), o.toBlockingObservable.toList)
152+
}
153+
154+
@Test(expected = classOf[NoSuchElementException])
155+
def testHeadWithEmptyObservable() {
156+
val o: Observable[String] = List[String]().toObservable.head
157+
o.toBlockingObservable.toList
158+
}
160159

160+
@Test def testTail() {
161+
val o: Observable[String] = List("alice", "bob", "carol").toObservable.tail
162+
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
163+
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
164+
}
165+
166+
@Test(expected = classOf[UnsupportedOperationException])
167+
def testTailWithEmptyObservable() {
168+
val o: Observable[String] = List[String]().toObservable.tail
169+
o.toBlockingObservable.toList
170+
}
161171
}

0 commit comments

Comments
 (0)