Skip to content

Commit 4eae2a8

Browse files
committed
run function is cancellable by default and accepts optional CoroutineStart
1 parent 607f893 commit 4eae2a8

File tree

4 files changed

+114
-24
lines changed
  • integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future
  • kotlinx-coroutines-core/src

4 files changed

+114
-24
lines changed

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ private class ContinuationConsumer<T>(
154154
replaceWith = ReplaceWith("asCompletableFuture()"))
155155
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> = asCompletableFuture()
156156

157+
/** @suppress **Deprecated** */
157158
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
158159
@Deprecated("Use the other version. This one is for binary compatibility only.", level=DeprecationLevel.HIDDEN)
159160
public fun <T> future(

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,31 +77,52 @@ public fun launch(context: CoroutineContext, start: Boolean, block: suspend Coro
7777
* This function immediately applies dispatcher from the new context, shifting execution of the block into the
7878
* different thread inside the block, and back when it completes.
7979
* The specified [context] is added onto the current coroutine context for the execution of the block.
80+
*
81+
* An optional `start` parameter is used only if the specified `context` uses a different [CoroutineDispatcher] than
82+
* a current one, otherwise it is ignored.
83+
* By default, the coroutine is immediately scheduled for execution and can be cancelled
84+
* while it is waiting to be executed and it can be cancelled while the result is scheduled
85+
* to be be processed by the invoker context.
86+
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
87+
* A value of [CoroutineStart.LAZY] is not supported and produces [IllegalArgumentException].
8088
*/
81-
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
82-
suspendCoroutineOrReturn sc@ { cont ->
83-
val oldContext = cont.context
84-
// fast path #1 if there is no change in the actual context:
85-
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
86-
return@sc block.startCoroutineUninterceptedOrReturn(cont)
87-
// compute new context
88-
val newContext = oldContext + context
89-
// fast path #2 if the result is actually the same
90-
if (newContext === oldContext)
91-
return@sc block.startCoroutineUninterceptedOrReturn(cont)
92-
// fast path #3 if the new dispatcher is the same as the old one.
93-
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
94-
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
95-
val newContinuation = RunContinuationDirect(newContext, cont)
96-
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
97-
}
98-
// slowest path otherwise -- use new interceptor, sync to its result via a
99-
// full-blown instance of CancellableContinuation
100-
val newContinuation = RunContinuationCoroutine(newContext, cont)
101-
newContinuation.initCancellability()
102-
block.startCoroutine(newContinuation)
103-
newContinuation.getResult()
89+
public suspend fun <T> run(
90+
context: CoroutineContext,
91+
start: CoroutineStart = CoroutineStart.DEFAULT,
92+
block: suspend () -> T
93+
): T = suspendCoroutineOrReturn sc@ { cont ->
94+
val oldContext = cont.context
95+
// fast path #1 if there is no change in the actual context:
96+
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
97+
return@sc block.startCoroutineUninterceptedOrReturn(cont)
98+
// compute new context
99+
val newContext = oldContext + context
100+
// fast path #2 if the result is actually the same
101+
if (newContext === oldContext)
102+
return@sc block.startCoroutineUninterceptedOrReturn(cont)
103+
// fast path #3 if the new dispatcher is the same as the old one.
104+
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
105+
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
106+
val newContinuation = RunContinuationDirect(newContext, cont)
107+
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
104108
}
109+
// slowest path otherwise -- use new interceptor, sync to its result via a
110+
// full-blown instance of CancellableContinuation
111+
require(!start.isLazy) { "$start start is not supported" }
112+
val newContinuation = RunContinuationCoroutine(
113+
parentContext = newContext,
114+
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE,
115+
continuation = cont)
116+
newContinuation.initCancellability() // attach to parent job
117+
start(block, newContinuation)
118+
newContinuation.getResult()
119+
}
120+
121+
/** @suppress **Deprecated** */
122+
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
123+
@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
124+
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
125+
run(context, start = CoroutineStart.ATOMIC, block = block)
105126

106127
/**
107128
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
@@ -157,8 +178,9 @@ private class RunContinuationDirect<in T>(
157178

158179
private class RunContinuationCoroutine<in T>(
159180
override val parentContext: CoroutineContext,
181+
resumeMode: Int,
160182
continuation: Continuation<T>
161-
) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = MODE_CANCELLABLE, active = true)
183+
) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = resumeMode, active = true)
162184

163185
private class BlockingCoroutine<T>(
164186
override val parentContext: CoroutineContext,

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.CoroutineStart.*
1920
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2021
import kotlin.coroutines.experimental.Continuation
2122
import kotlin.coroutines.experimental.startCoroutine
@@ -74,9 +75,31 @@ public enum class CoroutineStart {
7475
* Immediately executes coroutine until its first suspension point _in the current thread_ as if it the
7576
* coroutine was started using [Unconfined] dispatcher. However, when coroutine is resumed from suspension
7677
* it is dispatched according to the [CoroutineDispatcher] in its context.
78+
*
79+
* This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled,
80+
* but the difference is that it start executing in the same thread.
81+
*
82+
* Cancellability of coroutine at suspension points depends on the particular implementation details of
83+
* suspending functions as in [DEFAULT].
7784
*/
7885
UNDISPATCHED;
7986

87+
/**
88+
* Starts the corresponding block as a coroutine with this coroutine start strategy.
89+
*
90+
* * [DEFAULT] uses [startCoroutineCancellable].
91+
* * [ATOMIC] uses [startCoroutine].
92+
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
93+
* * [LAZY] does nothing.
94+
*/
95+
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
96+
when (this) {
97+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
98+
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
99+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
100+
CoroutineStart.LAZY -> Unit // will start lazily
101+
}
102+
80103
/**
81104
* Starts the corresponding block with receiver as a coroutine with this coroutine start strategy.
82105
*

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package kotlinx.coroutines.experimental
1919
import org.hamcrest.MatcherAssert.assertThat
2020
import org.hamcrest.core.IsEqual
2121
import org.junit.Test
22+
import kotlin.coroutines.experimental.ContinuationInterceptor
23+
import kotlin.coroutines.experimental.CoroutineContext
2224

2325
class RunTest : TestBase() {
2426
@Test
@@ -124,4 +126,46 @@ class RunTest : TestBase() {
124126
assertThat(result, IsEqual("OK"))
125127
finish(4)
126128
}
129+
130+
@Test(expected = CancellationException::class)
131+
fun testRunCancellableDefault() = runBlocking<Unit> {
132+
val job = Job()
133+
job.cancel() // cancel before it has a chance to run
134+
run(job + wrapperDispatcher(context)) {
135+
expectUnreached() // will get cancelled
136+
}
137+
}
138+
139+
@Test(expected = CancellationException::class)
140+
fun testRunAtomicTryCancel() = runBlocking<Unit> {
141+
expect(1)
142+
val job = Job()
143+
job.cancel() // try to cancel before it has a chance to run
144+
run(job + wrapperDispatcher(context), CoroutineStart.ATOMIC) { // but start atomically
145+
finish(2)
146+
yield() // but will cancel here
147+
expectUnreached()
148+
}
149+
}
150+
151+
@Test(expected = CancellationException::class)
152+
fun testRunUndispatchedTryCancel() = runBlocking<Unit> {
153+
expect(1)
154+
val job = Job()
155+
job.cancel() // try to cancel before it has a chance to run
156+
run(job + wrapperDispatcher(context), CoroutineStart.UNDISPATCHED) { // but start atomically
157+
finish(2)
158+
yield() // but will cancel here
159+
expectUnreached()
160+
}
161+
}
162+
163+
private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
164+
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
165+
return object : CoroutineDispatcher() {
166+
override fun dispatch(context: CoroutineContext, block: Runnable) {
167+
dispatcher.dispatch(context, block)
168+
}
169+
}
170+
}
127171
}

0 commit comments

Comments
 (0)