Skip to content

Commit a74eb5f

Browse files
committed
launched jobs and await can be cancelled while waiting in dispatch queue
* suspendAtomicCancellableCoroutine function is introduced for funs like send/receive/receiveOrNull that require atomic cancellation (they cannot be cancelled after decision was made) * Coroutines started with default mode (CoroutineStart.ATOMIC) using async/launch/actor builders can be cancelled before execution starts. * CoroutineStart.ATOMIC is introduced as a start mode to specify that coroutine cannot be cancelled before its execution start.
1 parent 8385ec9 commit a74eb5f

File tree

20 files changed

+539
-85
lines changed

20 files changed

+539
-85
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2019
import java.util.concurrent.locks.LockSupport
2120
import kotlin.coroutines.experimental.*
2221
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
@@ -146,7 +145,7 @@ private class LazyStandaloneCoroutine(
146145
private val block: suspend CoroutineScope.() -> Unit
147146
) : StandaloneCoroutine(parentContext, active = false) {
148147
override fun onStart() {
149-
block.startCoroutine(this, this)
148+
block.startCoroutineCancellable(this, this)
150149
}
151150
}
152151

@@ -158,7 +157,7 @@ private class RunContinuationDirect<in T>(
158157
private class RunContinuationCoroutine<in T>(
159158
override val parentContext: CoroutineContext,
160159
continuation: Continuation<T>
161-
) : CancellableContinuationImpl<T>(continuation, active = true)
160+
) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = MODE_CANCELLABLE, active = true)
162161

163162
private class BlockingCoroutine<T>(
164163
override val parentContext: CoroutineContext,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.coroutines.experimental.Continuation
20+
import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked
21+
22+
/**
23+
* Use this function to start coroutine in a cancellable way, so that it can be cancelled
24+
* while waiting to be dispatched.
25+
*
26+
* @suppress **This is unstable API and it is subject to change.**
27+
*/
28+
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
29+
createCoroutineUnchecked(completion).resumeCancellable(Unit)
30+
31+
/**
32+
* Use this function to start coroutine in a cancellable way, so that it can be cancelled
33+
* while waiting to be dispatched.
34+
*
35+
* @suppress **This is unstable API and it is subject to change.**
36+
*/
37+
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
38+
createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)
39+

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
2020
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2121
import kotlin.coroutines.experimental.Continuation
2222
import kotlin.coroutines.experimental.CoroutineContext
23+
import kotlin.coroutines.experimental.createCoroutine
2324
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2425
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
26+
import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked
2527
import kotlin.coroutines.experimental.suspendCoroutine
2628

2729
// --------------- cancellable continuations ---------------
@@ -111,23 +113,43 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
111113
}
112114

113115
/**
114-
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
116+
* Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
115117
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
116118
*
117119
* If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
118120
* cancellable until [CancellableContinuation.initCancellability] is invoked.
121+
*
122+
* See [suspendAtomicCancellableCoroutine] for suspending functions that need *atomic cancellation*.
119123
*/
120124
public inline suspend fun <T> suspendCancellableCoroutine(
121125
holdCancellability: Boolean = false,
122126
crossinline block: (CancellableContinuation<T>) -> Unit
123127
): T =
124128
suspendCoroutineOrReturn { cont ->
125-
val cancellable = CancellableContinuationImpl(cont, active = true)
129+
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_CANCELLABLE, active = true)
126130
if (!holdCancellability) cancellable.initCancellability()
127131
block(cancellable)
128132
cancellable.getResult()
129133
}
130134

135+
/**
136+
* Suspends coroutine similar to [suspendCancellableCoroutine], but with *atomic cancellation*.
137+
*
138+
* When suspended function throws [CancellationException] it means that the continuation was not resumed.
139+
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
140+
* continue to execute even after it was cancelled from the same thread in the case when the continuation
141+
* was already resumed and was posted for execution to the thread's queue.
142+
*/
143+
public inline suspend fun <T> suspendAtomicCancellableCoroutine(
144+
holdCancellability: Boolean = false,
145+
crossinline block: (CancellableContinuation<T>) -> Unit
146+
): T =
147+
suspendCoroutineOrReturn { cont ->
148+
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_ATOMIC_DEFAULT, active = true)
149+
if (!holdCancellability) cancellable.initCancellability()
150+
block(cancellable)
151+
cancellable.getResult()
152+
}
131153

