Skip to content

Commit a5fe8dc

Browse files
Opening/Closing, Timestamped with unapply, BlockingObservable with WithFilter
1 parent f20a8eb commit a5fe8dc

File tree

4 files changed

+71
-22
lines changed

4 files changed

+71
-22
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.concurrent.duration._
2222
import org.junit.{Before, Test, Ignore}
2323
import org.junit.Assert._
2424
import rx.lang.scala.concurrency.NewThreadScheduler
25+
import rx.lang.scala.util.Timestamped
2526

2627
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
2728
class RxScalaDemo extends JUnitSuite {
@@ -375,6 +376,13 @@ class RxScalaDemo extends JUnitSuite {
375376
assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single)
376377
}
377378

379+
@Test def timestampExample() {
380+
val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable
381+
for (Timestamped(millis, value) <- timestamped if value > 0) {
382+
println(value + " at t = " + millis)
383+
}
384+
}
385+
378386
def output(s: String): Unit = println(s)
379387

380388
// blocks until obs has completed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
199199
* @return an Observable that emits timestamped items from the source Observable
200200
*/
201201
def timestamp: Observable[Timestamped[T]] = {
202-
Observable[Timestamped[T]](asJava.timestamp())
202+
Observable[rx.util.Timestamped[_ <: T]](asJava.timestamp()).map(Timestamped(_))
203203
}
204204

205205
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@ package rx.lang.scala.observables
1818
import scala.collection.JavaConverters._
1919
import rx.lang.scala.ImplicitFunctionConversions._
2020

21+
/**
22+
* An Observable that provides blocking operators.
23+
*
24+
* You can obtain a BlockingObservable from an Observable using [[Observable.toBlockingObservable]]
25+
*/
2126
class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <: T])
2227
extends AnyVal
2328
{
2429

2530
/**
2631
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
2732
* completes.
28-
* <p>
33+
*
2934
* NOTE: This will block even if the Observable is asynchronous.
30-
* <p>
35+
*
3136
* This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does
3237
* not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods.
33-
* <p>
38+
*
3439
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.forEach.png">
3540
*
3641
* @param onNext
@@ -41,6 +46,10 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <:
4146
def foreach(f: T => Unit): Unit = {
4247
asJava.forEach(f);
4348
}
49+
50+
def withFilter(p: T => Boolean): WithFilter[T] = {
51+
new WithFilter[T](p, asJava)
52+
}
4453

4554
// last -> use toIterable.last
4655
// lastOrDefault -> use toIterable.lastOption
@@ -118,3 +127,23 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <:
118127
}
119128

120129
}
130+
131+
// Cannot yet have inner class because of this error message:
132+
// "implementation restriction: nested class is not allowed in value class.
133+
// This restriction is planned to be removed in subsequent releases."
134+
class WithFilter[+T] private[observables] (p: T => Boolean, asJava: rx.observables.BlockingObservable[_ <: T]) {
135+
import rx.lang.scala.ImplicitFunctionConversions._
136+
137+
// there's no map and flatMap here, they're only available on Observable
138+
139+
def withFilter(q: T => Boolean) = new WithFilter[T]((x: T) => p(x) && q(x), asJava)
140+
141+
def foreach(f: T => Unit): Unit = {
142+
asJava.forEach((e: T) => {
143+
if (p(e)) f(e)
144+
})
145+
}
146+
147+
}
148+
149+

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,45 @@
1616
package rx.lang.scala
1717

1818
package object util {
19-
type Closing = rx.util.Closing
20-
21-
object Closings {
22-
def create(): Closing = rx.util.Closings.create()
23-
}
24-
25-
type CompositeException = rx.util.CompositeException
26-
27-
// TODO not sure if we need this in Scala
28-
object Exceptions {
29-
def propageate(ex: Throwable) = rx.util.Exceptions.propagate(ex)
30-
}
31-
32-
// rx.util.OnErrorNotImplementedException TODO what's this?
3319

20+
/**
21+
* Tagging interface for objects which can open buffers.
22+
* @see [[Observable.buffer]]
23+
*/
3424
type Opening = rx.util.Opening
3525

36-
object Openings {
37-
def create(): Opening = rx.util.Openings.create()
38-
}
26+
/**
27+
* Creates an object which can open buffers.
28+
* @see [[Observable.buffer]]
29+
*/
30+
def Opening() = rx.util.Openings.create()
31+
32+
/**
33+
* Tagging interface for objects which can close buffers.
34+
* @see [[Observable.buffer]]
35+
*/
36+
type Closing = rx.util.Closing
3937

38+
/**
39+
* Creates an object which can close buffers.
40+
* @see [[Observable.buffer]]
41+
*/
42+
def Closing() = rx.util.Closings.create()
43+
4044
// rx.util.Range not needed because there's a standard Scala Range
4145

42-
type Timestamped[+T] = rx.util.Timestamped[_ <: T]
46+
implicit class Timestamped[+T](val asJava: rx.util.Timestamped[_ <: T]) {}
47+
4348
object Timestamped {
4449
def apply[T](timestampMillis: Long, value: T): Timestamped[T] = {
4550
new rx.util.Timestamped(timestampMillis, value)
4651
}
52+
53+
def unapply[T](v: Timestamped[T]): Option[(Long, T)] = unapply(v.asJava)
54+
55+
def unapply[T](v: rx.util.Timestamped[_ <: T]): Option[(Long, T)] = {
56+
Some((v.getTimestampMillis, v.getValue))
57+
}
4758
}
59+
4860
}

0 commit comments

Comments
 (0)