@@ -2,6 +2,7 @@ package kotlinx.coroutines
2
2
3
3
import org.junit.Test
4
4
import rx.Observable
5
+ import java.util.concurrent.TimeUnit
5
6
import kotlin.test.assertEquals
6
7
import kotlin.test.fail
7
8
@@ -17,6 +18,18 @@ class AsyncRxTest {
17
18
}
18
19
}
19
20
21
+ @Test
22
+ fun testSingleWithDelay () {
23
+ val observable = asyncRx<String > {
24
+ Observable .timer(50 , TimeUnit .MILLISECONDS ).map { " O" }.awaitSingle() + " K"
25
+ }
26
+
27
+ checkObservableWithSingleValue(observable) {
28
+ assertEquals(" OK" , it)
29
+ }
30
+ }
31
+
32
+
20
33
@Test
21
34
fun testSingleException () {
22
35
val observable = asyncRx<String > {
@@ -117,26 +130,15 @@ class AsyncRxTest {
117
130
observable : Observable <* >,
118
131
checker : (Throwable ) -> Unit
119
132
) {
120
- var onErrorCalled = false
121
- observable.subscribe({ fail(" Next item on erroneous observable" ) }) {
122
- checker(it)
123
- onErrorCalled = true
124
- }
125
-
126
- assert (onErrorCalled)
133
+ val singleNotification = observable.materialize().toBlocking().single()
134
+ checker(singleNotification.throwable)
127
135
}
128
136
129
137
private fun <T > checkObservableWithSingleValue (
130
138
observable : Observable <T >,
131
139
checker : (T ) -> Unit
132
140
) {
133
- var subscribeCalled = false
134
-
135
- observable.single().subscribe {
136
- checker(it)
137
- subscribeCalled = true
138
- }
139
-
140
- assert (subscribeCalled)
141
+ val singleValue = observable.toBlocking().single()
142
+ checker(singleValue)
141
143
}
142
144
}
0 commit comments