Skip to content

Commit fd54bc4

Browse files
committed
Move undispatched thread-local event loop to dispatched continuation, execute all undispatched tasks in that loop to avoid SOE in user-supplied unconfined dispatchers and 'immediate' main dispatchers.
1 parent 1c5efc8 commit fd54bc4

File tree

2 files changed

+112
-56
lines changed

2 files changed

+112
-56
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 110 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,95 @@ import kotlin.coroutines.*
99
import kotlin.jvm.*
1010

1111
@Suppress("PrivatePropertyName")
12-
private val UNDEFINED = Symbol("UNDEFINED")
12+
@JvmField
13+
internal val UNDEFINED = Symbol("UNDEFINED")
14+
15+
@NativeThreadLocal
16+
internal object UndispatchedEventLoop {
17+
data class State(
18+
@JvmField var isActive: Boolean = false,
19+
@JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList()
20+
)
21+
22+
@JvmField
23+
internal val state = CommonThreadLocal { State() }
24+
25+
fun dispatch(block: Runnable) {
26+
val state = state.get()
27+
if (state.isActive) {
28+
state.threadLocalQueue.add(block)
29+
return
30+
}
31+
32+
try {
33+
state.isActive = true
34+
block.run()
35+
while (!state.threadLocalQueue.isEmpty()) {
36+
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
37+
element.run()
38+
}
39+
} catch (e: Throwable) {
40+
/*
41+
* This exception doesn't happen normally, only if user either submitted throwing runnable
42+
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
43+
*/
44+
state.threadLocalQueue.clear()
45+
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
46+
} finally {
47+
state.isActive = false
48+
}
49+
}
50+
51+
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
52+
val state = state.get()
53+
if (state.isActive) {
54+
continuation._state = contState
55+
continuation.resumeMode = mode
56+
state.threadLocalQueue.add(continuation)
57+
return
58+
}
59+
60+
runLoop(state, block)
61+
}
62+
63+
inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
64+
val state = state.get()
65+
if (state.isActive) {
66+
state.threadLocalQueue.add(task)
67+
return
68+
}
69+
70+
runLoop(state, block)
71+
}
72+
73+
inline fun runLoop(state: State, block: () -> Unit) {
74+
try {
75+
state.isActive = true
76+
block()
77+
while (!state.threadLocalQueue.isEmpty()) {
78+
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
79+
element.run()
80+
}
81+
} catch (e: Throwable) {
82+
/*
83+
* This exception doesn't happen normally, only if user either submitted throwing runnable
84+
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
85+
*/
86+
state.threadLocalQueue.clear()
87+
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
88+
} finally {
89+
state.isActive = false
90+
}
91+
}
92+
}
1393

