Skip to content

Commit 7c1cbf1

Browse files
committed
Provide Observable.timestamp(Scheduler) to be used in the tests.
1 parent e335e61 commit 7c1cbf1

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,21 @@ trait Observable[+T]
281281
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
282282
}
283283

284+
/**
285+
* Wraps each item emitted by a source Observable in a timestamped tuple
286+
* with timestamps provided by the given Scheduler.
287+
* <p>
288+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timestamp.s.png">
289+
*
290+
* @param scheduler [[rx.lang.scala.Scheduler]] to use as a time source.
291+
* @return an Observable that emits timestamped items from the source
292+
* Observable with timestamps provided by the given Scheduler
293+
*/
294+
def timestamp(scheduler: Scheduler): Observable[(Long, T)] = {
295+
toScalaObservable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler))
296+
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
297+
}
298+
284299
/**
285300
* Returns an Observable formed from this Observable and another Observable by combining
286301
* corresponding elements in pairs.

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import scala.collection.mutable.ListBuffer
1819
import scala.concurrent.{Future, Await}
1920
import scala.concurrent.duration.Duration
2021
import scala.concurrent.ExecutionContext.Implicits.global
@@ -123,6 +124,21 @@ class ObservableTests extends JUnitSuite {
123124
assertEquals(6, o.toBlockingObservable.single)
124125
}
125126

127+
@Test def testTimestampWithScheduler() {
128+
val c = 10
129+
val s = TestScheduler()
130+
val o1 = Observable interval (1.milliseconds, s) map (_ + 1)
131+
val o2 = o1 timestamp s
132+
val l = ListBuffer[(Long, Long)]()
133+
o2.subscribe (
134+
onNext = (l += _)
135+
)
136+
s advanceTimeTo c.milliseconds
137+
val (l1, l2) = l.toList.unzip
138+
assertTrue(l1.size == c)
139+
assertEquals(l2, l1)
140+
}
141+
126142
/*
127143
@Test def testHead() {
128144
val observer = mock(classOf[Observer[Int]])

0 commit comments

Comments
 (0)