Skip to content

Commit 5f2413a

Browse files
elizarovqwwdfsad
authored andcommitted
Get rid of AbstractCoroutine.defaultResumeMode
It was only needed to customize resume mode from within of AbstractCoroutine.resumeWith method. This is now achieved by a separate open fun afterResume. Also, afterCompletionInternal is now renamed to afterCompletion and is only used from the job was completed from unknown context and thus should be resumed in a default way. This "default way" is now "cancellable resume" (to minimize the amount of code duplication) which does not introduce any functional difference, since that only happens on cancellation. Remove MODE_DIRECT resume mode
1 parent ff9060c commit 5f2413a

File tree

11 files changed

+88
-85
lines changed

11 files changed

+88
-85
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
22
protected final field parentContext Lkotlin/coroutines/CoroutineContext;
33
public fun <init> (Lkotlin/coroutines/CoroutineContext;Z)V
44
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
5+
protected fun afterResume (Ljava/lang/Object;)V
56
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
67
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
78
public fun isActive ()Z
@@ -380,7 +381,7 @@ public final class kotlinx/coroutines/JobKt {
380381

381382
public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlinx/coroutines/Job, kotlinx/coroutines/ParentJob, kotlinx/coroutines/selects/SelectClause0 {
382383
public fun <init> (Z)V
383-
protected fun afterCompletionInternal (Ljava/lang/Object;I)V
384+
protected fun afterCompletion (Ljava/lang/Object;)V
384385
public final fun attachChild (Lkotlinx/coroutines/ChildJob;)Lkotlinx/coroutines/ChildHandle;
385386
public synthetic fun cancel ()V
386387
public synthetic fun cancel (Ljava/lang/Throwable;)Z

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,17 @@ public abstract class AbstractCoroutine<in T>(
102102
onCompleted(state as T)
103103
}
104104

105-
internal open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
106-
107105
/**
108106
* Completes execution of this with coroutine with the specified result.
109107
*/
110108
public final override fun resumeWith(result: Result<T>) {
111109
val state = makeCompletingOnce(result.toState())
112110
if (state === COMPLETING_WAITING_CHILDREN) return
113-
afterCompletionInternal(state, defaultResumeMode)
111+
afterResume(state)
114112
}
115113

114+
protected open fun afterResume(state: Any?) = afterCompletion(state)
115+
116116
internal final override fun handleOnCompletionException(exception: Throwable) {
117117
handleCoroutineException(context, exception)
118118
}

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:JvmMultifileClass
@@ -142,20 +142,20 @@ public suspend fun <T> withContext(
142142
newContext.checkCompletion()
143143
// FAST PATH #1 -- new context is the same as the old one
144144
if (newContext === oldContext) {
145-
val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
145+
val coroutine = ScopeCoroutine(newContext, uCont)
146146
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
147147
}
148148
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
149149
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
150150
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
151-
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
151+
val coroutine = UndispatchedCoroutine(newContext, uCont)
152152
// There are changes in the context, so this thread needs to be updated
153153
withCoroutineContext(newContext, null) {
154154
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
155155
}
156156
}
157157
// SLOW PATH -- use new dispatcher
158-
val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_CANCELLABLE
158+
val coroutine = DispatchedCoroutine(newContext, uCont)
159159
coroutine.initParentJob()
160160
block.startCoroutineCancellable(coroutine, coroutine)
161161
coroutine.getResult()
@@ -200,7 +200,13 @@ private class UndispatchedCoroutine<in T>(
200200
context: CoroutineContext,
201201
uCont: Continuation<T>
202202
) : ScopeCoroutine<T>(context, uCont) {
203-
override val defaultResumeMode: Int get() = MODE_UNDISPATCHED
203+
override fun afterResume(state: Any?) {
204+
// resume undispatched -- update context by stay on the same dispatcher
205+
val result = recoverResult(state, uCont)
206+
withCoroutineContext(uCont.context, null) {
207+
uCont.resumeWith(result)
208+
}
209+
}
204210
}
205211

206212
private const val UNDECIDED = 0
@@ -212,8 +218,6 @@ private class DispatchedCoroutine<in T>(
212218
context: CoroutineContext,
213219
uCont: Continuation<T>
214220
) : ScopeCoroutine<T>(context, uCont) {
215-
override val defaultResumeMode: Int get() = MODE_CANCELLABLE
216-
217221
// this is copy-and-paste of a decision state machine inside AbstractionContinuation
218222
// todo: we may some-how abstract it via inline class
219223
private val _decision = atomic(UNDECIDED)
@@ -238,10 +242,16 @@ private class DispatchedCoroutine<in T>(
238242
}
239243
}
240244

241-
override fun afterCompletionInternal(state: Any?, mode: Int) {
245+
override fun afterCompletion(state: Any?) {
246+
// Call afterResume from afterCompletion and not vice-versa, because stack-size is more
247+
// important for afterResume implementation
248+
afterResume(state)
249+
}
250+
251+
override fun afterResume(state: Any?) {
242252
if (tryResume()) return // completed before getResult invocation -- bail out
243-
// otherwise, getResult has already commenced, i.e. completed later or in other thread
244-
super.afterCompletionInternal(state, mode)
253+
// Resume in a cancellable way because we have to switch back to the original dispatcher
254+
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
245255
}
246256

247257
fun getResult(): Any? {

kotlinx-coroutines-core/common/src/CompletedExceptionally.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.internal.*
89
import kotlin.coroutines.*
910
import kotlin.jvm.*
1011

1112
internal fun <T> Result<T>.toState(): Any? =
1213
if (isSuccess) getOrThrow() else CompletedExceptionally(exceptionOrNull()!!) // todo: need to do it better
1314

15+
@Suppress("RESULT_CLASS_IN_RETURN_TYPE", "UNCHECKED_CAST")
16+
internal fun <T> recoverResult(state: Any?, uCont: Continuation<T>): Result<T> =
17+
if (state is CompletedExceptionally)
18+
Result.failure(recoverStackTrace(state.cause, uCont))
19+
else
20+
Result.success(state as T)
21+
1422
/**
1523
* Class for an internal state of a job that was cancelled (completed exceptionally).
1624
*

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,16 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
194194
// Finalizes Finishing -> Completed (terminal state) transition.
195195
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
196196
// Returns final state that was created and updated to
197-
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
197+
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
198198
/*
199199
* Note: proposed state can be Incomplete, e.g.
200200
* async {
201201
* something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
202202
* }
203203
*/
204-
require(this.state === state) // consistency check -- it cannot change
205-
require(!state.isSealed) // consistency check -- cannot be sealed yet
206-
require(state.isCompleting) // consistency check -- must be marked as completing
204+
assert { this.state === state } // consistency check -- it cannot change
205+
assert { !state.isSealed } // consistency check -- cannot be sealed yet
206+
assert { state.isCompleting } // consistency check -- must be marked as completing
207207
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
208208
// Create the final exception and seal the state so that no more exceptions can be added
209209
var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
@@ -233,7 +233,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
233233
if (!wasCancelling) onCancelling(finalException)
234234
onCompletionInternal(finalState)
235235
// Then CAS to completed state -> it must succeed
236-
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
236+
val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
237+
assert { casSuccess }
237238
// And process all post-completion actions
238239
completeStateFinalization(state, finalState)
239240
return finalState
@@ -269,7 +270,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
269270
}
270271

271272
// fast-path method to finalize normally completed coroutines without children
272-
// returns true if complete, and afterCompletionInternal(update, mode) shall be called
273+
// returns true if complete, and afterCompletion(update) shall be called
273274
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
274275
assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add
275276
assert { update !is CompletedExceptionally } // only for normal completion
@@ -495,10 +496,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
495496

496497
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
497498
return if (onCancelling)
498-
(handler as? JobCancellingNode<*>)?.also { require(it.job === this) }
499+
(handler as? JobCancellingNode<*>)?.also { assert { it.job === this } }
499500
?: InvokeOnCancelling(this, handler)
500501
else
501-
(handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellingNode) }
502+
(handler as? JobNode<*>)?.also { assert { it.job === this && it !is JobCancellingNode } }
502503
?: InvokeOnCompletion(this, handler)
503504
}
504505

@@ -653,7 +654,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
653654
finalState === COMPLETING_WAITING_CHILDREN -> true
654655
finalState === TOO_LATE_TO_CANCEL -> false
655656
else -> {
656-
afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT)
657+
afterCompletion(finalState)
657658
true
658659
}
659660
}
@@ -663,7 +664,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
663664
// It contains a loop and never returns COMPLETING_RETRY, can return
664665
// COMPLETING_ALREADY -- if already completed/completing
665666
// COMPLETING_WAITING_CHILDREN -- if started waiting for children
666-
// final state -- when completed, for call to afterCompletionInternal
667+
// final state -- when completed, for call to afterCompletion
667668
private fun cancelMakeCompleting(cause: Any?): Any? {
668669
loopOnState { state ->
669670
if (state !is Incomplete || state is Finishing && state.isCompleting) {
@@ -703,7 +704,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
703704
// COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
704705
// COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
705706
// TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
706-
// final state -- when completed, for call to afterCompletionInternal
707+
// final state -- when completed, for call to afterCompletion
707708
private fun makeCancelling(cause: Any?): Any? {
708709
var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
709710
loopOnState { state ->
@@ -786,7 +787,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
786787
finalState === COMPLETING_WAITING_CHILDREN -> return true
787788
finalState === COMPLETING_RETRY -> return@loopOnState
788789
else -> {
789-
afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT)
790+
afterCompletion(finalState)
790791
return true
791792
}
792793
}
@@ -798,7 +799,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
798799
* It throws [IllegalStateException] on repeated invocation (when this job is already completing).
799800
* Returns:
800801
* * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
801-
* * Final state otherwise (caller should do [afterCompletionInternal])
802+
* * Final state otherwise (caller should do [afterCompletion])
802803
*/
803804
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
804805
loopOnState { state ->
@@ -819,7 +820,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
819820
// COMPLETING_ALREADY -- when already complete or completing
820821
// COMPLETING_RETRY -- when need to retry due to interference
821822
// COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
822-
// final state -- when completed, for call to afterCompletionInternal
823+
// final state -- when completed, for call to afterCompletion
823824
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
824825
if (state !is Incomplete)
825826
return COMPLETING_ALREADY
@@ -844,7 +845,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
844845
// COMPLETING_ALREADY -- when already complete or completing
845846
// COMPLETING_RETRY -- when need to retry due to interference
846847
// COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
847-
// final state -- when completed, for call to afterCompletionInternal
848+
// final state -- when completed, for call to afterCompletion
848849
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
849850
// get state's list or else promote to list to correctly operate on child lists
850851
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
@@ -866,7 +867,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
866867
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
867868
}
868869
// ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
869-
require(!finishing.isSealed) // cannot be sealed
870+
assert { !finishing.isSealed } // cannot be sealed
870871
// add new proposed exception to the finishing state
871872
val wasCancelling = finishing.isCancelling
872873
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
@@ -880,7 +881,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
880881
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
881882
return COMPLETING_WAITING_CHILDREN
882883
// otherwise -- we have not children left (all were already cancelled?)
883-
return tryFinalizeFinishingState(finishing, proposedUpdate)
884+
return finalizeFinishingState(finishing, proposedUpdate)
884885
}
885886

886887
private val Any?.exceptionOrNull: Throwable?
@@ -903,14 +904,14 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
903904

904905
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
905906
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
906-
require(this.state === state) // consistency check -- it cannot change while we are waiting for children
907+
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
907908
// figure out if we need to wait for next child
908909
val waitChild = lastChild.nextChild()
909910
// try wait for next child
910911
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
911912
// no more children to wait -- try update state
912-
val finalState = tryFinalizeFinishingState(state, proposedUpdate)
913-
afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT)
913+
val finalState = finalizeFinishingState(state, proposedUpdate)
914+
afterCompletion(finalState)
914915
}
915916

