Skip to content

Commit 2b12d58

Browse files
committed
Refactoring: AbstractCoroutine is extended only by true coroutine impls;
AbstractCoroutine got simplified as a result; CancellableContinuationImpl and RunCompletion do not extend it anymore; they extend AbstractContinuation base class that they share.
1 parent e22f14a commit 2b12d58

File tree

16 files changed

+183
-145
lines changed

16 files changed

+183
-145
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
20+
import kotlin.coroutines.experimental.Continuation
21+
22+
/**
23+
* @suppress **This is unstable API and it is subject to change.**
24+
*/
25+
internal abstract class AbstractContinuation<in T>(
26+
active: Boolean,
27+
@JvmField protected val resumeMode: Int
28+
) : JobSupport(active), Continuation<T> {
29+
@Volatile
30+
private var decision = UNDECIDED
31+
32+
/* decision state machine
33+
34+
+-----------+ trySuspend +-----------+
35+
| UNDECIDED | -------------> | SUSPENDED |
36+
+-----------+ +-----------+
37+
|
38+
| tryResume
39+
V
40+
+-----------+
41+
| RESUMED |
42+
+-----------+
43+
44+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
45+
*/
46+
47+
protected companion object {
48+
@JvmField
49+
val DECISION: AtomicIntegerFieldUpdater<AbstractContinuation<*>> =
50+
AtomicIntegerFieldUpdater.newUpdater(AbstractContinuation::class.java, "decision")
51+
52+
const val UNDECIDED = 0
53+
const val SUSPENDED = 1
54+
const val RESUMED = 2
55+
}
56+
57+
protected fun trySuspend(): Boolean {
58+
while (true) { // lock-free loop
59+
val decision = this.decision // volatile read
60+
when (decision) {
61+
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return true
62+
RESUMED -> return false
63+
else -> error("Already suspended")
64+
}
65+
}
66+
}
67+
68+
protected fun tryResume(): Boolean {
69+
while (true) { // lock-free loop
70+
val decision = this.decision // volatile read
71+
when (decision) {
72+
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return true
73+
SUSPENDED -> return false
74+
else -> error("Already resumed")
75+
}
76+
}
77+
}
78+
79+
override fun resume(value: T) = resumeImpl(value, resumeMode)
80+
81+
protected fun resumeImpl(value: T, resumeMode: Int) {
82+
while (true) { // lock-free loop on state
83+
val state = this.state // atomic read
84+
when (state) {
85+
is Incomplete -> if (updateState(state, value, resumeMode)) return
86+
is Cancelled -> return // ignore resumes on cancelled continuation
87+
else -> error("Already resumed, but got value $value")
88+
}
89+
}
90+
}
91+
92+
override fun resumeWithException(exception: Throwable) = resumeWithExceptionImpl(exception, resumeMode)
93+
94+
protected fun resumeWithExceptionImpl(exception: Throwable, resumeMode: Int) {
95+
while (true) { // lock-free loop on state
96+
val state = this.state // atomic read
97+
when (state) {
98+
is Incomplete -> {
99+
if (updateState(state, CompletedExceptionally(exception), resumeMode)) return
100+
}
101+
is Cancelled -> {
102+
// ignore resumes on cancelled continuation, but handle exception if a different one is here
103+
if (exception != state.exception) handleCoroutineException(context, exception)
104+
return
105+
}
106+
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
107+
}
108+
}
109+
}
110+
111+
override fun handleCompletionException(closeException: Throwable) {
112+
handleCoroutineException(context, closeException)
113+
}
114+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ public suspend fun <T> run(
107107
val newContinuation = RunContinuationDirect(newContext, cont)
108108
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
109109
}
110-
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCoroutine
110+
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
111111
require(!start.isLazy) { "$start start is not supported" }
112-
val coroutine = RunCoroutine(
112+
val completion = RunCompletion(
113+
context = newContext,
113114
delegate = cont,
114-
parentContext = newContext,
115-
defaultResumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE)
116-
coroutine.initParentJob(newContext[Job]) // attach to job
117-
start(block, coroutine)
118-
coroutine.getResult()
115+
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE)
116+
completion.initParentJob(newContext[Job]) // attach to job
117+
start(block, completion)
118+
completion.getResult()
119119
}
120120

121121
/** @suppress **Deprecated** */
@@ -153,9 +153,9 @@ public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl
153153
// --------------- implementation ---------------
154154

