Skip to content

Commit 9e6e890

Browse files
committed
Add timeInterval to RxScala
1 parent 31ceb09 commit 9e6e890

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,4 +828,12 @@ class RxScalaDemo extends JUnitSuite {
828828
.takeRight(2, 300 millis)
829829
println(o.toBlockingObservable.toList)
830830
}
831+
832+
@Test def timeIntervalExample(): Unit = {
833+
val o = (1 to 10).toObservable
834+
.zip(Observable.interval(100 millis))
835+
.map(_._1)
836+
.timeInterval
837+
println(o.toBlockingObservable.toList)
838+
}
831839
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2871,6 +2871,35 @@ trait Observable[+T]
28712871
toScalaObservable[java.lang.Boolean](rx.Observable.sequenceEqual[U](thisJava, thatJava, equalityJava)).map(_.booleanValue)
28722872
}
28732873

2874+
/**
2875+
* Returns an Observable that emits records of the time interval between consecutive items emitted by the
2876+
* source Obsegrvable.
2877+
* <p>
2878+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeInterval.png">
2879+
*
2880+
* @return an Observable that emits time interval information items
2881+
*/
2882+
def timeInterval: Observable[(Duration, T)] = {
2883+
import scala.concurrent.duration.DurationLong
2884+
toScalaObservable(asJavaObservable.timeInterval())
2885+
.map(inv => (inv.getIntervalInMilliseconds millis, inv.getValue))
2886+
}
2887+
2888+
/**
2889+
* Returns an Observable that emits records of the time interval between consecutive items emitted by the
2890+
* source Observable, where this interval is computed on a specified Scheduler.
2891+
* <p>
2892+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeInterval.s.png">
2893+
*
2894+
* @param scheduler the [[Scheduler]] used to compute time intervals
2895+
* @return an Observable that emits time interval information items
2896+
*/
2897+
def timeInterval(scheduler: Scheduler): Observable[(Duration, T)] = {
2898+
import scala.concurrent.duration.DurationLong
2899+
toScalaObservable(asJavaObservable.timeInterval(scheduler.asJavaScheduler))
2900+
.map(inv => (inv.getIntervalInMilliseconds millis, inv.getValue))
2901+
}
2902+
28742903
/**
28752904
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass
28762905
* the values of the current Observable through the function.

0 commit comments

Comments
 (0)