916917
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
@@ -1014,14 +1015,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10141015
protected open fun onCompletionInternal(state: Any?) {}
10151016

10161017
/**
1017-
* Override for the very last action on job's completion to resume the rest of the code in scoped coroutines.
1018-
*
1019-
* @param state the final state.
1020-
* @param mode completion mode.
1018+
* Override for the very last action on job's completion to resume the rest of the code in
1019+
* scoped coroutines. It is called when this job is externally completed in an unknown
1020+
* context and thus should resume with a default mode.
10211021
*
10221022
* @suppress **This is unstable API and it is subject to change.**
10231023
*/
1024-
protected open fun afterCompletionInternal(state: Any?, mode: Int) {}
1024+
protected open fun afterCompletion(state: Any?) {}
10251025

10261026
// for nicer debugging
10271027
public override fun toString(): String =

kotlinx-coroutines-core/common/src/ResumeMode.kt

Lines changed: 0 additions & 13 deletions
This file was deleted.

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
99
import kotlin.jvm.*
1010

11+
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
12+
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
13+
@PublishedApi internal const val MODE_UNDISPATCHED = 2 // when the thread is right, but need to mark it with current coroutine
14+
15+
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
16+
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
17+
1118
internal abstract class DispatchedTask<in T>(
1219
@JvmField public var resumeMode: Int
1320
) : SchedulerTask() {
@@ -89,7 +96,7 @@ internal abstract class DispatchedTask<in T>(
8996
}
9097
}
9198

92-
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
99+
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
93100
val delegate = this.delegate
94101
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
95102
// dispatch directly using this instance's Runnable implementation
@@ -125,7 +132,6 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
125132
when (useMode) {
126133
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
127134
MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
128-
MODE_DIRECT -> ((delegate as? DispatchedContinuation)?.continuation ?: delegate).resumeWith(result)
129135
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
130136
else -> error("Invalid mode $useMode")
131137
}

0 commit comments

Comments
 (0)