Skip to content

Commit 67449fb

Browse files
allow to construct Observables ina similar way as futures
1 parent 6635e61 commit 67449fb

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,40 @@ class RxScalaDemo extends JUnitSuite {
420420
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
421421
}
422422

423+
@Test def observableLikeFuture1() {
424+
implicit val scheduler = Schedulers.threadPoolForIO
425+
val o1 = observable {
426+
Thread.sleep(1000)
427+
5
428+
}
429+
val o2 = observable {
430+
Thread.sleep(500)
431+
4
432+
}
433+
Thread.sleep(500)
434+
val t1 = System.currentTimeMillis
435+
println((o1 merge o2).first.toBlockingObservable.single)
436+
println(System.currentTimeMillis - t1)
437+
}
438+
439+
@Test def observableLikeFuture2() {
440+
class Friend {}
441+
val session = new Object {
442+
def getFriends: List[Friend] = List(new Friend, new Friend)
443+
}
444+
445+
implicit val scheduler = Schedulers.threadPoolForIO
446+
val o: Observable[List[Friend]] = observable {
447+
session.getFriends
448+
}
449+
o.subscribe(
450+
friendList => println(friendList),
451+
err => println(err.getMessage)
452+
)
453+
454+
Thread.sleep(1500) // or convert to BlockingObservable
455+
}
456+
423457
def output(s: String): Unit = println(s)
424458

425459
// blocks until obs has completed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,26 @@ package object scala {
114114
type Subscription = rx.Subscription
115115

116116
//#endif
117-
117+
118+
/**
119+
* Allows to construct observables in a similar way as futures.
120+
*
121+
* Example:
122+
*
123+
* {{{
124+
* implicit val scheduler = Schedulers.threadPoolForIO
125+
* val o: Observable[List[Friend]] = observable {
126+
* session.getFriends
127+
* }
128+
* o.subscribe(
129+
* friendList => println(friendList),
130+
* err => println(err.getMessage)
131+
* )
132+
* }}}
133+
*/
134+
def observable[T](body: => T)(implicit scheduler: Scheduler): Observable[T] = {
135+
Observable(1).observeOn(scheduler).map(_ => body)
136+
}
118137
}
119138

120139
/*

0 commit comments

Comments
 (0)