Skip to content

Commit e22f14a

Browse files
committed
Refactoring: RunCoroutine no longer extends CancellableContinuationImpl,
The later is not open anymore AbstractCoroutineWithDecision provide basic resume/suspend decision logic
1 parent 0f66a6d commit e22f14a

File tree

6 files changed

+184
-107
lines changed

6 files changed

+184
-107
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.locks.LockSupport
2020
import kotlin.coroutines.experimental.*
21+
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2122
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2223
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2324

@@ -106,16 +107,15 @@ public suspend fun <T> run(
106107
val newContinuation = RunContinuationDirect(newContext, cont)
107108
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
108109
}
109-
// slowest path otherwise -- use new interceptor, sync to its result via a
110-
// full-blown instance of CancellableContinuation
110+
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCoroutine
111111
require(!start.isLazy) { "$start start is not supported" }
112-
val newContinuation = RunContinuationCoroutine(
112+
val coroutine = RunCoroutine(
113+
delegate = cont,
113114
parentContext = newContext,
114-
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE,
115-
continuation = cont)
116-
newContinuation.initCancellability() // attach to parent job
117-
start(block, newContinuation)
118-
newContinuation.getResult()
115+
defaultResumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE)
116+
coroutine.initParentJob(newContext[Job]) // attach to job
117+
start(block, coroutine)
118+
coroutine.getResult()
119119
}
120120

121121
/** @suppress **Deprecated** */
@@ -176,11 +176,30 @@ private class RunContinuationDirect<in T>(
176176
continuation: Continuation<T>
177177
) : Continuation<T> by continuation
178178

179-
private class RunContinuationCoroutine<in T>(
179+
@Suppress("UNCHECKED_CAST")
180+
private class RunCoroutine<in T>(
181+
private val delegate: Continuation<T>,
180182
override val parentContext: CoroutineContext,
181-
resumeMode: Int,
182-
continuation: Continuation<T>
183-
) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = resumeMode, active = true)
183+
override val defaultResumeMode: Int
184+
) : AbstractCoroutineWithDecision<T>(active = true) {
185+
@PublishedApi
186+
internal fun getResult(): Any? {
187+
if (trySuspend()) return COROUTINE_SUSPENDED
188+
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
189+
val state = this.state
190+
if (state is CompletedExceptionally) throw state.exception
191+
return state as T
192+
}
193+
194+
override fun afterCompletion(state: Any?, mode: Int) {
195+
if (tryResume()) return // completed before getResult invocation -- bail out
196+
// otherwise, getResult has already commenced, i.e. completed later or in other thread
197+
if (state is CompletedExceptionally)
198+
delegate.resumeWithExceptionMode(mode, state.exception)
199+
else
200+
delegate.resumeMode(mode, state as T)
201+
}
202+
}
184203

185204
private class BlockingCoroutine<T>(
186205
override val parentContext: CoroutineContext,

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 30 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
20-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2120
import kotlin.coroutines.experimental.Continuation
2221
import kotlin.coroutines.experimental.CoroutineContext
2322
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
@@ -124,7 +123,7 @@ public inline suspend fun <T> suspendCancellableCoroutine(
124123
crossinline block: (CancellableContinuation<T>) -> Unit
125124
): T =
126125
suspendCoroutineOrReturn { cont ->
127-
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_CANCELLABLE, active = true)
126+
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_CANCELLABLE)
128127
if (!holdCancellability) cancellable.initCancellability()
129128
block(cancellable)
130129
cancellable.getResult()
@@ -143,7 +142,7 @@ public inline suspend fun <T> suspendAtomicCancellableCoroutine(
143142
crossinline block: (CancellableContinuation<T>) -> Unit
144143
): T =
145144
suspendCoroutineOrReturn { cont ->
146-
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_ATOMIC_DEFAULT, active = true)
145+
val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_ATOMIC_DEFAULT)
147146
if (!holdCancellability) cancellable.initCancellability()
148147
block(cancellable)
149148
cancellable.getResult()
@@ -169,51 +168,36 @@ private class RemoveOnCancel(
169168
override fun toString() = "RemoveOnCancel[$node]"
170169
}
171170