132154
/**
133155
* Removes a given node on cancellation.
@@ -149,14 +171,16 @@ private class RemoveOnCancel(
149171
override fun toString() = "RemoveOnCancel[$node]"
150172
}
151173

152-
internal const val MODE_DISPATCHED = 0
153-
internal const val MODE_UNDISPATCHED = 1
154-
internal const val MODE_DIRECT = 2
174+
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
175+
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
176+
@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
177+
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
155178

156179
@PublishedApi
157180
internal open class CancellableContinuationImpl<in T>(
158181
@JvmField
159182
protected val delegate: Continuation<T>,
183+
override val defaultResumeMode: Int,
160184
active: Boolean
161185
) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
162186
@Volatile
@@ -239,17 +263,19 @@ internal open class CancellableContinuationImpl<in T>(
239263
if (state is CompletedExceptionally) {
240264
val exception = state.exception
241265
when (mode) {
242-
MODE_DISPATCHED -> delegate.resumeWithException(exception)
243-
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
266+
MODE_ATOMIC_DEFAULT -> delegate.resumeWithException(exception)
267+
MODE_CANCELLABLE -> delegate.resumeCancellableWithException(exception)
244268
MODE_DIRECT -> delegate.resumeDirectWithException(exception)
269+
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
245270
else -> error("Invalid mode $mode")
246271
}
247272
} else {
248273
val value = getSuccessfulResult<T>(state)
249274
when (mode) {
250-
MODE_DISPATCHED -> delegate.resume(value)
251-
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
275+
MODE_ATOMIC_DEFAULT -> delegate.resume(value)
276+
MODE_CANCELLABLE -> delegate.resumeCancellable(value)
252277
MODE_DIRECT -> delegate.resumeDirect(value)
278+
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
253279
else -> error("Invalid mode $mode")
254280
}
255281
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,37 +102,73 @@ public abstract class CoroutineDispatcher :
102102

103103
}
104104

105+
// named class for ease of debugging, better stack-traces and optimize the number of anonymous classes
106+
internal class DispatchTask<in T>(
107+
private val dispatched: DispatchedContinuation<T>,
108+
private val value: Any?, // T | Throwable
109+
private val exception: Boolean,
110+
private val cancellable: Boolean
111+
) : Runnable {
112+
@Suppress("UNCHECKED_CAST")
113+
override fun run() {
114+
val job = if (cancellable) dispatched.context[Job] else null
115+
when {
116+
job != null && job.isCompleted ->
117+
dispatched.resumeUndispatchedWithException(job.getCompletionException())
118+
exception -> dispatched.resumeUndispatchedWithException(value as Throwable)
119+
else -> dispatched.resumeUndispatched(value as T)
120+
}
121+
}
122+
123+
override fun toString(): String =
124+
"DispatchTask[$value, cancellable=$cancellable, $dispatched]"
125+
}
126+
105127
internal class DispatchedContinuation<in T>(
106128
@JvmField val dispatcher: CoroutineDispatcher,
107129
@JvmField val continuation: Continuation<T>
108130
): Continuation<T> by continuation {
109131
override fun resume(value: T) {
110132
val context = continuation.context
111133
if (dispatcher.isDispatchNeeded(context))
112-
dispatcher.dispatch(context, Runnable {
113-
resumeUndispatched(value)
114-
})
134+
dispatcher.dispatch(context, DispatchTask(this, value, exception = false, cancellable = false))
115135
else
116136
resumeUndispatched(value)
117137
}
118138

139+
override fun resumeWithException(exception: Throwable) {
140+
val context = continuation.context
141+
if (dispatcher.isDispatchNeeded(context))
142+
dispatcher.dispatch(context, DispatchTask(this, exception, exception = true, cancellable = false))
143+
else
144+
resumeUndispatchedWithException(exception)
145+
}
146+
119147
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
120-
inline fun resumeUndispatched(value: T) {
121-
withCoroutineContext(context) {
122-
continuation.resume(value)
123-
}
148+
inline fun resumeCancellable(value: T) {
149+
val context = continuation.context
150+
if (dispatcher.isDispatchNeeded(context))
151+
dispatcher.dispatch(context, DispatchTask(this, value, exception = false, cancellable = true))
152+
else
153+
resumeUndispatched(value)
124154
}
125155

126-
override fun resumeWithException(exception: Throwable) {
156+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
157+
inline fun resumeCancellableWithException(exception: Throwable) {
127158
val context = continuation.context
128159
if (dispatcher.isDispatchNeeded(context))
129-
dispatcher.dispatch(context, Runnable {
130-
resumeUndispatchedWithException(exception)
131-
})
160+
dispatcher.dispatch(context, DispatchTask(this, exception, exception = true, cancellable = true))
132161
else
133162
resumeUndispatchedWithException(exception)
134163
}
135164

165+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
166+
inline fun resumeUndispatched(value: T) {
167+
withCoroutineContext(context) {
168+
continuation.resume(value)
169+
}
170+
}
171+
136172
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
137173
inline fun resumeUndispatchedWithException(exception: Throwable) {
138174
withCoroutineContext(context) {
@@ -152,6 +188,18 @@ internal class DispatchedContinuation<in T>(
152188
}
153189
})
154190
}
191+
192+
override fun toString(): String = "DispatchedContinuation[$dispatcher, $continuation]"
193+
}
194+
195+
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
196+
is DispatchedContinuation -> resumeCancellable(value)
197+
else -> resume(value)
198+
}
199+
200+
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
201+
is DispatchedContinuation -> resumeCancellableWithException(exception)
202+
else -> resumeWithException(exception)
155203
}
156204

157205
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public abstract class AbstractCoroutine<in T>(
6767

6868
protected open fun createContext() = parentContext + this
6969

70-
protected open val defaultResumeMode: Int get() = MODE_DISPATCHED
70+
protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
7171

7272
protected open val ignoreRepeatedResume: Boolean get() = false
7373

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ public enum class CoroutineStart {
3535
*
3636
* Note, that [Unconfined] dispatcher always returns `false` from its [CoroutineDispatcher.isDispatchNeeded]
3737
* function, so starting coroutine with [Unconfined] dispatcher by [DEFAULT] is the same as using [UNDISPATCHED].
38+
*
39+
* If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
40+
* execution at all, but complete with an exception.
41+
*
42+
* Cancellability of coroutine at suspension points depends on the particular implementation details of
43+
* suspending functions. Use [suspendCancellableCoroutine] to implement cancellable suspending functions.
3844
*/
3945
DEFAULT,
4046

