Skip to content

Commit 24cecf3

Browse files
committed
Add publishLast to RxScala
1 parent a34cba2 commit 24cecf3

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,4 +1012,40 @@ class RxScalaDemo extends JUnitSuite {
10121012
subscription.unsubscribe()
10131013
}
10141014

1015+
def createAHotObservable: Observable[String] = {
1016+
var first = true
1017+
Observable[String] {
1018+
subscriber =>
1019+
if (first) {
1020+
subscriber.onNext("1st: First")
1021+
subscriber.onNext("1st: Last")
1022+
first = false
1023+
}
1024+
else {
1025+
subscriber.onNext("2nd: First")
1026+
subscriber.onNext("2nd: Last")
1027+
}
1028+
subscriber.onCompleted()
1029+
}
1030+
}
1031+
1032+
@Test def withoutPublishLastExample() {
1033+
val hot = createAHotObservable
1034+
hot.takeRight(1).subscribe(n => println(s"subscriber 1 gets $n"))
1035+
hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n"))
1036+
}
1037+
1038+
@Test def publishLastExample() {
1039+
val hot = createAHotObservable
1040+
val o = hot.publishLast
1041+
o.subscribe(n => println(s"subscriber 1 gets $n"))
1042+
o.subscribe(n => println(s"subscriber 2 gets $n"))
1043+
o.connect
1044+
}
1045+
1046+
@Test def publishLastExample2() {
1047+
val hot = createAHotObservable
1048+
val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice
1049+
o.subscribe(n => println(s"subscriber gets $n"))
1050+
}
10151051
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,40 @@ trait Observable[+T]
15121512
toScalaObservable[R](thisJava.publish(fJava, initialValue))
15131513
}
15141514

1515+
/**
1516+
* Returns a [[ConnectableObservable]] that emits only the last item emitted by the source Observable.
1517+
* A [[ConnectableObservable]] resembles an ordinary Observable, except that it does not begin emitting items
1518+
* when it is subscribed to, but only when its `connect` method is called.
1519+
* <p>
1520+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishLast.png">
1521+
*
1522+
* @return a [[ConnectableObservable]] that emits only the last item emitted by the source Observable
1523+
*/
1524+
def publishLast: ConnectableObservable[T] = {
1525+
new ConnectableObservable[T](asJavaObservable.publishLast())
1526+
}
1527+
1528+
/**
1529+
* Returns an Observable that emits an item that results from invoking a specified selector on the last item
1530+
* emitted by a [[ConnectableObservable]] that shares a single subscription to the source Observable.
1531+
* <p>
1532+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishLast.f.png">
1533+
*
1534+
* @param selector a function that can use the multicasted source sequence as many times as needed, without
1535+
* causing multiple subscriptions to the source Observable. Subscribers to the source will only
1536+
* receive the last item emitted by the source.
1537+
* @return an Observable that emits an item that is the result of invoking the selector on a [[ConnectableObservable]]
1538+
* that shares a single subscription to the source Observable
1539+
*/
1540+
def publishLast[R](selector: Observable[T] => Observable[R]): Observable[R] = {
1541+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
1542+
val fJava = new rx.functions.Func1[rx.Observable[T], rx.Observable[R]]() {
1543+
override def call(jo: rx.Observable[T]): rx.Observable[R] =
1544+
selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1545+
}
1546+
toScalaObservable[R](thisJava.publishLast(fJava))
1547+
}
1548+
15151549
// TODO add Scala-like aggregate function
15161550

15171551
/**

0 commit comments

Comments
 (0)