Skip to content

Commit 31ceb09

Browse files
committed
Add takeRight(takeLast) variants to RxScala
1 parent 8ebb962 commit 31ceb09

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,4 +807,25 @@ class RxScalaDemo extends JUnitSuite {
807807
.take(2 seconds)
808808
println(o.toBlockingObservable.toList)
809809
}
810+
811+
@Test def takeRightExample(): Unit = {
812+
val o = (1 to 6).toObservable.takeRight(3)
813+
assertEquals(List(4, 5, 6), o.toBlockingObservable.toList)
814+
}
815+
816+
@Test def takeRightExample2(): Unit = {
817+
val o = (1 to 10).toObservable
818+
.zip(Observable.interval(100 millis))
819+
.map(_._1)
820+
.takeRight(300 millis)
821+
println(o.toBlockingObservable.toList)
822+
}
823+
824+
@Test def takeRightExample3(): Unit = {
825+
val o = (1 to 10).toObservable
826+
.zip(Observable.interval(100 millis))
827+
.map(_._1)
828+
.takeRight(2, 300 millis)
829+
println(o.toBlockingObservable.toList)
830+
}
810831
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,6 +1516,72 @@ trait Observable[+T]
15161516
toScalaObservable[T](asJavaObservable.takeLast(count))
15171517
}
15181518

1519+
/**
1520+
* Return an Observable that emits the items from the source Observable that were emitted in a specified
1521+
* window of `time` before the Observable completed.
1522+
* <p>
1523+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.t.png">
1524+
*
1525+
* @param time the length of the time window
1526+
* @return an Observable that emits the items from the source Observable that were emitted in the window of
1527+
* time before the Observable completed specified by `time`
1528+
*/
1529+
def takeRight(time: Duration): Observable[T] = {
1530+
toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit))
1531+
}
1532+
1533+
/**
1534+
* Return an Observable that emits the items from the source Observable that were emitted in a specified
1535+
* window of `time` before the Observable completed, where the timing information is provided by a specified
1536+
* Scheduler.
1537+
* <p>
1538+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.ts.png">
1539+
*
1540+
* @param time the length of the time window
1541+
* @param scheduler the Scheduler that provides the timestamps for the Observed items
1542+
* @return an Observable that emits the items from the source Observable that were emitted in the window of
1543+
* time before the Observable completed specified by `time`, where the timing information is
1544+
* provided by `scheduler`
1545+
*/
1546+
def takeRight(time: Duration, scheduler: Scheduler): Observable[T] = {
1547+
toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit, scheduler.asJavaScheduler))
1548+
}
1549+
1550+
/**
1551+
* Return an Observable that emits at most a specified number of items from the source Observable that were
1552+
* emitted in a specified window of time before the Observable completed.
1553+
* <p>
1554+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.tn.png">
1555+
*
1556+
* @param count the maximum number of items to emit
1557+
* @param time the length of the time window
1558+
* @return an Observable that emits at most `count` items from the source Observable that were emitted
1559+
* in a specified window of time before the Observable completed
1560+
* @throws IllegalArgumentException if `count` is less than zero
1561+
*/
1562+
def takeRight(count: Int, time: Duration): Observable[T] = {
1563+
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit))
1564+
}
1565+
1566+
/**
1567+
* Return an Observable that emits at most a specified number of items from the source Observable that were
1568+
* emitted in a specified window of `time` before the Observable completed, where the timing information is
1569+
* provided by a given Scheduler.
1570+
* <p>
1571+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.tns.png">
1572+
*
1573+
* @param count the maximum number of items to emit
1574+
* @param time the length of the time window
1575+
* @param scheduler the Scheduler that provides the timestamps for the observed items
1576+
* @return an Observable that emits at most `count` items from the source Observable that were emitted
1577+
* in a specified window of time before the Observable completed, where the timing information is
1578+
* provided by the given `scheduler`
1579+
* @throws IllegalArgumentException if `count` is less than zero
1580+
*/
1581+
def takeRight(count: Int, time: Duration, scheduler: Scheduler): Observable[T] = {
1582+
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit, scheduler.asJavaScheduler))
1583+
}
1584+
15191585
/**
15201586
* Returns an Observable that emits the items from the source Observable only until the
15211587
* `other` Observable emits an item.

0 commit comments

Comments
 (0)