Skip to content

Commit 4a67afb

Browse files
committed
Implement awaitFirstOrDefault for RxJava2 and reactive streams, with better docs and tests
1 parent da861a7 commit 4a67afb

File tree

11 files changed

+226
-38
lines changed

11 files changed

+226
-38
lines changed

reactive/kotlinx-coroutines-reactive/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Suspending extension functions and suspending iteration:
1313
| **Name** | **Description**
1414
| -------- | ---------------
1515
| [Publisher.awaitFirst][org.reactivestreams.Publisher.awaitFirst] | Returns the first value from the given publisher
16+
| [Publisher.awaitFirstOrDefault][org.reactivestreams.Publisher.awaitFirstOrDefault] | Returns the first value from the given publisher or default
1617
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
1718
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
1819
| [Publisher.open][org.reactivestreams.Publisher.open] | Subscribes to publisher and returns [ReceiveChannel]
@@ -36,6 +37,7 @@ Conversion functions:
3637
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
3738
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
3839
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first.html
40+
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-default.html
3941
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-single.html
4042
[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
4143
[org.reactivestreams.Publisher.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/iterator.html

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,30 @@ import org.reactivestreams.Subscription
3030
* This suspending function is cancellable.
3131
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
3232
* immediately resumes with [CancellationException].
33+
*
34+
* @throws NoSuchElementException if publisher does not emit any value
3335
*/
3436
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
3537

38+
/**
39+
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
40+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
41+
*
42+
* This suspending function is cancellable.
43+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
44+
* immediately resumes with [CancellationException].
45+
*/
46+
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
47+
3648
/**
3749
* Awaits for the last value from the given publisher without blocking a thread and
3850
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
3951
*
4052
* This suspending function is cancellable.
4153
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
4254
* immediately resumes with [CancellationException].
55+
*
56+
* @throws NoSuchElementException if publisher does not emit any value
4357
*/
4458
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
4559

@@ -50,19 +64,26 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
5064
* This suspending function is cancellable.
5165
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
5266
* immediately resumes with [CancellationException].
67+
*
68+
* @throws NoSuchElementException if publisher does not emit any value
69+
* @throws IllegalArgumentException if publisher emits more than one value
5370
*/
5471
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
5572

5673
// ------------------------ private ------------------------
5774

5875
private enum class Mode(val s: String) {
5976
FIRST("awaitFirst"),
77+
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
6078
LAST("awaitLast"),
6179
SINGLE("awaitSingle");
6280
override fun toString(): String = s
6381
}
6482

65-
private suspend fun <T> Publisher<T>.awaitOne(mode: Mode): T = suspendCancellableCoroutine { cont ->
83+
private suspend fun <T> Publisher<T>.awaitOne(
84+
mode: Mode,
85+
default: T? = null
86+
): T = suspendCancellableCoroutine { cont ->
6687
subscribe(object : Subscriber<T> {
6788
private lateinit var subscription: Subscription
6889
private var value: T? = null
@@ -76,7 +97,7 @@ private suspend fun <T> Publisher<T>.awaitOne(mode: Mode): T = suspendCancellabl
7697

7798
override fun onNext(t: T) {
7899
when (mode) {
79-
Mode.FIRST -> {
100+
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
80101
seenValue = true
81102
cont.resume(t)
82103
subscription.cancel()
@@ -95,13 +116,18 @@ private suspend fun <T> Publisher<T>.awaitOne(mode: Mode): T = suspendCancellabl
95116
}
96117

97118
override fun onComplete() {
98-
if (!seenValue) {
99-
if (cont.isActive)
100-
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
119+
if (seenValue) {
120+
if (cont.isActive) cont.resume(value as T)
101121
return
102122
}
103-
if (!cont.isActive) return // was already resumed
104-
cont.resume(value as T)
123+
when {
124+
mode == Mode.FIRST_OR_DEFAULT -> {
125+
cont.resume(default as T)
126+
}
127+
cont.isActive -> {
128+
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
129+
}
130+
}
105131
}
106132

107133
override fun onError(e: Throwable) {

reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package kotlinx.coroutines.experimental.reactive
1818

1919
import kotlinx.coroutines.experimental.*
20-
import kotlinx.coroutines.experimental.channels.TestChannelKind
2120
import org.hamcrest.MatcherAssert.assertThat
2221
import org.hamcrest.core.IsEqual
2322
import org.hamcrest.core.IsInstanceOf
@@ -51,13 +50,31 @@ class IntegrationTest(
5150
}
5251
}
5352

53+
@Test
54+
fun testEmpty(): Unit = runBlocking {
55+
val pub = publish<String>(ctx(context)) {
56+
if (delay) delay(1)
57+
// does not send anything
58+
}
59+
assertNSE { pub.awaitFirst() }
60+
assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
61+
assertNSE { pub.awaitLast() }
62+
assertNSE { pub.awaitSingle() }
63+
var cnt = 0
64+
for (t in pub) {
65+
cnt++
66+
}
67+
assertThat(cnt, IsEqual(0))
68+
}
69+
5470
@Test
5571
fun testSingle() = runBlocking<Unit> {
5672
val pub = publish<String>(ctx(context)) {
5773
if (delay) delay(1)
5874
send("OK")
5975
}
6076
assertThat(pub.awaitFirst(), IsEqual("OK"))
77+
assertThat(pub.awaitFirstOrDefault("!"), IsEqual("OK"))
6178
assertThat(pub.awaitLast(), IsEqual("OK"))
6279
assertThat(pub.awaitSingle(), IsEqual("OK"))
6380
var cnt = 0
@@ -78,13 +95,9 @@ class IntegrationTest(
7895
}
7996
}
8097
assertThat(pub.awaitFirst(), IsEqual(1))
98+
assertThat(pub.awaitFirstOrDefault(0), IsEqual(1))
8199
assertThat(pub.awaitLast(), IsEqual(n))
82-
try {
83-
pub.awaitSingle()
84-
expectUnreached()
85-
} catch (e: Throwable) {
86-
assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
87-
}
100+
assertIAE { pub.awaitSingle() }
88101
checkNumbers(n, pub)
89102
val channel = pub.open()
90103
checkNumbers(n, channel.asPublisher(ctx(context)))
@@ -98,4 +111,22 @@ class IntegrationTest(
98111
}
99112
assertThat(last, IsEqual(n))
100113
}
114+
115+
inline fun assertIAE(block: () -> Unit) {
116+
try {
117+
block()
118+
expectUnreached()
119+
} catch (e: Throwable) {
120+
assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
121+
}
122+
}
123+
124+
inline fun assertNSE(block: () -> Unit) {
125+
try {
126+
block()
127+
expectUnreached()
128+
} catch (e: Throwable) {
129+
assertThat(e, IsInstanceOf(NoSuchElementException::class.java))
130+
}
131+
}
101132
}

reactive/kotlinx-coroutines-rx1/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Suspending extension functions and suspending iteration:
1616
| -------- | ---------------
1717
| [Single.await][rx.Single.await] | Awaits for completion of the single value and returns it
1818
| [Observable.awaitFirst][rx.Observable.awaitFirst] | Returns the first value from the given observable
19+
| [Observable.awaitFirstOrDefault][rx.Observable.awaitFirstOrDefault] | Returns the first value from the given observable or default
1920
| [Observable.awaitLast][rx.Observable.awaitFirst] | Returns the last value from the given observable
2021
| [Observable.awaitSingle][rx.Observable.awaitSingle] | Returns the single value from the given observable
2122
| [Observable.open][rx.Observable.open] | Subscribes to observable and returns [ReceiveChannel]
@@ -45,6 +46,7 @@ Conversion functions:
4546
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-observable.html
4647
[rx.Single.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-single/await.html
4748
[rx.Observable.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first.html
49+
[rx.Observable.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-default.html
4850
[rx.Observable.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-single.html
4951
[rx.Observable.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/open.html
5052
[rx.Observable.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/iterator.html

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont
4848
* This suspending function is cancellable.
4949
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
5050
* immediately resumes with [CancellationException].
51+
*
52+
* @throws NoSuchElementException if observable does not emit any value
5153
*/
5254
public suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
5355

@@ -68,6 +70,8 @@ public suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstO
6870
* This suspending function is cancellable.
6971
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
7072
* immediately resumes with [CancellationException].
73+
*
74+
* @throws NoSuchElementException if observable does not emit any value
7175
*/
7276
public suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
7377

@@ -78,6 +82,9 @@ public suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
7882
* This suspending function is cancellable.
7983
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
8084
* immediately resumes with [CancellationException].
85+
*
86+
* @throws NoSuchElementException if observable does not emit any value
87+
* @throws IllegalArgumentException if publisher emits more than one value
8188
*/
8289
public suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
8390

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package kotlinx.coroutines.experimental.rx1
1818

1919
import kotlinx.coroutines.experimental.*
20-
import kotlinx.coroutines.experimental.channels.TestChannelKind
2120
import org.hamcrest.MatcherAssert.assertThat
2221
import org.hamcrest.core.IsEqual
2322
import org.hamcrest.core.IsInstanceOf
@@ -51,6 +50,23 @@ class IntegrationTest(
5150
}
5251
}
5352

53+
@Test
54+
fun testEmpty(): Unit = runBlocking {
55+
val pub = rxObservable<String>(ctx(context)) {
56+
if (delay) delay(1)
57+
// does not send anything
58+
}
59+
assertNSE { pub.awaitFirst() }
60+
assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
61+
assertNSE { pub.awaitLast() }
62+
assertNSE { pub.awaitSingle() }
63+
var cnt = 0
64+
for (t in pub) {
65+
cnt++
66+
}
67+
assertThat(cnt, IsEqual(0))
68+
}
69+
5470
@Test
5571
fun testSingle() = runBlocking<Unit> {
5672
val observable = rxObservable<String>(ctx(context)) {
@@ -79,12 +95,7 @@ class IntegrationTest(
7995
}
8096
assertThat(observable.awaitFirst(), IsEqual(1))
8197
assertThat(observable.awaitLast(), IsEqual(n))
82-
try {
83-
observable.awaitSingle()
84-
expectUnreached()
85-
} catch (e: Throwable) {
86-
assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
87-
}
98+
assertIAE { observable.awaitSingle() }
8899
checkNumbers(n, observable)
89100
val channel = observable.open()
90101
checkNumbers(n, channel.asObservable(ctx(context)))
@@ -98,4 +109,22 @@ class IntegrationTest(
98109
}
99110
assertThat(last, IsEqual(n))
100111
}
112+
113+
inline fun assertIAE(block: () -> Unit) {
114+
try {
115+
block()
116+
expectUnreached()
117+
} catch (e: Throwable) {
118+
assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
119+
}
120+
}
121+
122+
inline fun assertNSE(block: () -> Unit) {
123+
try {
124+
block()
125+
expectUnreached()
126+
} catch (e: Throwable) {
127+
assertThat(e, IsInstanceOf(NoSuchElementException::class.java))
128+
}
129+
}
101130
}

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,29 @@ class ObservableSingleTest {
9191
@Test
9292
fun testAwaitFirst() {
9393
val observable = rxObservable(CommonPool) {
94-
send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
94+
send(Observable.just("O", "#").awaitFirst() + "K")
9595
}
9696

9797
checkSingleValue(observable) {
9898
assertEquals("OK", it)
9999
}
100100
}
101101

102-
103102
@Test
104103
fun testAwaitFirstOrDefault() {
105104
val observable = rxObservable(CommonPool) {
106-
send(Observable.just("O", "#").awaitFirst() + "K")
105+
send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
106+
}
107+
108+
checkSingleValue(observable) {
109+
assertEquals("OK", it)
110+
}
111+
}
112+
113+
@Test
114+
fun testAwaitFirstOrDefaultWithValues() {
115+
val observable = rxObservable(CommonPool) {
116+
send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
107117
}
108118

109119
checkSingleValue(observable) {

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Suspending extension functions and suspending iteration:
1818
| [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value
1919
| [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it
2020
| [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
21+
| [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
2122
| [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
2223
| [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
2324
| [ObservableSource.open][io.reactivex.ObservableSource.open] | Subscribes to observable and returns [ReceiveChannel]
@@ -55,6 +56,7 @@ Conversion functions:
5556
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
5657
[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
5758
[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
59+
[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-default.html
5860
[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-single.html
5961
[io.reactivex.ObservableSource.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/open.html
6062
[io.reactivex.ObservableSource.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/iterator.html

0 commit comments

Comments
 (0)