Skip to content

Commit f29203c

Browse files
committed
MPP: Ported DispatchedTask perf improvements to JS and more code is made common
1 parent c46b6c2 commit f29203c

File tree

21 files changed

+450
-564
lines changed

21 files changed

+450
-564
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonCoroutineContext.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlin.coroutines.experimental.CoroutineContext
19+
import kotlin.coroutines.experimental.*
2020

2121
public expect object Unconfined : CoroutineDispatcher {
2222
override fun isDispatchNeeded(context: CoroutineContext): Boolean
2323
override fun dispatch(context: CoroutineContext, block: Runnable)
2424
}
25+
26+
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T
27+
internal expect fun Continuation<*>.toDebugString(): String
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
20+
21+
public expect open class CancellationException(message: String) : IllegalStateException
22+
23+
public expect class JobCancellationException(
24+
message: String,
25+
cause: Throwable?,
26+
job: Job
27+
) : CancellationException {
28+
val job: Job
29+
}
30+
31+
public expect class TimeoutCancellationException public constructor(message: String)
32+
33+
internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,6 @@ public expect interface DisposableHandle {
4545
public fun dispose()
4646
}
4747

48-
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
49-
50-
public open expect class CancellationException(message: String) : IllegalStateException
51-
52-
public expect class JobCancellationException(
53-
message: String,
54-
cause: Throwable?,
55-
job: Job
56-
) : CancellationException {
57-
val job: Job
58-
}
59-
6048
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
6149
public expect fun CoroutineContext.cancel(cause: Throwable? = null): Boolean
6250
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonScheduled.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,3 @@ package kotlinx.coroutines.experimental
1919
public expect suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T
2020

2121
public expect suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T?
22-
23-
public expect class TimeoutCancellationException public constructor(message: String)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
package kotlinx.coroutines.experimental
1818

