Skip to content

Commit bcdd8e1

Browse files
committed
run uses cancelling state & propagates exceptions when cancelled
* When coroutine that was switched into a different dispatcher using `run` is cancelled, the run invocation does not complete immediately, but waits until the body completes. * If the body completes with exception, then this exception is propagated. Fixes #147
1 parent 6aed878 commit bcdd8e1

File tree

4 files changed

+75
-65
lines changed

4 files changed

+75
-65
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.atomicfu.atomic
2020
import kotlinx.atomicfu.loop
2121
import kotlin.coroutines.experimental.Continuation
22-
22+
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2323

2424
private const val UNDECIDED = 0
2525
private const val SUSPENDED = 1
@@ -29,9 +29,9 @@ private const val RESUMED = 2
2929
* @suppress **This is unstable API and it is subject to change.**
3030
*/
3131
internal abstract class AbstractContinuation<in T>(
32-
active: Boolean,
32+
@JvmField protected val delegate: Continuation<T>,
3333
@JvmField protected val resumeMode: Int
34-
) : JobSupport(active), Continuation<T> {
34+
) : JobSupport(true), Continuation<T> {
3535
private val _decision = atomic(UNDECIDED)
3636

3737
/* decision state machine
@@ -49,7 +49,7 @@ internal abstract class AbstractContinuation<in T>(
4949
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
5050
*/
5151

52-
protected fun trySuspend(): Boolean {
52+
private fun trySuspend(): Boolean {
5353
_decision.loop { decision ->
5454
when (decision) {
5555
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
@@ -59,7 +59,7 @@ internal abstract class AbstractContinuation<in T>(
5959
}
6060
}
6161

62-
protected fun tryResume(): Boolean {
62+
private fun tryResume(): Boolean {
6363
_decision.loop { decision ->
6464
when (decision) {
6565
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
@@ -69,32 +69,48 @@ internal abstract class AbstractContinuation<in T>(
6969
}
7070
}
7171

72-
override fun resume(value: T) = resumeImpl(value, resumeMode)
72+
@PublishedApi
73+
internal fun getResult(): Any? {
74+
if (trySuspend()) return COROUTINE_SUSPENDED
75+
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
76+
val state = this.state
77+
if (state is CompletedExceptionally) throw state.exception
78+
return getSuccessfulResult(state)
79+
}
7380

74-
protected fun resumeImpl(value: T, resumeMode: Int) {
75-
loopOnState { state ->
76-
when (state) {
77-
is Incomplete -> if (updateState(state, value, resumeMode)) return
78-
is Cancelled -> return // ignore resumes on cancelled continuation
79-
else -> error("Already resumed, but got value $value")
80-
}
81+
override fun afterCompletion(state: Any?, mode: Int) {
82+
if (tryResume()) return // completed before getResult invocation -- bail out
83+
// otherwise, getResult has already commenced, i.e. completed later or in other thread
84+
if (state is CompletedExceptionally) {
85+
delegate.resumeWithExceptionMode(state.exception, mode)
86+
} else {
87+
delegate.resumeMode(getSuccessfulResult(state), mode)
8188
}
8289
}
8390

84-
override fun resumeWithException(exception: Throwable) = resumeWithExceptionImpl(exception, resumeMode)
91+
@Suppress("UNCHECKED_CAST")
92+
protected open fun <T> getSuccessfulResult(state: Any?): T =
93+
state as T
94+
95+
override fun resume(value: T) =
96+
resumeImpl(value, resumeMode)
97+
98+
override fun resumeWithException(exception: Throwable) =
99+
resumeImpl(CompletedExceptionally(exception), resumeMode)
85100

86-
protected fun resumeWithExceptionImpl(exception: Throwable, resumeMode: Int) {
101+
protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
87102
loopOnState { state ->
88103
when (state) {
89104
is Incomplete -> {
90-
if (updateState(state, CompletedExceptionally(exception), resumeMode)) return
105+
if (updateState(state, proposedUpdate, resumeMode)) return
91106
}
92107
is Cancelled -> {
93-
// ignore resumes on cancelled continuation, but handle exception if a different one is here
94-
if (exception != state.exception) handleCoroutineException(context, exception)
108+
// Ignore resumes in cancelled coroutines, but handle exception if a different one here
109+
if (proposedUpdate is CompletedExceptionally && proposedUpdate.exception != state.exception)
110+
handleException(proposedUpdate.exception)
95111
return
96112
}
97-
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
113+
else -> error("Already resumed, but got $proposedUpdate")
98114
}
99115
}
100116
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.locks.LockSupport
2020
import kotlin.coroutines.experimental.*
21-
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2221
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2322
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2423

@@ -179,29 +178,15 @@ private class RunContinuationDirect<in T>(
179178
continuation: Continuation<T>
180179
) : Continuation<T> by continuation
181180

181+
182182
@Suppress("UNCHECKED_CAST")
183183
private class RunCompletion<in T>(
184184
override val context: CoroutineContext,
185-
private val delegate: Continuation<T>,
185+
delegate: Continuation<T>,
186186
resumeMode: Int
187-
) : AbstractContinuation<T>(true, resumeMode) {
188-
@PublishedApi
189-
internal fun getResult(): Any? {
190-
if (trySuspend()) return COROUTINE_SUSPENDED
191-
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
192-
val state = this.state
193-
if (state is CompletedExceptionally) throw state.exception
194-
return state as T
195-
}
196-
197-
override fun afterCompletion(state: Any?, mode: Int) {
198-
if (tryResume()) return // completed before getResult invocation -- bail out
199-
// otherwise, getResult has already commenced, i.e. completed later or in other thread
200-
if (state is CompletedExceptionally)
201-
delegate.resumeWithExceptionMode(state.exception, mode)
202-
else
203-
delegate.resumeMode(state as T, mode)
204-
}
187+
) : AbstractContinuation<T>(delegate, resumeMode) {
188+
override val hasCancellingState: Boolean
189+
get() = true
205190
}
206191

207192
private class BlockingCoroutine<T>(

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
2020
import kotlin.coroutines.experimental.Continuation
2121
import kotlin.coroutines.experimental.CoroutineContext
22-
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2322
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2423
import kotlin.coroutines.experimental.suspendCoroutine
2524

@@ -179,9 +178,9 @@ private class RemoveOnCancel(
179178

180179
@PublishedApi
181180
internal class CancellableContinuationImpl<in T>(
182-
private val delegate: Continuation<T>,
181+
delegate: Continuation<T>,
183182
resumeMode: Int
184-
) : AbstractContinuation<T>(true, resumeMode), CancellableContinuation<T> {
183+
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T> {
185184
@Volatile // just in case -- we don't want an extra data race, even benign one
186185
private var _context: CoroutineContext? = null // created on first need
187186

@@ -192,25 +191,6 @@ internal class CancellableContinuationImpl<in T>(
192191
initParentJob(delegate.context[Job])
193192
}
194193

195-
@PublishedApi
196-
internal fun getResult(): Any? {
197-
if (trySuspend()) return COROUTINE_SUSPENDED
198-
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
199-
val state = this.state
200-
if (state is CompletedExceptionally) throw state.exception
201-
return getSuccessfulResult(state)
202-
}
203-
204-
override fun afterCompletion(state: Any?, mode: Int) {
205-
if (tryResume()) return // completed before getResult invocation -- bail out
206-
// otherwise, getResult has already commenced, i.e. completed later or in other thread
207-
if (state is CompletedExceptionally) {
208-
delegate.resumeWithExceptionMode(state.exception, mode)
209-
} else {
210-
delegate.resumeMode(getSuccessfulResult<T>(state), mode)
211-
}
212-
}
213-
214194
override fun tryResume(value: T, idempotent: Any?): Any? {
215195
while (true) { // lock-free loop on state
216196
val state = this.state // atomic read
@@ -255,9 +235,13 @@ internal class CancellableContinuationImpl<in T>(
255235

256236
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
257237
val dc = delegate as? DispatchedContinuation
258-
resumeWithExceptionImpl(exception, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
238+
resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
259239
}
260240

241+
@Suppress("UNCHECKED_CAST")
242+
override fun <T> getSuccessfulResult(state: Any?): T =
243+
if (state is CompletedIdempotentResult) state.result as T else state as T
244+
261245
override fun nameString(): String =
262246
"CancellableContinuation(${delegate.toDebugString()})"
263247
}
@@ -270,6 +254,3 @@ private class CompletedIdempotentResult(
270254
override fun toString(): String = "CompletedIdempotentResult[$result]"
271255
}
272256

273-
@Suppress("UNCHECKED_CAST")
274-
private fun <T> getSuccessfulResult(state: Any?): T =
275-
if (state is CompletedIdempotentResult) state.result as T else state as T

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental
1919
import org.hamcrest.MatcherAssert.assertThat
2020
import org.hamcrest.core.IsEqual
2121
import org.junit.Test
22+
import java.io.IOException
2223
import kotlin.coroutines.experimental.ContinuationInterceptor
2324
import kotlin.coroutines.experimental.CoroutineContext
2425

@@ -168,6 +169,33 @@ class RunTest : TestBase() {
168169
}
169170
}
170171

172+
@Test
173+
fun testRunWithException() = runTest {
174+
expect(1)
175+
var job: Job? = null
176+
job = launch(coroutineContext) {
177+
try {
178+
expect(3)
179+
run(wrapperDispatcher(coroutineContext)) {
180+
expect(5)
181+
job!!.cancel() // cancel itself
182+
throw IOException() // but throw a different exception
183+
}
184+
} catch (e: Throwable) {
185+
expect(7)
186+
// make sure IOException, not CancellationException is thrown!
187+
check(e is IOException)
188+
}
189+
}
190+
expect(2)
191+
yield() // to the launched job
192+
expect(4)
193+
yield() // again to the job
194+
expect(6)
195+
yield() // again to exception handler
196+
finish(8)
197+
}
198+
171199
private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
172200
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
173201
return object : CoroutineDispatcher() {

0 commit comments

Comments
 (0)