Skip to content

Commit 1b9cbf4

Browse files
committed
awaitFirst on reactive streams should not try to resume again when
the stream is invoking onNext after cancel (it is allowed to do so)
1 parent 3ef4fca commit 1b9cbf4

File tree

2 files changed

+10
-6
lines changed
  • reactive
    • kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
    • kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2

2 files changed

+10
-6
lines changed

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ private suspend fun <T> Publisher<T>.awaitOne(
9898
override fun onNext(t: T) {
9999
when (mode) {
100100
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
101-
seenValue = true
102-
cont.resume(t)
103-
subscription.cancel()
101+
if (!seenValue) {
102+
seenValue = true
103+
cont.resume(t)
104+
subscription.cancel()
105+
}
104106
}
105107
Mode.LAST, Mode.SINGLE -> {
106108
if (mode == Mode.SINGLE && seenValue) {

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,11 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
169169
override fun onNext(t: T) {
170170
when (mode) {
171171
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
172-
seenValue = true
173-
cont.resume(t)
174-
subscription.dispose()
172+
if (!seenValue) {
173+
seenValue = true
174+
cont.resume(t)
175+
subscription.dispose()
176+
}
175177
}
176178
Mode.LAST, Mode.SINGLE -> {
177179
if (mode == Mode.SINGLE && seenValue) {

0 commit comments

Comments
 (0)