172-
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
173-
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
174-
@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
175-
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
176-
177171
@PublishedApi
178-
internal open class CancellableContinuationImpl<in T>(
179-
@JvmField
180-
protected val delegate: Continuation<T>,
181-
override val defaultResumeMode: Int,
182-
active: Boolean
183-
) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
184-
@Volatile
185-
private var decision = UNDECIDED
186-
187-
override val parentContext: CoroutineContext
188-
get() = delegate.context
189-
190-
protected companion object {
191-
@JvmField
192-
val DECISION: AtomicIntegerFieldUpdater<CancellableContinuationImpl<*>> =
193-
AtomicIntegerFieldUpdater.newUpdater(CancellableContinuationImpl::class.java, "decision")
194-
195-
const val UNDECIDED = 0
196-
const val SUSPENDED = 1
197-
const val RESUMED = 2
198-
199-
@Suppress("UNCHECKED_CAST")
200-
fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
201-
}
172+
internal class CancellableContinuationImpl<in T>(
173+
private val delegate: Continuation<T>,
174+
override val defaultResumeMode: Int
175+
) : AbstractCoroutineWithDecision<T>(active = true), CancellableContinuation<T> {
176+
override val parentContext: CoroutineContext get() = delegate.context
202177

203178
override fun initCancellability() {
204179
initParentJob(parentContext[Job])
205180
}
206181

207182
@PublishedApi
208183
internal fun getResult(): Any? {
209-
val decision = this.decision // volatile read
210-
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
211-
// otherwise, afterCompletion was already invoked, and the result is in the state
184+
if (trySuspend()) return COROUTINE_SUSPENDED
185+
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
212186
val state = this.state
213187
if (state is CompletedExceptionally) throw state.exception
214188
return getSuccessfulResult(state)
215189
}
216190

191+
override fun afterCompletion(state: Any?, mode: Int) {
192+
if (tryResume()) return // completed before getResult invocation -- bail out
193+
// otherwise, getResult has already commenced, i.e. completed later or in other thread
194+
if (state is CompletedExceptionally) {
195+
delegate.resumeWithExceptionMode(mode, state.exception)
196+
} else {
197+
delegate.resumeMode(mode, getSuccessfulResult<T>(state))
198+
}
199+
}
200+
217201
override fun tryResume(value: T, idempotent: Any?): Any? {
218202
while (true) { // lock-free loop on state
219203
val state = this.state // atomic read
@@ -251,31 +235,6 @@ internal open class CancellableContinuationImpl<in T>(
251235
completeUpdateState(token, state, defaultResumeMode)
252236
}
253237

254-
override fun afterCompletion(state: Any?, mode: Int) {
255-
val decision = this.decision // volatile read
256-
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
257-
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
258-
if (state is CompletedExceptionally) {
259-
val exception = state.exception
260-
when (mode) {
261-
MODE_ATOMIC_DEFAULT -> delegate.resumeWithException(exception)
262-
MODE_CANCELLABLE -> delegate.resumeCancellableWithException(exception)
263-
MODE_DIRECT -> delegate.resumeDirectWithException(exception)
264-
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
265-
else -> error("Invalid mode $mode")
266-
}
267-
} else {
268-
val value = getSuccessfulResult<T>(state)
269-
when (mode) {
270-
MODE_ATOMIC_DEFAULT -> delegate.resume(value)
271-
MODE_CANCELLABLE -> delegate.resumeCancellable(value)
272-
MODE_DIRECT -> delegate.resumeDirect(value)
273-
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
274-
else -> error("Invalid mode $mode")
275-
}
276-
}
277-
}
278-
279238
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
280239
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
281240
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
@@ -287,12 +246,16 @@ internal open class CancellableContinuationImpl<in T>(
287246
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
288247
resumeWithException(exception, MODE_UNDISPATCHED)
289248
}
249+
}
290250

291-
private class CompletedIdempotentResult(
292-
@JvmField val idempotentResume: Any?,
293-
@JvmField val result: Any?,
294-
@JvmField val token: Incomplete
295-
) {
296-
override fun toString(): String = "CompletedIdempotentResult[$result]"
297-
}
251+
private class CompletedIdempotentResult(
252+
@JvmField val idempotentResume: Any?,
253+
@JvmField val result: Any?,
254+
@JvmField val token: JobSupport.Incomplete
255+
) {
256+
override fun toString(): String = "CompletedIdempotentResult[$result]"
298257
}
258+
259+
@Suppress("UNCHECKED_CAST")
260+
private fun <T> getSuccessfulResult(state: Any?): T =
261+
if (state is CompletedIdempotentResult) state.result as T else state as T

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
1920
import kotlin.coroutines.experimental.Continuation
2021
import kotlin.coroutines.experimental.CoroutineContext
2122

@@ -83,7 +84,7 @@ public abstract class AbstractCoroutine<in T>(
8384
if (ignoreRepeatedResume) {
8485
return
8586
} else
86-
throw IllegalStateException("Already resumed, but got value $value")
87+
error("Already resumed, but got value $value")
8788
}
8889
}
8990
}
@@ -117,11 +118,59 @@ public abstract class AbstractCoroutine<in T>(
117118
final override fun handleCompletionException(closeException: Throwable) {
118119
handleCoroutineException(context, closeException)
119120
}
121+
}
122+
123+
/**
124+
* @suppress **This is unstable API and it is subject to change.**
125+
*/
126+
public abstract class AbstractCoroutineWithDecision<in T>(active: Boolean) : AbstractCoroutine<T>(active) {
127+
@Volatile
128+
private var decision = UNDECIDED
129+
130+
/* decision state machine
131+
132+
+-----------+ trySuspend +-----------+
133+
| UNDECIDED | -------------> | SUSPENDED |
134+
+-----------+ +-----------+
135+
|
136+
| tryResume
137+
V
138+
+-----------+
139+
| RESUMED |
140+
+-----------+
141+
142+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
143+
*/
120144

121-
// for nicer debugging
122-
override fun toString(): String {
123-
val state = this.state
124-
val result = if (state is Incomplete) "" else "[$state]"
125-
return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
145+
protected companion object {
146+
@JvmField
147+
val DECISION: AtomicIntegerFieldUpdater<AbstractCoroutineWithDecision<*>> =
148+
AtomicIntegerFieldUpdater.newUpdater(AbstractCoroutineWithDecision::class.java, "decision")
149+
150+
const val UNDECIDED = 0
151+
const val SUSPENDED = 1
152+
const val RESUMED = 2
126153
}
127-
}
154+
155+
protected fun trySuspend(): Boolean {
156+
while (true) { // lock-free loop
157+
val decision = this.decision // volatile read
158+
when (decision) {
159+
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return true
160+
RESUMED -> return false
161+
else -> error("Already suspended")
162+
}
163+
}
164+
}
165+
166+
protected fun tryResume(): Boolean {
167+
while (true) { // lock-free loop
168+
val decision = this.decision // volatile read
169+
when (decision) {
170+
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return true
171+
SUSPENDED -> return false
172+
else -> error("Already resumed")
173+
}
174+
}
175+
}
176+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,11 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
591591
?: InvokeOnCompletion(this, handler)
592592

593593
// for nicer debugging
594-
override fun toString(): String = "${this::class.java.simpleName}{${stateToString(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
594+
override fun toString(): String {
595+
val state = this.state
596+
val result = if (state is Incomplete) "" else "[$state]"
597+
return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
598+
}
595599

596600
/**
597601
* Interface for incomplete [state] of a job.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.coroutines.experimental.Continuation
20+
21+
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
22+
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
23+
@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
24+
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
25+
26+
fun <T> Continuation<T>.resumeMode(mode: Int, value: T) {
27+
when (mode) {
28+
MODE_ATOMIC_DEFAULT -> resume(value)
29+
MODE_CANCELLABLE -> resumeCancellable(value)
30+
MODE_DIRECT -> resumeDirect(value)
31+
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
32+
else -> error("Invalid mode $mode")
33+
}
34+
}
35+
36+
fun <T> Continuation<T>.resumeWithExceptionMode(mode: Int, exception: Throwable) {
37+
when (mode) {
38+
MODE_ATOMIC_DEFAULT -> resumeWithException(exception)
39+
MODE_CANCELLABLE -> resumeCancellableWithException(exception)
40+
MODE_DIRECT -> resumeDirectWithException(exception)
41+
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatchedWithException(exception)
42+
else -> error("Invalid mode $mode")
43+
}
44+
}

0 commit comments

Comments
 (0)