Skip to content

Commit 7ee722a

Browse files
committed
Rework Dispatchers.Unconfined to use thread-local event loop
Fixes #704
1 parent 1e70839 commit 7ee722a

File tree

9 files changed

+211
-6
lines changed

9 files changed

+211
-6
lines changed

common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ public expect object Dispatchers {
4242

4343
/**
4444
* A coroutine dispatcher that is not confined to any specific thread.
45-
* It executes initial continuation of the coroutine _immediately_ in the current call-frame
45+
* It executes initial continuation of the coroutine in the current call-frame
4646
* and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without
47-
* mandating any specific threading policy.
48-
* **Note: use with extreme caution, not for general code**.
47+
* mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid
48+
* stack overflows.
4949
*
5050
* Note, that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
5151
* but still want to execute it in the current call-frame until its first suspension, then you can use

common/kotlinx-coroutines-core-common/src/Unconfined.kt

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,44 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
78
import kotlin.coroutines.*
9+
import kotlin.jvm.*
810

911
/**
1012
* A coroutine dispatcher that is not confined to any specific thread.
1113
*/
14+
@NativeThreadLocal
1215
internal object Unconfined : CoroutineDispatcher() {
13-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
14-
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
16+
private data class State(@JvmField var isActive: Boolean = false,
17+
@JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList())
18+
private val state = CommonThreadLocal { State() }
19+
20+
override fun dispatch(context: CoroutineContext, block: Runnable) {
21+
// Stack-based event loop on top of thread-local arraylist
22+
val state = state.get()
23+
if (state.isActive) {
24+
state.threadLocalQueue.add(block)
25+
return
26+
}
27+
28+
try {
29+
state.isActive = true
30+
block.run()
31+
while (!state.threadLocalQueue.isEmpty()) {
32+
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
33+
element.run()
34+
}
35+
} catch (e: Throwable) {
36+
/*
37+
* This exception doesn't happen normally, only if user either submitted throwing runnable
38+
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
39+
*/
40+
state.threadLocalQueue.clear()
41+
throw DispatchException("Unexpected exception in Unconfined loop, clearing pending tasks", e)
42+
} finally {
43+
state.isActive = false
44+
}
45+
}
1546
override fun toString(): String = "Unconfined"
1647
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
@OptionalExpectation
8+
@UseExperimental(ExperimentalMultiplatform::class)
9+
internal expect annotation class NativeThreadLocal()
10+
11+
internal expect class CommonThreadLocal<T>(supplier: () -> T) {
12+
fun get(): T
13+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.test.*
8+
9+
class UnconfinedTest : TestBase() {
10+
11+
@Test
12+
fun testOrder() = runTest {
13+
expect(1)
14+
launch(Dispatchers.Unconfined) {
15+
expect(2)
16+
launch {
17+
expect(4)
18+
launch {
19+
expect(7)
20+
}
21+
22+
launch {
23+
expect(6)
24+
}
25+
expect(5)
26+
}
27+
28+
expect(3)
29+
}
30+
31+
finish(8)
32+
}
33+
34+
@Test
35+
fun testBlockThrows() = runTest {
36+
expect(1)
37+
try {
38+
withContext(Dispatchers.Unconfined) {
39+
expect(2)
40+
withContext(Dispatchers.Unconfined + CoroutineName("a")) {
41+
expect(3)
42+
}
43+
44+
expect(4)
45+
launch(start = CoroutineStart.ATOMIC) {
46+
expect(5)
47+
}
48+
49+
throw TestException()
50+
}
51+
} catch (e: TestException) {
52+
finish(6)
53+
}
54+
}
55+
56+
@Test
57+
fun enterMultipleTimes() = runTest {
58+
launch(Unconfined) {
59+
expect(1)
60+
}
61+
62+
launch(Unconfined) {
63+
expect(2)
64+
}
65+
66+
launch(Unconfined) {
67+
expect(3)
68+
}
69+
70+
finish(4)
71+
}
72+
73+
class TestException : Throwable()
74+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import java.lang.ThreadLocal
8+
9+
internal actual typealias CommonThreadLocal<T> = ThreadLocalWithInitialValue<T>
10+
11+
internal class ThreadLocalWithInitialValue<T>(private val supplier: () -> T) : ThreadLocal<T>() {
12+
override fun initialValue(): T = supplier()
13+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import org.junit.*
8+
import org.junit.Test
9+
import java.util.concurrent.*
10+
import kotlin.test.*
11+
12+
class UnconfinedConcurrentTest : TestBase() {
13+
private val threads = 4
14+
private val executor = newFixedThreadPoolContext(threads, "UnconfinedConcurrentTest")
15+
private val threadLocal = ThreadLocal<Int>()
16+
17+
@After
18+
fun tearDown() {
19+
executor.close()
20+
}
21+
22+
@Test(timeout = 10_000L)
23+
fun testConcurrent() = runTest {
24+
val iterations = 10_000
25+
val startBarrier = CyclicBarrier(threads + 1)
26+
val finishLatch = CountDownLatch(threads)
27+
28+
repeat(threads) { id ->
29+
launch(executor) {
30+
startBarrier.await()
31+
repeat(iterations) {
32+
threadLocal.set(0)
33+
launch(Dispatchers.Unconfined) {
34+
assertEquals(0, threadLocal.get())
35+
launch(Dispatchers.Unconfined) {
36+
assertEquals(id, threadLocal.get())
37+
}
38+
39+
threadLocal.set(id)
40+
}
41+
}
42+
43+
finishLatch.countDown()
44+
}
45+
}
46+
47+
startBarrier.await()
48+
finishLatch.await()
49+
}
50+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
internal actual class CommonThreadLocal<T> actual constructor(supplier: () -> T) {
8+
private val value = supplier()
9+
actual fun get(): T = value
10+
}

native/kotlinx-coroutines-core-native/src/Dispatchers.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public actual object Dispatchers {
1212

1313
public actual val Main: MainCoroutineDispatcher = NativeMainDispatcher(Default)
1414

15-
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
15+
public actual val Unconfined: CoroutineDispatcher get() = kotlinx.coroutines.Unconfined // Avoid freezing
1616
}
1717

1818
private class NativeMainDispatcher(val delegate: CoroutineDispatcher) : MainCoroutineDispatcher() {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
import kotlin.native.concurrent.*
7+
8+
@Suppress("ACTUAL_WITHOUT_EXPECT")
9+
internal actual typealias NativeThreadLocal = kotlin.native.ThreadLocal
10+
11+
internal actual class CommonThreadLocal<T> actual constructor(supplier: () -> T) {
12+
private val value = supplier()
13+
actual fun get(): T = value
14+
}

0 commit comments

Comments
 (0)