@@ -11,16 +11,15 @@ import org.junit.Assert._
11
11
12
12
// @Ignore // Since this doesn't do automatic testing.
13
13
class RxScalaDemo extends JUnitSuite {
14
-
15
- def output (s : String ): Unit = println(s)
16
-
17
- def sleep (ms : Long ): Unit = Thread .sleep(ms)
18
-
14
+
19
15
@ Test def intervalExample () {
20
- println(" hello" )
21
- Observable .interval(200 millis).take(5 ).subscribe((n : Long ) => println(" n = " + n))
22
- // need to sleep here because otherwise JUnit kills the thread created by interval()
23
- sleep(1200 )
16
+ val o = Observable .interval(200 millis).take(5 )
17
+ o.subscribe(n => println(" n = " + n))
18
+
19
+ // need to wait here because otherwise JUnit kills the thread created by interval()
20
+ waitFor(o)
21
+
22
+ println(" done" )
24
23
}
25
24
26
25
def msTicks (start : Long , step : Long ): Observable [Long ] = {
@@ -33,15 +32,17 @@ class RxScalaDemo extends JUnitSuite {
33
32
}
34
33
35
34
@ Test def testTicks () {
36
- prefixedTicks(5000 , 500 , " t = " ).take(5 ).subscribe(output(_))
37
- sleep(3000 )
35
+ val o = prefixedTicks(5000 , 500 , " t = " ).take(5 )
36
+ o.subscribe(output(_))
37
+ waitFor(o)
38
38
}
39
39
40
40
@ Test def testSwitch () {
41
41
// We do not have ultimate precision: Sometimes, 747 gets through, sometimes not
42
42
val o = Observable .interval(1000 millis).map(n => prefixedTicks(0 , 249 , s " Observable# $n: " ))
43
- o.switch.take(16 ).subscribe(output(_))
44
- sleep(5000 )
43
+ .switch.take(16 )
44
+ o.subscribe(output(_))
45
+ waitFor(o)
45
46
}
46
47
47
48
@ Test def testSwitchOnObservableOfInt () {
@@ -87,9 +88,9 @@ class RxScalaDemo extends JUnitSuite {
87
88
@ Test def mergeExample () {
88
89
val slowNumbers = Observable .interval(400 millis).take(5 ).map(" slow " + _)
89
90
val fastNumbers = Observable .interval(200 millis).take(10 ).map(" fast " + _)
90
-
91
- (slowNumbers merge fastNumbers) .subscribe(output(_))
92
- sleep( 2500 )
91
+ val o = (slowNumbers merge fastNumbers)
92
+ o .subscribe(output(_))
93
+ waitFor(o )
93
94
}
94
95
95
96
@ Test def rangeAndBufferExample () {
@@ -108,4 +109,11 @@ class RxScalaDemo extends JUnitSuite {
108
109
assertEquals(10 , Observable (1 , 2 , 3 , 4 ).reduce(_ + _).toBlockingObservable.single)
109
110
}
110
111
112
+ def output (s : String ): Unit = println(s)
113
+
114
+ // blocks until obs has completed
115
+ def waitFor [T ](obs : Observable [T ]): Unit = {
116
+ obs.toBlockingObservable.last
117
+ }
118
+
111
119
}
0 commit comments