155155
private open class StandaloneCoroutine(
156-
override val parentContext: CoroutineContext,
156+
private val parentContext: CoroutineContext,
157157
active: Boolean
158-
) : AbstractCoroutine<Unit>(active) {
158+
) : AbstractCoroutine<Unit>(parentContext, active) {
159159
override fun afterCompletion(state: Any?, mode: Int) {
160160
// note the use of the parent's job context below!
161161
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
@@ -177,11 +177,11 @@ private class RunContinuationDirect<in T>(
177177
) : Continuation<T> by continuation
178178

179179
@Suppress("UNCHECKED_CAST")
180-
private class RunCoroutine<in T>(
180+
private class RunCompletion<in T>(
181+
override val context: CoroutineContext,
181182
private val delegate: Continuation<T>,
182-
override val parentContext: CoroutineContext,
183-
override val defaultResumeMode: Int
184-
) : AbstractCoroutineWithDecision<T>(active = true) {
183+
resumeMode: Int
184+
) : AbstractContinuation<T>(true, resumeMode) {
185185
@PublishedApi
186186
internal fun getResult(): Any? {
187187
if (trySuspend()) return COROUTINE_SUSPENDED
@@ -195,17 +195,17 @@ private class RunCoroutine<in T>(
195195
if (tryResume()) return // completed before getResult invocation -- bail out
196196
// otherwise, getResult has already commenced, i.e. completed later or in other thread
197197
if (state is CompletedExceptionally)
198-
delegate.resumeWithExceptionMode(mode, state.exception)
198+
delegate.resumeWithExceptionMode(state.exception, mode)
199199
else
200-
delegate.resumeMode(mode, state as T)
200+
delegate.resumeMode(state as T, mode)
201201
}
202202
}
203203

204204
private class BlockingCoroutine<T>(
205-
override val parentContext: CoroutineContext,
205+
parentContext: CoroutineContext,
206206
private val blockedThread: Thread,
207207
private val privateEventLoop: Boolean
208-
) : AbstractCoroutine<T>(active = true) {
208+
) : AbstractCoroutine<T>(parentContext, true) {
209209
val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
210210

211211
init {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public inline suspend fun <T> suspendCancellableCoroutine(
123123
crossinline block: (CancellableContinuation<T>) -> Unit
124124
): T =
125125
suspendCoroutineOrReturn { cont ->
126-
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_CANCELLABLE)
126+
val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_CANCELLABLE)
127127
if (!holdCancellability) cancellable.initCancellability()
128128
block(cancellable)
129129
cancellable.getResult()
@@ -142,7 +142,7 @@ public inline suspend fun <T> suspendAtomicCancellableCoroutine(
142142
crossinline block: (CancellableContinuation<T>) -> Unit
143143
): T =
144144
suspendCoroutineOrReturn { cont ->
145-
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_ATOMIC_DEFAULT)
145+
val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_ATOMIC_DEFAULT)
146146
if (!holdCancellability) cancellable.initCancellability()
147147
block(cancellable)
148148
cancellable.getResult()
@@ -171,12 +171,16 @@ private class RemoveOnCancel(
171171
@PublishedApi
172172
internal class CancellableContinuationImpl<in T>(
173173
private val delegate: Continuation<T>,
174-
override val defaultResumeMode: Int
175-
) : AbstractCoroutineWithDecision<T>(active = true), CancellableContinuation<T> {
176-
override val parentContext: CoroutineContext get() = delegate.context
174+
resumeMode: Int
175+
) : AbstractContinuation<T>(true, resumeMode), CancellableContinuation<T> {
176+
@Volatile // just in case -- we don't want an extra data race, even benign one
177+
private var _context: CoroutineContext? = null // created on first need
178+
179+
public override val context: CoroutineContext
180+
get() = _context ?: (delegate.context + this).also { _context = it }
177181

178182
override fun initCancellability() {
179-
initParentJob(parentContext[Job])
183+
initParentJob(delegate.context[Job])
180184
}
181185

182186
@PublishedApi
@@ -192,9 +196,9 @@ internal class CancellableContinuationImpl<in T>(
192196
if (tryResume()) return // completed before getResult invocation -- bail out
193197
// otherwise, getResult has already commenced, i.e. completed later or in other thread
194198
if (state is CompletedExceptionally) {
195-
delegate.resumeWithExceptionMode(mode, state.exception)
199+
delegate.resumeWithExceptionMode(state.exception, mode)
196200
} else {
197-
delegate.resumeMode(mode, getSuccessfulResult<T>(state))
201+
delegate.resumeMode(getSuccessfulResult<T>(state), mode)
198202
}
199203
}
200204

@@ -232,19 +236,19 @@ internal class CancellableContinuationImpl<in T>(
232236
}
233237

234238
override fun completeResume(token: Any) {
235-
completeUpdateState(token, state, defaultResumeMode)
239+
completeUpdateState(token, state, resumeMode)
236240
}
237241

238242
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
239243
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
240244
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
241-
resume(value, MODE_UNDISPATCHED)
245+
resumeImpl(value, MODE_UNDISPATCHED)
242246
}
243247

244248
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
245249
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
246250
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
247-
resumeWithException(exception, MODE_UNDISPATCHED)
251+
resumeWithExceptionImpl(exception, MODE_UNDISPATCHED)
248252
}
249253
}
250254

0 commit comments

Comments
 (0)