Skip to content

Commit 8ebb962

Browse files
committed
Add take variants to RxScala
1 parent 7607f8f commit 8ebb962

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,4 +799,12 @@ class RxScalaDemo extends JUnitSuite {
799799
assertFalse(o1.sequenceEqual(o3).toBlockingObservable.single)
800800
assertTrue(o1.sequenceEqual(o4).toBlockingObservable.single)
801801
}
802+
803+
@Test def takeExample(): Unit = {
804+
val o = (1 to 20).toObservable
805+
.zip(Observable.interval(300 millis))
806+
.map(_._1)
807+
.take(2 seconds)
808+
println(o.toBlockingObservable.toList)
809+
}
802810
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,6 +1457,33 @@ trait Observable[+T]
14571457
toScalaObservable[T](asJavaObservable.take(n))
14581458
}
14591459

1460+
/**
1461+
* Returns an Observable that emits those items emitted by source Observable before a specified time runs out.
1462+
* <p>
1463+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/take.t.png">
1464+
*
1465+
* @param time the length of the time window
1466+
* @return an Observable that emits those items emitted by the source Observable before the time runs out
1467+
*/
1468+
def take(time: Duration): Observable[T] = {
1469+
toScalaObservable[T](asJavaObservable.take(time.length, time.unit))
1470+
}
1471+
1472+
/**
1473+
* Returns an Observable that emits those items emitted by source Observable before a specified time (on
1474+
* specified Scheduler) runs out
1475+
* <p>
1476+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/take.ts.png">
1477+
*
1478+
* @param time the length of the time window
1479+
* @param scheduler the Scheduler used for time source
1480+
* @return an Observable that emits those items emitted by the source Observable before the time runs out,
1481+
* according to the specified Scheduler
1482+
*/
1483+
def take(time: Duration, scheduler: Scheduler) {
1484+
toScalaObservable[T](asJavaObservable.take(time.length, time.unit, scheduler.asJavaScheduler))
1485+
}
1486+
14601487
/**
14611488
* Returns an Observable that emits items emitted by the source Observable so long as a
14621489
* specified condition is true.

0 commit comments

Comments
 (0)