@@ -28,10 +28,11 @@ private const val RESUMED = 2
28
28
/* *
29
29
* @suppress **This is unstable API and it is subject to change.**
30
30
*/
31
+ // Note: it also works directly as DispatchTask for this delegate
31
32
internal abstract class AbstractContinuation <in T >(
32
33
@JvmField protected val delegate : Continuation <T >,
33
34
@JvmField protected val resumeMode : Int
34
- ) : JobSupport(true ), Continuation<T> {
35
+ ) : JobSupport(true ), Continuation<T>, Runnable {
35
36
private val _decision = atomic(UNDECIDED )
36
37
37
38
/* decision state machine
@@ -81,10 +82,23 @@ internal abstract class AbstractContinuation<in T>(
81
82
override fun afterCompletion (state : Any? , mode : Int ) {
82
83
if (tryResume()) return // completed before getResult invocation -- bail out
83
84
// otherwise, getResult has already commenced, i.e. completed later or in other thread
85
+ var useMode = mode
86
+ if (mode.isDispatchedMode && delegate is DispatchedContinuation <* > && mode.isCancellableMode == resumeMode.isCancellableMode) {
87
+ // dispatch directly using this instance's Runnable implementation
88
+ val dispatcher = delegate.dispatcher
89
+ val context = delegate.context
90
+ if (dispatcher.isDispatchNeeded(context)) {
91
+ dispatcher.dispatch(context, this )
92
+ return // and that's it -- dispatched via fast-path
93
+ } else {
94
+ useMode = MODE_UNDISPATCHED
95
+ }
96
+ }
97
+ // slow-path - use delegate
84
98
if (state is CompletedExceptionally ) {
85
- delegate.resumeWithExceptionMode(state.exception, mode )
99
+ delegate.resumeWithExceptionMode(state.exception, useMode )
86
100
} else {
87
- delegate.resumeMode(getSuccessfulResult(state), mode )
101
+ delegate.resumeMode(getSuccessfulResult(state), useMode )
88
102
}
89
103
}
90
104
@@ -118,4 +132,23 @@ internal abstract class AbstractContinuation<in T>(
118
132
override fun handleException (exception : Throwable ) {
119
133
handleCoroutineException(context, exception)
120
134
}
135
+
136
+ // see all DispatchTask.run with the same logic
137
+ override fun run () {
138
+ check(delegate is DispatchedContinuation )
139
+ try {
140
+ val context = delegate.context
141
+ val job = if (resumeMode.isCancellableMode) context[Job ] else null
142
+ val state = this .state
143
+ withCoroutineContext(context) {
144
+ when {
145
+ job != null && ! job.isActive -> delegate.resumeWithException(job.getCancellationException())
146
+ state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
147
+ else -> delegate.resume(getSuccessfulResult(state))
148
+ }
149
+ }
150
+ } catch (e: Throwable ) {
151
+ throw RuntimeException (" Unexpected exception running $this " , e)
152
+ }
153
+ }
121
154
}
0 commit comments