Skip to content

Commit 0a656ff

Browse files
elizarovqwwdfsad
authored andcommitted
Fixed CoroutineScope of withContext block for structured concurrency
* Both are optimized and are rewritten via AbstractCoroutine * Support for cancelling state is dropped from AbstractContinuation and it is now faster, too Fixes #553 Fixes #617
1 parent 8d6c1a9 commit 0a656ff

File tree

7 files changed

+223
-322
lines changed

7 files changed

+223
-322
lines changed

common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt

Lines changed: 2 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,6 @@ internal abstract class AbstractContinuation<in T>(
3131
* 4) Its cancellation listeners cannot be deregistered
3232
* As a consequence it has much simpler state machine, more lightweight machinery and
3333
* less dependencies.
34-
*
35-
* Cancelling state
36-
* If useCancellingState is true, then this continuation can have additional cancelling state,
37-
* which is transition from Active to Cancelled. This is specific state to support withContext(ctx)
38-
* construction: block in withContext can be cancelled from withing or even before stepping into withContext,
39-
* but we still want to properly run it (e.g. when it has atomic cancellation mode) and run its completion listener
40-
* after.
41-
* During cancellation all pending exceptions are aggregated and thrown during transition to final state
4234
*/
4335

4436
/* decision state machine
@@ -63,7 +55,6 @@ internal abstract class AbstractContinuation<in T>(
6355
------ ------------ ------------ -----------
6456
ACTIVE Active : Active active, no listeners
6557
SINGLE_A CancelHandler : Active active, one cancellation listener
66-
CANCELLING Cancelling : Active in the process of cancellation due to cancellation of parent job
6758
CANCELLED Cancelled : Cancelled cancelled (final state)
6859
COMPLETED any : Completed produced some result or threw an exception (final state)
6960
*/
@@ -80,8 +71,6 @@ internal abstract class AbstractContinuation<in T>(
8071

8172
public val isCancelled: Boolean get() = state is CancelledContinuation
8273

83-
protected open val useCancellingState: Boolean get() = false
84-
8574
internal fun initParentJobInternal(parent: Job?) {
8675
check(parentHandle == null)
8776
if (parent == null) {
@@ -105,7 +94,6 @@ internal abstract class AbstractContinuation<in T>(
10594
public fun cancel(cause: Throwable?): Boolean {
10695
loopOnState { state ->
10796
if (state !is NotCompleted) return false // quit if already complete
108-
if (state is Cancelling) return false // someone else succeeded
10997
if (tryCancel(state, cause)) return true
11098
}
11199
}
@@ -170,7 +158,6 @@ internal abstract class AbstractContinuation<in T>(
170158
handler.invokeIt((state as? CompletedExceptionally)?.cause)
171159
return
172160
}
173-
is Cancelling -> error("Cancellation handlers for continuations with 'Cancelling' state are not supported")
174161
else -> return
175162
}
176163
}
@@ -179,14 +166,8 @@ internal abstract class AbstractContinuation<in T>(
179166
private fun makeHandler(handler: CompletionHandler): CancelHandler =
180167
if (handler is CancelHandler) handler else InvokeOnCancel(handler)
181168

182-
private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean {
183-
if (useCancellingState) {
184-
require(state !is CancelHandler) { "Invariant: 'Cancelling' state and cancellation handlers cannot be used together" }
185-
return _state.compareAndSet(state, Cancelling(CancelledContinuation(this, cause)))
186-
}
187-
188-
return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
189-
}
169+
private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean =
170+
updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
190171

191172
private fun dispatchResume(mode: Int) {
192173
if (tryResume()) return // completed before getResult invocation -- bail out
@@ -203,75 +184,6 @@ internal abstract class AbstractContinuation<in T>(
203184
protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
204185
loopOnState { state ->
205186
when (state) {
206-
is Cancelling -> { // withContext() support
207-
/*
208-
* If already cancelled block is resumed with non-exception,
209-
* resume it with cancellation exception.
210-
* E.g.
211-
* ```
212-
* val value = withContext(ctx) {
213-
* outerJob.cancel() // -> cancelling
214-
* 42 // -> cancelled
215-
* }
216-
* ```
217-
* should throw cancellation exception instead of returning 42
218-
*/
219-
if (proposedUpdate !is CompletedExceptionally) {
220-
val update = state.cancel
221-
if (updateStateToFinal(state, update, resumeMode)) return
222-
} else {
223-
/*
224-
* If already cancelled block is resumed with an exception,
225-
* then we should properly merge them to avoid information loss.
226-
*
227-
* General rule:
228-
* Thrown exception always becomes a result and cancellation reason
229-
* is added to suppressed exceptions if necessary.
230-
* Basic duplicate/cycles check is performed
231-
*/
232-
val update: CompletedExceptionally
233-
234-
/*
235-
* Proposed update is another CancellationException.
236-
* e.g.
237-
* ```
238-
* T1: ctxJob.cancel(e1) // -> cancelling
239-
* T2:
240-
* withContext(ctx, Mode.ATOMIC) {
241-
* // -> resumed with cancellation exception
242-
* }
243-
* ```
244-
*/
245-
if (proposedUpdate.cause is CancellationException) {
246-
// Keep original cancellation cause and try add to suppressed exception from proposed cancel
247-
update = proposedUpdate
248-
coerceWithException(state, update)
249-
} else {
250-
/*
251-
* Proposed update is exception => transition to terminal state
252-
* E.g.
253-
* ```
254-
* withContext(ctx) {
255-
* outerJob.cancel() // -> cancelling
256-
* throw Exception() // -> completed exceptionally
257-
* }
258-
* ```
259-
*/
260-
val exception = proposedUpdate.cause
261-
val currentException = state.cancel.cause
262-
// Add to suppressed if original cancellation differs from proposed exception
263-
if (currentException !is CancellationException || currentException.cause !== exception) {
264-
exception.addSuppressedThrowable(currentException)
265-
}
266-
267-
update = CompletedExceptionally(exception)
268-
}
269-
270-
if (updateStateToFinal(state, update, resumeMode)) {
271-
return
272-
}
273-
}
274-
}
275187
is NotCompleted -> {
276188
if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
277189
}
@@ -286,35 +198,20 @@ internal abstract class AbstractContinuation<in T>(
286198
if (proposedUpdate is CompletedExceptionally) {
287199
handleException(proposedUpdate.cause)
288200
}
289-
290201
return
291202
}
292203
else -> error("Already resumed, but proposed with update $proposedUpdate")
293204
}
294205
}
295206
}
296207

297-
// Coerce current cancelling state with proposed exception
298-
private fun coerceWithException(state: Cancelling, proposedUpdate: CompletedExceptionally) {
299-
val originalCancellation = state.cancel
300-
val originalException = originalCancellation.cause
301-
val updateCause = proposedUpdate.cause
302-
// Cause of proposed update is present and differs from one in current state
303-
val isSameCancellation = originalCancellation.cause is CancellationException
304-
&& originalException.cause === updateCause.cause
305-
if (!isSameCancellation && (originalException.cause !== updateCause)) {
306-
proposedUpdate.cause.addSuppressedThrowable(originalException)
307-
}
308-
}
309-
310208
/**
311209
* Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
312210
*/
313211
private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
314212
if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
315213
return false
316214
}
317-
318215
completeStateUpdate(expect, proposedUpdate, mode)
319216
return true
320217
}
@@ -374,9 +271,6 @@ internal interface NotCompleted
374271
private class Active : NotCompleted
375272
private val ACTIVE: Active = Active()
376273

377-
// In progress of cancellation
378-
internal class Cancelling(@JvmField val cancel: CancelledContinuation) : NotCompleted
379-
380274
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
381275

382276
// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly

common/kotlinx-coroutines-core-common/src/Builders.common.kt

Lines changed: 77 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package kotlinx.coroutines.experimental
99

10+
import kotlinx.atomicfu.*
1011
import kotlinx.coroutines.experimental.internal.*
1112
import kotlinx.coroutines.experimental.intrinsics.*
1213
import kotlinx.coroutines.experimental.selects.*
@@ -193,13 +194,30 @@ public fun launch(context: CoroutineContext, start: Boolean, block: suspend Coro
193194
public suspend fun <T> withContext(
194195
context: CoroutineContext,
195196
block: suspend CoroutineScope.() -> T
196-
): T =
197-
// todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
198-
withContextImpl(context, start = CoroutineStart.DEFAULT) {
199-
currentScope {
200-
block()
197+
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
198+
// compute new context
199+
val oldContext = uCont.context
200+
val newContext = oldContext + context
201+
// FAST PATH #1 -- new context is the same as the old one
202+
if (newContext === oldContext) {
203+
val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
204+
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
205+
}
206+
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
207+
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
208+
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
209+
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
210+
// There are changes in the context, so this thread needs to be updated
211+
withCoroutineContext(newContext) {
212+
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
201213
}
202214
}
215+
// SLOW PATH -- use new dispatcher
216+
val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_DISPATCHED
217+
coroutine.initParentJob()
218+
block.startCoroutineCancellable(coroutine, coroutine)
219+
coroutine.getResult()
220+
}
203221

204222
/**
205223
* @suppress **Deprecated**: start parameter is deprecated, no replacement.
@@ -210,48 +228,7 @@ public suspend fun <T> withContext(
210228
start: CoroutineStart = CoroutineStart.DEFAULT,
211229
block: suspend CoroutineScope.() -> T
212230
): T =
213-
// todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
214-
withContextImpl(context, start) {
215-
currentScope {
216-
block()
217-
}
218-
}
219-
220-
// todo: optimize it to reduce allocations
221-
private suspend fun <T> withContextImpl(
222-
context: CoroutineContext,
223-
start: CoroutineStart = CoroutineStart.DEFAULT,
224-
block: suspend () -> T
225-
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
226-
val oldContext = uCont.context
227-
// fast path #1 if there is no change in the actual context:
228-
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
229-
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
230-
// compute new context
231-
val newContext = oldContext + context
232-
// fast path #2 if the result is actually the same
233-
if (newContext === oldContext)
234-
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
235-
// fast path #3 if the new dispatcher is the same as the old one.
236-
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
237-
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
238-
val newContinuation = RunContinuationUnintercepted(newContext, uCont)
239-
// There are some other changes in the context, so this thread needs to be updated
240-
withCoroutineContext(newContext) {
241-
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
242-
}
243-
}
244-
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
245-
require(!start.isLazy) { "$start start is not supported" }
246-
val completion = RunCompletion(
247-
context = newContext,
248-
delegate = uCont.intercepted(), // delegate to continuation intercepted with old dispatcher on completion
249-
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
250-
)
251-
completion.initParentJobInternal(newContext[Job]) // attach to job
252-
start(block, completion)
253-
completion.getResult()
254-
}
231+
withContext(context, block)
255232

256233
/** @suppress **Deprecated**: Binary compatibility */
257234
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
@@ -261,7 +238,7 @@ public suspend fun <T> withContext0(
261238
start: CoroutineStart = CoroutineStart.DEFAULT,
262239
block: suspend () -> T
263240
): T =
264-
withContextImpl(context, start, block)
241+
withContext(context) { block() }
265242

266243
/** @suppress **Deprecated**: Renamed to [withContext]. */
267244
@Deprecated(message = "Renamed to `withContext`", level=DeprecationLevel.WARNING,
@@ -271,12 +248,12 @@ public suspend fun <T> run(
271248
start: CoroutineStart = CoroutineStart.DEFAULT,
272249
block: suspend () -> T
273250
): T =
274-
withContextImpl(context, start, block)
251+
withContext(context) { block() }
275252

276253
/** @suppress **Deprecated** */
277254
@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
278255
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
279-
withContextImpl(context, start = CoroutineStart.ATOMIC, block = block)
256+
withContext(context) { block() }
280257

281258
// --------------- implementation ---------------
282259

@@ -297,29 +274,61 @@ private class LazyStandaloneCoroutine(
297274
}
298275
}
299276

300-
private class RunContinuationUnintercepted<in T>(
301-
override val context: CoroutineContext,
302-
private val continuation: Continuation<T>
303-
): Continuation<T> {
304-
override fun resume(value: T) {
305-
withCoroutineContext(continuation.context) {
306-
continuation.resume(value)
277+
// Used by withContext when context changes, but dispatcher stays the same
278+
private class UndispatchedCoroutine<in T>(
279+
context: CoroutineContext,
280+
uCont: Continuation<T>
281+
) : ScopeCoroutine<T>(context, uCont) {
282+
override val defaultResumeMode: Int get() = MODE_UNDISPATCHED
283+
}
284+
285+
private const val UNDECIDED = 0
286+
private const val SUSPENDED = 1
287+
private const val RESUMED = 2
288+
289+
// Used by withContext when context dispatcher changes
290+
private class DispatchedCoroutine<in T>(
291+
context: CoroutineContext,
292+
uCont: Continuation<T>
293+
) : ScopeCoroutine<T>(context, uCont) {
294+
override val defaultResumeMode: Int get() = MODE_CANCELLABLE
295+
296+
// this is copy-and-paste of a decision state machine inside AbstractionContinuation
297+
// todo: we may some-how abstract it via inline class
298+
private val _decision = atomic(UNDECIDED)
299+
300+
private fun trySuspend(): Boolean {
301+
_decision.loop { decision ->
302+
when (decision) {
303+
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
304+
RESUMED -> return false
305+
else -> error("Already suspended")
306+
}
307307
}
308308
}
309309

310-
override fun resumeWithException(exception: Throwable) {
311-
withCoroutineContext(continuation.context) {
312-
continuation.resumeWithException(exception)
310+
private fun tryResume(): Boolean {
311+
_decision.loop { decision ->
312+
when (decision) {
313+
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
314+
SUSPENDED -> return false
315+
else -> error("Already resumed")
316+
}
313317
}
314318
}
315-
}
316319

317-
@Suppress("UNCHECKED_CAST")
318-
private class RunCompletion<in T>(
319-
override val context: CoroutineContext,
320-
delegate: Continuation<T>,
321-
resumeMode: Int
322-
) : AbstractContinuation<T>(delegate, resumeMode) {
320+
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
321+
if (tryResume()) return // completed before getResult invocation -- bail out
322+
// otherwise, getResult has already commenced, i.e. completed later or in other thread
323+
super.onCompletionInternal(state, mode, suppressed)
324+
}
323325

324-
override val useCancellingState: Boolean get() = true
326+
fun getResult(): Any? {
327+
if (trySuspend()) return COROUTINE_SUSPENDED
328+
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
329+
val state = this.state
330+
if (state is CompletedExceptionally) throw state.cause
331+
@Suppress("UNCHECKED_CAST")
332+
return state as T
333+
}
325334
}

0 commit comments

Comments
 (0)