Skip to content

Commit 6eb7162

Browse files
Merge pull request #876 from benjchristensen/751-merge
Manual Merge of #750
2 parents b9fe278 + 840f721 commit 6eb7162

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,21 @@ trait Observable[+T]
282282
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
283283
}
284284

285+
/**
286+
* Wraps each item emitted by a source Observable in a timestamped tuple
287+
* with timestamps provided by the given Scheduler.
288+
* <p>
289+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timestamp.s.png">
290+
*
291+
* @param scheduler [[rx.lang.scala.Scheduler]] to use as a time source.
292+
* @return an Observable that emits timestamped items from the source
293+
* Observable with timestamps provided by the given Scheduler
294+
*/
295+
def timestamp(scheduler: Scheduler): Observable[(Long, T)] = {
296+
toScalaObservable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler))
297+
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
298+
}
299+
285300
/**
286301
* Returns an Observable formed from this Observable and another Observable by combining
287302
* corresponding elements in pairs.

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import scala.collection.mutable.ListBuffer
1819
import scala.concurrent.{Future, Await}
1920
import scala.concurrent.duration.Duration
2021
import scala.concurrent.ExecutionContext.Implicits.global
@@ -130,6 +131,21 @@ class ObservableTests extends JUnitSuite {
130131
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
131132
}
132133

134+
@Test def testTimestampWithScheduler() {
135+
val c = 10
136+
val s = TestScheduler()
137+
val o1 = Observable interval (1.milliseconds, s) map (_ + 1)
138+
val o2 = o1 timestamp s
139+
val l = ListBuffer[(Long, Long)]()
140+
o2.subscribe (
141+
onNext = (l += _)
142+
)
143+
s advanceTimeTo c.milliseconds
144+
val (l1, l2) = l.toList.unzip
145+
assertTrue(l1.size == c)
146+
assertEquals(l2, l1)
147+
}
148+
133149
/*
134150
@Test def testHead() {
135151
val observer = mock(classOf[Observer[Int]])

0 commit comments

Comments
 (0)