File tree Expand file tree Collapse file tree 2 files changed +22
-0
lines changed
reactive/kotlinx-coroutines-rx1/src
main/kotlin/kotlinx/coroutines/experimental/rx1
test/kotlin/kotlinx/coroutines/experimental/rx1 Expand file tree Collapse file tree 2 files changed +22
-0
lines changed Original file line number Diff line number Diff line change @@ -51,6 +51,16 @@ public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont
51
51
*/
52
52
public suspend fun <T > Observable<T>.awaitFirst (): T = first().awaitOne()
53
53
54
+ /* *
55
+ * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
56
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
57
+ *
58
+ * This suspending function is cancellable.
59
+ * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
60
+ * immediately resumes with [CancellationException].
61
+ */
62
+ public suspend fun <T > Observable<T>.awaitFirstOrDefault (default : T ): T = firstOrDefault(default).awaitOne()
63
+
54
64
/* *
55
65
* Awaits for the last value from the given observable without blocking a thread and
56
66
* returns the resulting value or throws the corresponding exception if this observable had produced error.
Original file line number Diff line number Diff line change @@ -90,6 +90,18 @@ class ObservableSingleTest {
90
90
91
91
@Test
92
92
fun testAwaitFirst () {
93
+ val observable = rxObservable(CommonPool ) {
94
+ send(Observable .empty<String >().awaitFirstOrDefault(" O" ) + " K" )
95
+ }
96
+
97
+ checkSingleValue(observable) {
98
+ assertEquals(" OK" , it)
99
+ }
100
+ }
101
+
102
+
103
+ @Test
104
+ fun testAwaitFirstOrDefault () {
93
105
val observable = rxObservable(CommonPool ) {
94
106
send(Observable .just(" O" , " #" ).awaitFirst() + " K" )
95
107
}
You can’t perform that action at this time.
0 commit comments