1919
/**
20-
* Class for a [state] of a job that had completed exceptionally, including cancellation.
20+
* Class for an internal state of a job that had completed exceptionally, including cancellation.
21+
*
22+
* **Note: This class cannot be used outside of internal coroutines framework**.
2123
*
2224
* @param cause the exceptional completion cause. If `cause` is null, then an exception is
2325
* if created via [createException] on first get from [exception] property.
2426
* @param allowNullCause if `null` cause is allowed.
2527
* @suppress **This is unstable API and it is subject to change.**
2628
*/
2729
public open class CompletedExceptionally protected constructor(
28-
public @JvmField val cause: Throwable?,
30+
public val cause: Throwable?,
2931
allowNullCause: Boolean
3032
) {
3133
/**
@@ -50,5 +52,23 @@ public open class CompletedExceptionally protected constructor(
5052

5153
protected open fun createException(): Throwable = error("Completion exception was not specified")
5254

53-
override fun toString(): String = "${this::class.java.simpleName}[$exception]"
54-
}
55+
override fun toString(): String = "${this::class.simpleName}[$exception]"
56+
}
57+
58+
/**
59+
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
60+
*
61+
* **Note: This class cannot be used outside of internal coroutines framework**.
62+
*
63+
* @param job the job that was cancelled.
64+
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
65+
* if created on first get from [exception] property.
66+
* @suppress **This is unstable API and it is subject to change.**
67+
*/
68+
public class Cancelled(
69+
private val job: Job,
70+
cause: Throwable?
71+
) : CompletedExceptionally(cause, true) {
72+
override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
73+
}
74+
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlinx.coroutines.experimental.internal.*
20+
import kotlin.coroutines.experimental.*
21+
22+
@Suppress("PrivatePropertyName")
23+
private val UNDEFINED = Symbol("UNDEFINED")
24+
25+
internal class DispatchedContinuation<in T>(
26+
val dispatcher: CoroutineDispatcher,
27+
val continuation: Continuation<T>
28+
) : Continuation<T> by continuation, DispatchedTask<T> {
29+
private var _state: Any? = UNDEFINED
30+
public override var resumeMode: Int = 0
31+
32+
override fun takeState(): Any? {
33+
val state = _state
34+
check(state !== UNDEFINED) // fail-fast if repeatedly invoked
35+
_state = UNDEFINED
36+
return state
37+
}
38+
39+
override val delegate: Continuation<T>
40+
get() = this
41+
42+
override fun resume(value: T) {
43+
val context = continuation.context
44+
if (dispatcher.isDispatchNeeded(context)) {
45+
_state = value
46+
resumeMode = MODE_ATOMIC_DEFAULT
47+
dispatcher.dispatch(context, this)
48+
} else
49+
resumeUndispatched(value)
50+
}
51+
52+
override fun resumeWithException(exception: Throwable) {
53+
val context = continuation.context
54+
if (dispatcher.isDispatchNeeded(context)) {
55+
_state = CompletedExceptionally(exception)
56+
resumeMode = MODE_ATOMIC_DEFAULT
57+
dispatcher.dispatch(context, this)
58+
} else
59+
resumeUndispatchedWithException(exception)
60+
}
61+
62+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
63+
inline fun resumeCancellable(value: T) {
64+
val context = continuation.context
65+
if (dispatcher.isDispatchNeeded(context)) {
66+
_state = value
67+
resumeMode = MODE_CANCELLABLE
68+
dispatcher.dispatch(context, this)
69+
} else
70+
resumeUndispatched(value)
71+
}
72+
73+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
74+
inline fun resumeCancellableWithException(exception: Throwable) {
75+
val context = continuation.context
76+
if (dispatcher.isDispatchNeeded(context)) {
77+
_state = CompletedExceptionally(exception)
78+
resumeMode = MODE_CANCELLABLE
79+
dispatcher.dispatch(context, this)
80+
} else
81+
resumeUndispatchedWithException(exception)
82+
}
83+
84+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
85+
inline fun resumeUndispatched(value: T) {
86+
withCoroutineContext(context) {
87+
continuation.resume(value)
88+
}
89+
}
90+
91+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
92+
inline fun resumeUndispatchedWithException(exception: Throwable) {
93+
withCoroutineContext(context) {
94+
continuation.resumeWithException(exception)
95+
}
96+
}
97+
98+
// used by "yield" implementation
99+
internal fun dispatchYield(value: T) {
100+
val context = continuation.context
101+
_state = value
102+
resumeMode = MODE_CANCELLABLE
103+
dispatcher.dispatch(context, this)
104+
}
105+
106+
override fun toString(): String =
107+
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
108+
}
109+
110+
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
111+
is DispatchedContinuation -> resumeCancellable(value)
112+
else -> resume(value)
113+
}
114+
115+
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
116+
is DispatchedContinuation -> resumeCancellableWithException(exception)
117+
else -> resumeWithException(exception)
118+
}
119+
120+
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
121+
is DispatchedContinuation -> continuation.resume(value)
122+
else -> resume(value)
123+
}
124+
125+
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
126+
is DispatchedContinuation -> continuation.resumeWithException(exception)
127+
else -> resumeWithException(exception)
128+
}
129+
130+
/**
131+
* @suppress **This is unstable API and it is subject to change.**
132+
*/
133+
public interface DispatchedTask<in T> : Runnable {
134+
public val delegate: Continuation<T>
135+
public val resumeMode: Int get() = MODE_CANCELLABLE
136+
137+
public fun takeState(): Any?
138+
139+
@Suppress("UNCHECKED_CAST")
140+
public fun <T> getSuccessfulResult(state: Any?): T =
141+
state as T
142+
143+
public fun getExceptionalResult(state: Any?): Throwable? =
144+
(state as? CompletedExceptionally)?.exception
145+
146+
public override fun run() {
147+
try {
148+
val delegate = delegate as DispatchedContinuation<T>
149+
val continuation = delegate.continuation
150+
val context = continuation.context
151+
val job = if (resumeMode.isCancellableMode) context[Job] else null
152+
val state = takeState() // NOTE: Must take state in any case, even if cancelled
153+
withCoroutineContext(context) {
154+
if (job != null && !job.isActive)
155+
continuation.resumeWithException(job.getCancellationException())
156+
else {
157+
val exception = getExceptionalResult(state)
158+
if (exception != null)
159+
continuation.resumeWithException(exception)
160+
else
161+
continuation.resume(getSuccessfulResult(state))
162+
}
163+
}
164+
} catch (e: Throwable) {
165+
throw DispatchException("Unexpected exception running $this", e)
166+
}
167+
}
168+
}
169+
170+
/**
171+
* @suppress **This is unstable API and it is subject to change.**
172+
*/
173+
public fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
174+
var useMode = mode
175+
val delegate = this.delegate
176+
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
177+
// dispatch directly using this instance's Runnable implementation
178+
val dispatcher = delegate.dispatcher
179+
val context = delegate.context
180+
if (dispatcher.isDispatchNeeded(context)
181+
) {
182+
dispatcher.dispatch(context, this)
183+
return // and that's it -- dispatched via fast-path
184+
} else {
185+
useMode = MODE_UNDISPATCHED
186+
}
187+
}
188+
// slow-path - use delegate
189+
val state = takeState()
190+
val exception = getExceptionalResult(state)
191+
if (exception != null) {
192+
delegate.resumeWithExceptionMode(exception, useMode)
193+
} else {
194+
delegate.resumeMode(getSuccessfulResult(state), useMode)
195+
}
196+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package kotlinx.coroutines.experimental.internal
1818

19-
/** @suppress **This is unstable API and it is subject to change.** */
19+
/**
20+
* A symbol class that is used to define unique constants that are self-explanatory in debugger.
21+
*
22+
* @suppress **This is unstable API and it is subject to change.**
23+
*/
2024
public class Symbol(val symbol: String) {
2125
override fun toString(): String = symbol
2226
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public fun newCoroutineContext(context: CoroutineContext, parent: Job? = null):
104104
/**
105105
* Executes a block using a given coroutine context.
106106
*/
107-
internal inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
107+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
108108
val oldName = context.updateThreadContext()
109109
try {
110110
return block()

0 commit comments

Comments
 (0)