1494
internal class DispatchedContinuation<in T>(
1595
@JvmField val dispatcher: CoroutineDispatcher,
1696
@JvmField val continuation: Continuation<T>
1797
) : Continuation<T> by continuation, DispatchedTask<T> {
18-
private var _state: Any? = UNDEFINED
98+
@JvmField
99+
@Suppress("PropertyName")
100+
internal var _state: Any? = UNDEFINED
19101
public override var resumeMode: Int = 0
20102

21103
override fun takeState(): Any? {
@@ -30,39 +112,48 @@ internal class DispatchedContinuation<in T>(
30112

31113
override fun resumeWith(result: Result<T>) {
32114
val context = continuation.context
115+
val state = result.toState()
33116
if (dispatcher.isDispatchNeeded(context)) {
34-
_state = result.toState()
117+
_state = state
35118
resumeMode = MODE_ATOMIC_DEFAULT
36119
dispatcher.dispatch(context, this)
37120
} else {
38-
resumeUndispatchedWith(result)
121+
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
122+
withCoroutineContext(this.context) {
123+
continuation.resumeWith(result)
124+
}
125+
}
39126
}
40127
}
41128

42129
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
43130
inline fun resumeCancellable(value: T) {
44-
val context = continuation.context
45131
if (dispatcher.isDispatchNeeded(context)) {
46132
_state = value
47133
resumeMode = MODE_CANCELLABLE
48134
dispatcher.dispatch(context, this)
49135
} else {
50-
if (!resumeCancelled()) {
51-
resumeUndispatched(value)
136+
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
137+
if (!resumeCancelled()) {
138+
resumeUndispatched(value)
139+
}
52140
}
53141
}
54142
}
55143

56144
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
57145
inline fun resumeCancellableWithException(exception: Throwable) {
58146
val context = continuation.context
147+
val state = CompletedExceptionally(exception)
59148
if (dispatcher.isDispatchNeeded(context)) {
60149
_state = CompletedExceptionally(exception)
61150
resumeMode = MODE_CANCELLABLE
62151
dispatcher.dispatch(context, this)
63152
} else {
64-
if (!resumeCancelled()) {
65-
resumeUndispatchedWithException(exception)
153+
UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
154+
if (!resumeCancelled()) {
155+
resumeUndispatchedWithException(exception)
156+
}
66157
}
67158
}
68159
}
@@ -78,13 +169,6 @@ internal class DispatchedContinuation<in T>(
78169
return false
79170
}
80171

81-
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
82-
inline fun resumeUndispatchedWith(result: Result<T>) {
83-
withCoroutineContext(context) {
84-
continuation.resumeWith(result)
85-
}
86-
}
87-
88172
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
89173
inline fun resumeUndispatched(value: T) {
90174
withCoroutineContext(context) {
@@ -182,12 +266,15 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
182266
useMode = MODE_UNDISPATCHED
183267
}
184268
}
185-
// slow-path - use delegate
186-
val state = takeState()
187-
val exception = getExceptionalResult(state)
188-
if (exception != null) {
189-
delegate.resumeWithExceptionMode(exception, useMode)
190-
} else {
191-
delegate.resumeMode(getSuccessfulResult(state), useMode)
269+
270+
UndispatchedEventLoop.execute(this) {
271+
// slow-path - use delegate
272+
val state = takeState()
273+
val exception = getExceptionalResult(state)
274+
if (exception != null) {
275+
delegate.resumeWithExceptionMode(exception, useMode)
276+
} else {
277+
delegate.resumeMode(getSuccessfulResult(state), useMode)
278+
}
192279
}
193280
}

common/kotlinx-coroutines-core-common/src/Unconfined.kt

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,44 +4,13 @@
44

55
package kotlinx.coroutines
66

7-
import kotlinx.coroutines.internal.*
87
import kotlin.coroutines.*
9-
import kotlin.jvm.*
108

119
/**
1210
* A coroutine dispatcher that is not confined to any specific thread.
1311
*/
14-
@NativeThreadLocal
1512
internal object Unconfined : CoroutineDispatcher() {
16-
private data class State(@JvmField var isActive: Boolean = false,
17-
@JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList())
18-
private val state = CommonThreadLocal { State() }
19-
20-
override fun dispatch(context: CoroutineContext, block: Runnable) {
21-
// Stack-based event loop on top of thread-local arraylist
22-
val state = state.get()
23-
if (state.isActive) {
24-
state.threadLocalQueue.add(block)
25-
return
26-
}
27-
28-
try {
29-
state.isActive = true
30-
block.run()
31-
while (!state.threadLocalQueue.isEmpty()) {
32-
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
33-
element.run()
34-
}
35-
} catch (e: Throwable) {
36-
/*
37-
* This exception doesn't happen normally, only if user either submitted throwing runnable
38-
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
39-
*/
40-
state.threadLocalQueue.clear()
41-
throw DispatchException("Unexpected exception in Unconfined loop, clearing pending tasks", e)
42-
} finally {
43-
state.isActive = false
44-
}
45-
}
13+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
14+
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
4615
override fun toString(): String = "Unconfined"
4716
}

0 commit comments

Comments
 (0)