@@ -43,9 +49,21 @@ public enum class CoroutineStart {
4349
*
4450
* See the documentation for the corresponding coroutine builders for details:
4551
* [launch], [async], and [actor][kotlinx.coroutines.experimental.channels.actor].
52+
*
53+
* If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
54+
* execution at all, but complete with an exception.
4655
*/
4756
LAZY,
4857

58+
/**
59+
* Atomically schedules coroutines for execution according to its context. This is similar to [DEFAULT],
60+
* but the coroutine cannot be cancelled before it starts executing.
61+
*
62+
* Cancellability of coroutine at suspension points depends on the particular implementation details of
63+
* suspending functions as in [DEFAULT].
64+
*/
65+
ATOMIC,
66+
4967
/**
5068
* Immediately executes coroutine until its first suspension point _in the current thread_ as if it the
5169
* coroutine was started using [Unconfined] dispatcher. However, when coroutine is resumed from suspension
@@ -56,13 +74,15 @@ public enum class CoroutineStart {
5674
/**
5775
* Starts the corresponding block as a coroutine with this coroutine start strategy.
5876
*
59-
* * [DEFAULT] uses [startCoroutine].
77+
* * [DEFAULT] uses [startCoroutineCancellable].
78+
* * [ATOMIC] uses [startCoroutine].
6079
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
6180
* * [LAZY] does nothing.
6281
*/
6382
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
6483
when (this) {
65-
CoroutineStart.DEFAULT -> block.startCoroutine(receiver, completion)
84+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
85+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
6686
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
6787
CoroutineStart.LAZY -> Unit // will start lazily
6888
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,11 @@ private open class DeferredCoroutine<T>(
206206
@Suppress("UNCHECKED_CAST")
207207
internal fun <R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
208208
if (select.trySelect(idempotent = null)) {
209+
// Note: await is non-atomic (can be cancelled while dispatched)
209210
if (state is CompletedExceptionally)
210-
select.resumeSelectWithException(state.exception, MODE_DISPATCHED)
211+
select.resumeSelectWithException(state.exception, MODE_CANCELLABLE)
211212
else
212-
block.startCoroutine(state as T, select.completion)
213+
block.startCoroutineCancellable(state as T, select.completion)
213214
}
214215
}
215216

@@ -236,6 +237,6 @@ private class LazyDeferredCoroutine<T>(
236237
private val block: suspend CoroutineScope.() -> T
237238
) : DeferredCoroutine<T>(parentContext, active = false) {
238239
override fun onStart() {
239-
block.startCoroutine(this, this)
240+
block.startCoroutineCancellable(this, this)
240241
}
241242
}

0 commit comments

Comments
 (0)