Skip to content

Commit 7f2ed2f

Browse files
qwwdfsadelizarov
authored andcommitted
Report exception on cancellation in AbstractContinuation, do not swallow exceptions on cancellation in rx1
1 parent 08bda30 commit 7f2ed2f

File tree

6 files changed

+107
-8
lines changed

6 files changed

+107
-8
lines changed

common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,16 @@ internal abstract class AbstractContinuation<in T>(
277277
}
278278
is CancelledContinuation -> {
279279
/*
280-
* If continuation was cancelled, then all further updates (resumes or exceptions) must be
281-
* ignored, because cancellation is asynchronous and may race with resume/resumeWithException.
282-
* This race is normal.
280+
* If continuation was cancelled, then all further resumes must be
281+
* ignored, because cancellation is asynchronous and may race with resume.
282+
* Racy exception are reported so no exceptions are lost
283283
*
284284
* :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
285285
*/
286+
if (proposedUpdate is CompletedExceptionally) {
287+
handleException(proposedUpdate.cause)
288+
}
289+
286290
return
287291
}
288292
else -> error("Already resumed, but proposed with update $proposedUpdate")

common/kotlinx-coroutines-core-common/test/CancellableContinuationTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/*
22
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
4+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
45

56
package kotlinx.coroutines.experimental
67

@@ -71,7 +72,7 @@ class CancellableContinuationTest : TestBase() {
7172
* should be ignored. Here suspended coroutine is cancelled but then resumed with exception.
7273
*/
7374
@Test
74-
fun testCancelAndResumeWithException() = runTest {
75+
fun testCancelAndResumeWithException() = runTest(unhandled = listOf({e -> e is TestException})) {
7576
var continuation: Continuation<Unit>? = null
7677
val job = launch(coroutineContext) {
7778
try {

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class IntegrationTest(
3232
@JvmStatic
3333
fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
3434
listOf(false, true).map { delay ->
35-
arrayOf<Any>(ctx, delay)
35+
arrayOf(ctx, delay)
3636
}
3737
}
3838
}
@@ -95,6 +95,34 @@ class IntegrationTest(
9595
channel.cancel()
9696
}
9797

98+
@Test
99+
fun testCancelWithoutValue() = runTest {
100+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
101+
publish<String>(coroutineContext) {
102+
yield()
103+
expectUnreached()
104+
}.awaitFirst()
105+
}
106+
107+
job.cancel()
108+
job.join()
109+
}
110+
111+
@Test
112+
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
113+
expect(1)
114+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
115+
publish<String>(coroutineContext) {
116+
yield()
117+
expect(2)
118+
// Nothing to emit
119+
}.awaitFirst()
120+
}
121+
122+
job.join()
123+
finish(3)
124+
}
125+
98126
private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
99127
var last = 0
100128
pub.consumeEach {

reactive/kotlinx-coroutines-rx1/src/RxAwait.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,17 @@ private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutin
120120
override fun onStart() { request(1) }
121121
override fun onNext(t: T) { cont.resume(t) }
122122
override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
123-
override fun onError(e: Throwable) { cont.resumeWithException(e) }
123+
override fun onError(e: Throwable) {
124+
/*
125+
* Rx1 observable throws NoSuchElementException if cancellation happened before
126+
* element emission. To mitigate this we try to atomically resume continuation with exception:
127+
* if resume failed, then we know that continuation successfully cancelled itself
128+
*/
129+
val token = cont.tryResumeWithException(e)
130+
if (token != null) {
131+
cont.completeResume(token)
132+
}
133+
}
124134
}))
125135
}
126136

reactive/kotlinx-coroutines-rx1/test/IntegrationTest.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class IntegrationTest(
3232
@JvmStatic
3333
fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
3434
listOf(false, true).map { delay ->
35-
arrayOf<Any>(ctx, delay)
35+
arrayOf(ctx, delay)
3636
}
3737
}
3838
}
@@ -117,6 +117,34 @@ class IntegrationTest(
117117
channel.cancel()
118118
}
119119

120+
@Test
121+
fun testCancelWithoutValue() = runTest {
122+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
123+
rxObservable<String>(coroutineContext) {
124+
yield()
125+
expectUnreached()
126+
}.awaitFirst()
127+
}
128+
129+
job.cancel()
130+
job.join()
131+
}
132+
133+
@Test
134+
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
135+
expect(1)
136+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
137+
rxObservable<String>(coroutineContext) {
138+
yield()
139+
expect(2)
140+
// Nothing to emit
141+
}.awaitFirst()
142+
}
143+
144+
job.join()
145+
finish(3)
146+
}
147+
120148
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
121149
var last = 0
122150
observable.consumeEach {

reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class IntegrationTest(
3232
@JvmStatic
3333
fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
3434
listOf(false, true).map { delay ->
35-
arrayOf<Any>(ctx, delay)
35+
arrayOf(ctx, delay)
3636
}
3737
}
3838
}
@@ -97,6 +97,34 @@ class IntegrationTest(
9797
channel.cancel()
9898
}
9999

100+
@Test
101+
fun testCancelWithoutValue() = runTest {
102+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
103+
rxObservable<String>(coroutineContext) {
104+
yield()
105+
expectUnreached()
106+
}.awaitFirst()
107+
}
108+
109+
job.cancel()
110+
job.join()
111+
}
112+
113+
@Test
114+
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
115+
expect(1)
116+
val job = launch(coroutineContext, parent = Job(), start = CoroutineStart.UNDISPATCHED) {
117+
rxObservable<String>(coroutineContext) {
118+
yield()
119+
expect(2)
120+
// Nothing to emit
121+
}.awaitFirst()
122+
}
123+
124+
job.join()
125+
finish(3)
126+
}
127+
100128
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
101129
var last = 0
102130
observable.consumeEach {

0 commit comments

Comments
 (0)