Skip to content

Commit f2239e1

Browse files
committed
Performance: Remove DispatchTask, store value in DispatchedContinuation
1 parent fa13beb commit f2239e1

File tree

7 files changed

+180
-136
lines changed

7 files changed

+180
-136
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ private const val RESUMED = 2
2828
/**
2929
* @suppress **This is unstable API and it is subject to change.**
3030
*/
31-
// Note: it also works directly as DispatchTask for this delegate
3231
internal abstract class AbstractContinuation<in T>(
33-
@JvmField protected val delegate: Continuation<T>,
34-
@JvmField protected val resumeMode: Int
35-
) : JobSupport(true), Continuation<T>, Runnable {
32+
public final override val delegate: Continuation<T>,
33+
public final override val resumeMode: Int
34+
) : JobSupport(true), Continuation<T>, DispatchedTask<T> {
3635
private val _decision = atomic(UNDECIDED)
3736

3837
/* decision state machine
@@ -50,6 +49,8 @@ internal abstract class AbstractContinuation<in T>(
5049
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
5150
*/
5251

52+
override fun takeState(): Any? = state
53+
5354
private fun trySuspend(): Boolean {
5455
_decision.loop { decision ->
5556
when (decision) {
@@ -82,30 +83,9 @@ internal abstract class AbstractContinuation<in T>(
8283
override fun afterCompletion(state: Any?, mode: Int) {
8384
if (tryResume()) return // completed before getResult invocation -- bail out
8485
// otherwise, getResult has already commenced, i.e. completed later or in other thread
85-
var useMode = mode
86-
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
87-
// dispatch directly using this instance's Runnable implementation
88-
val dispatcher = delegate.dispatcher
89-
val context = delegate.context
90-
if (dispatcher.isDispatchNeeded(context)) {
91-
dispatcher.dispatch(context, this)
92-
return // and that's it -- dispatched via fast-path
93-
} else {
94-
useMode = MODE_UNDISPATCHED
95-
}
96-
}
97-
// slow-path - use delegate
98-
if (state is CompletedExceptionally) {
99-
delegate.resumeWithExceptionMode(state.exception, useMode)
100-
} else {
101-
delegate.resumeMode(getSuccessfulResult(state), useMode)
102-
}
86+
dispatch(mode)
10387
}
10488

105-
@Suppress("UNCHECKED_CAST")
106-
protected open fun <T> getSuccessfulResult(state: Any?): T =
107-
state as T
108-
10989
override fun resume(value: T) =
11090
resumeImpl(value, resumeMode)
11191

@@ -132,24 +112,4 @@ internal abstract class AbstractContinuation<in T>(
132112
override fun handleException(exception: Throwable) {
133113
handleCoroutineException(context, exception)
134114
}
135-
136-
// see all DispatchTask.run with the same logic
137-
override fun run() {
138-
delegate as DispatchedContinuation // type assertion
139-
try {
140-
val context = delegate.context
141-
val job = if (resumeMode.isCancellableMode) context[Job] else null
142-
val state = this.state
143-
val continuation = delegate.continuation
144-
withCoroutineContext(context) {
145-
when {
146-
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
147-
state is CompletedExceptionally -> continuation.resumeWithException(state.exception)
148-
else -> continuation.resume(getSuccessfulResult(state))
149-
}
150-
}
151-
} catch (e: Throwable) {
152-
throw RuntimeException("Unexpected exception running $this", e)
153-
}
154-
}
155115
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
/**
20+
* Class for a [state] of a job that had completed exceptionally, including cancellation.
21+
*
22+
* @param cause the exceptional completion cause. If `cause` is null, then an exception is
23+
* if created via [createException] on first get from [exception] property.
24+
* @param allowNullCause if `null` cause is allowed.
25+
* @suppress **This is unstable API and it is subject to change.**
26+
*/
27+
public open class CompletedExceptionally protected constructor(
28+
public @JvmField val cause: Throwable?,
29+
allowNullCause: Boolean
30+
) {
31+
/**
32+
* Creates exceptionally completed state.
33+
* @param cause the exceptional completion cause.
34+
*/
35+
public constructor(cause: Throwable) : this(cause, false)
36+
37+
@Volatile
38+
private var _exception: Throwable? = cause // will materialize JobCancellationException on first need
39+
40+
init {
41+
require(allowNullCause || cause != null) { "Null cause is not allowed" }
42+
}
43+
44+
/**
45+
* Returns completion exception.
46+
*/
47+
public val exception: Throwable get() =
48+
_exception ?: // atomic read volatile var or else create new
49+
createException().also { _exception = it }
50+
51+
protected open fun createException(): Throwable = error("Completion exception was not specified")
52+
53+
override fun toString(): String = "${this::class.java.simpleName}[$exception]"
54+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 107 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.internal.*
1920
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2021
import kotlin.coroutines.experimental.Continuation
2122
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -109,70 +110,65 @@ public actual abstract class CoroutineDispatcher actual constructor() :
109110
*/
110111
public actual typealias Runnable = java.lang.Runnable
111112

112-
// named class for ease of debugging, better stack-traces and optimize the number of anonymous classes
113-
// note that CancellableContinuationImpl directly works as DispatchTask
114-
internal class DispatchTask<in T>(
115-
private val continuation: Continuation<T>,
116-
private val value: Any?, // T | Throwable
117-
private val exception: Boolean,
118-
private val cancellable: Boolean
119-
) : Runnable {
120-
@Suppress("UNCHECKED_CAST")
121-
override fun run() {
122-
try {
123-
val context = continuation.context
124-
val job = if (cancellable) context[Job] else null
125-
withCoroutineContext(context) {
126-
when {
127-
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
128-
exception -> continuation.resumeWithException(value as Throwable)
129-
else -> continuation.resume(value as T)
130-
}
131-
}
132-
} catch (e: Throwable) {
133-
throw RuntimeException("Unexpected exception running $this", e)
134-
}
135-
}
136-
137-
override fun toString(): String =
138-
"DispatchTask[${continuation.toDebugString()}, cancellable=$cancellable, value=${value.toSafeString()}]"
139-
}
113+
@Suppress("PrivatePropertyName")
114+
private val UNDEFINED = Symbol("UNDEFINED")
140115

141116
internal class DispatchedContinuation<in T>(
142117
@JvmField val dispatcher: CoroutineDispatcher,
143118
@JvmField val continuation: Continuation<T>
144-
): Continuation<T> by continuation {
119+
): Continuation<T> by continuation, DispatchedTask<T> {
120+
private var _state: Any? = UNDEFINED
121+
public override var resumeMode: Int = 0
122+
123+
override fun takeState(): Any? {
124+
val state = _state
125+
check(state !== UNDEFINED) // fail-fast if repeatedly invoked
126+
_state = UNDEFINED
127+
return state
128+
}
129+
130+
override val delegate: Continuation<T>
131+
get() = this
132+
145133
override fun resume(value: T) {
146134
val context = continuation.context
147-
if (dispatcher.isDispatchNeeded(context))
148-
dispatcher.dispatch(context, DispatchTask(continuation, value, exception = false, cancellable = false))
149-
else
135+
if (dispatcher.isDispatchNeeded(context)) {
136+
_state = value
137+
resumeMode = MODE_ATOMIC_DEFAULT
138+
dispatcher.dispatch(context, this)
139+
} else
150140
resumeUndispatched(value)
151141
}
152142

153143
override fun resumeWithException(exception: Throwable) {
154144
val context = continuation.context
155-
if (dispatcher.isDispatchNeeded(context))
156-
dispatcher.dispatch(context, DispatchTask(continuation, exception, exception = true, cancellable = false))
157-
else
145+
if (dispatcher.isDispatchNeeded(context)) {
146+
_state = CompletedExceptionally(exception)
147+
resumeMode = MODE_ATOMIC_DEFAULT
148+
dispatcher.dispatch(context, this)
149+
} else
158150
resumeUndispatchedWithException(exception)
159151
}
160152

161153
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
162154
inline fun resumeCancellable(value: T) {
163155
val context = continuation.context
164-
if (dispatcher.isDispatchNeeded(context))
165-
dispatcher.dispatch(context, DispatchTask(continuation, value, exception = false, cancellable = true))
166-
else
156+
if (dispatcher.isDispatchNeeded(context)) {
157+
_state = value
158+
resumeMode = MODE_CANCELLABLE
159+
dispatcher.dispatch(context, this)
160+
} else
167161
resumeUndispatched(value)
168162
}
169163

170164
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
171165
inline fun resumeCancellableWithException(exception: Throwable) {
172166
val context = continuation.context
173-
if (dispatcher.isDispatchNeeded(context))
174-
dispatcher.dispatch(context, DispatchTask(continuation, exception, exception = true, cancellable = true))
175-
else
167+
if (dispatcher.isDispatchNeeded(context)) {
168+
_state = CompletedExceptionally(exception)
169+
resumeMode = MODE_CANCELLABLE
170+
dispatcher.dispatch(context, this)
171+
} else
176172
resumeUndispatchedWithException(exception)
177173
}
178174

@@ -193,7 +189,9 @@ internal class DispatchedContinuation<in T>(
193189
// used by "yield" implementation
194190
internal fun dispatchYield(value: T) {
195191
val context = continuation.context
196-
dispatcher.dispatch(context, DispatchTask(continuation, value,false, true))
192+
_state = value
193+
resumeMode = MODE_CANCELLABLE
194+
dispatcher.dispatch(context, this)
197195
}
198196

199197
override fun toString(): String =
@@ -219,3 +217,70 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
219217
is DispatchedContinuation -> continuation.resumeWithException(exception)
220218
else -> resumeWithException(exception)
221219
}
220+
221+
/**
222+
* @suppress **This is unstable API and it is subject to change.**
223+
*/
224+
public interface DispatchedTask<in T> : Runnable {
225+
public val delegate: Continuation<T>
226+
public val resumeMode: Int get() = MODE_CANCELLABLE
227+
228+
public fun takeState(): Any?
229+
230+
@Suppress("UNCHECKED_CAST")
231+
public fun <T> getSuccessfulResult(state: Any?): T =
232+
state as T
233+
234+
public fun getExceptionalResult(state: Any?): Throwable? =
235+
(state as? CompletedExceptionally)?.exception
236+
237+
public override fun run() {
238+
try {
239+
val delegate = delegate as DispatchedContinuation<T>
240+
val continuation = delegate.continuation
241+
val context = continuation.context
242+
val job = if (resumeMode.isCancellableMode) context[Job] else null
243+
val state = takeState() // NOTE: Must take state in any case, even if cancelled
244+
withCoroutineContext(context) {
245+
if (job != null && !job.isActive)
246+
continuation.resumeWithException(job.getCancellationException())
247+
else {
248+
val exception = getExceptionalResult(state)
249+
if (exception != null)
250+
continuation.resumeWithException(exception)
251+
else
252+
continuation.resume(getSuccessfulResult(state))
253+
}
254+
}
255+
} catch (e: Throwable) {
256+
throw RuntimeException("Unexpected exception running $this", e)
257+
}
258+
}
259+
}
260+
261+
/**
262+
* @suppress **This is unstable API and it is subject to change.**
263+
*/
264+
public fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
265+
var useMode = mode
266+
val delegate = this.delegate
267+
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
268+
// dispatch directly using this instance's Runnable implementation
269+
val dispatcher = delegate.dispatcher
270+
val context = delegate.context
271+
if (dispatcher.isDispatchNeeded(context)) {
272+
dispatcher.dispatch(context, this)
273+
return // and that's it -- dispatched via fast-path
274+
} else {
275+
useMode = MODE_UNDISPATCHED
276+
}
277+
}
278+
// slow-path - use delegate
279+
val state = takeState()
280+
val exception = getExceptionalResult(state)
281+
if (exception != null) {
282+
delegate.resumeWithExceptionMode(exception, useMode)
283+
} else {
284+
delegate.resumeMode(getSuccessfulResult(state), useMode)
285+
}
286+
}

0 commit comments

Comments
 (0)