Skip to content

Commit 67891d8

Browse files
committed
Added context parameter to CoroutineDispatcher methods, implemented Executor.toCoroutineDispatcher
1 parent e65e47c commit 67891d8

File tree

12 files changed

+161
-42
lines changed

12 files changed

+161
-42
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import java.util.concurrent.Executor
44
import java.util.concurrent.Executors
55
import java.util.concurrent.ForkJoinPool
66
import java.util.concurrent.atomic.AtomicInteger
7+
import kotlin.coroutines.CoroutineContext
78

89
/**
910
* Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
1011
* It uses [ForkJoinPool] when available, which implements efficient work-stealing algorithm for its queues, so every
1112
* coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
1213
* When available, it wraps [ForkJoinPool.commonPool] and provides a similar shared pool where not.
1314
*/
14-
object CommonPool : CoroutineDispatcher(), Yield {
15+
object CommonPool : CoroutineDispatcher() {
1516
private val pool: Executor = findPool()
1617

1718
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
@@ -35,10 +36,6 @@ object CommonPool : CoroutineDispatcher(), Yield {
3536

3637
private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
3738

38-
override fun isDispatchNeeded(): Boolean = true
39-
override fun dispatch(block: Runnable) = pool.execute(block)
40-
41-
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
42-
pool.execute { continuation.resume(Unit) }
43-
}
39+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
40+
override fun dispatch(context: CoroutineContext, block: Runnable) = pool.execute(block)
4441
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines.experimental
33
import kotlin.coroutines.AbstractCoroutineContextElement
44
import kotlin.coroutines.Continuation
55
import kotlin.coroutines.ContinuationInterceptor
6+
import kotlin.coroutines.CoroutineContext
67

78
/**
89
* Base class that shall be extended by all coroutine dispatcher implementations.
@@ -30,12 +31,12 @@ public abstract class CoroutineDispatcher :
3031
/**
3132
* Return `true` if execution shall be dispatched onto another thread.
3233
*/
33-
public abstract fun isDispatchNeeded(): Boolean
34+
public abstract fun isDispatchNeeded(context: CoroutineContext): Boolean
3435

3536
/**
36-
* Dispatches execution of a runnable [block] onto another thread.
37+
* Dispatches execution of a runnable [block] onto another thread in the given [context].
3738
*/
38-
public abstract fun dispatch(block: Runnable)
39+
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
3940

4041
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
4142
DispatchedContinuation<T>(this, continuation)
@@ -46,27 +47,29 @@ private class DispatchedContinuation<T>(
4647
val continuation: Continuation<T>
4748
): Continuation<T> by continuation {
4849
override fun resume(value: T) {
49-
if (dispatcher.isDispatchNeeded())
50-
dispatcher.dispatch(Runnable {
51-
withDefaultCoroutineContext(continuation.context) {
50+
val context = continuation.context
51+
if (dispatcher.isDispatchNeeded(context))
52+
dispatcher.dispatch(context, Runnable {
53+
withDefaultCoroutineContext(context) {
5254
continuation.resume(value)
5355
}
5456
})
5557
else
56-
withDefaultCoroutineContext(continuation.context) {
58+
withDefaultCoroutineContext(context) {
5759
continuation.resume(value)
5860
}
5961
}
6062

6163
override fun resumeWithException(exception: Throwable) {
62-
if (dispatcher.isDispatchNeeded())
63-
dispatcher.dispatch(Runnable {
64-
withDefaultCoroutineContext(continuation.context) {
64+
val context = continuation.context
65+
if (dispatcher.isDispatchNeeded(context))
66+
dispatcher.dispatch(context, Runnable {
67+
withDefaultCoroutineContext(context) {
6568
continuation.resumeWithException(exception)
6669
}
6770
})
6871
else
69-
withDefaultCoroutineContext(continuation.context) {
72+
withDefaultCoroutineContext(context) {
7073
continuation.resumeWithException(exception)
7174
}
7275
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ internal val CURRENT_CONTEXT = ThreadLocal<CoroutineContext>()
1919
* mandating any specific threading policy.
2020
*/
2121
public object Here : CoroutineDispatcher() {
22-
override fun isDispatchNeeded(): Boolean = false
23-
override fun dispatch(block: Runnable) { throw UnsupportedOperationException() }
22+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
23+
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
2424
}
2525

2626
/**

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
4343
scheduleResumeAfterDelay(time, unit, cont)
4444
return@sc
4545
}
46-
val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)
47-
cont.cancelFutureOnCompletion(timeout)
46+
scheduledExecutor.scheduleResumeAfterDelay(time, unit, cont)
4847
}
4948
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
44
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
55
import java.util.concurrent.locks.LockSupport
66
import kotlin.coroutines.Continuation
7+
import kotlin.coroutines.CoroutineContext
78

89
/**
910
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -49,9 +50,9 @@ internal class EventLoopImpl(
4950
this.parentJob = coroutine
5051
}
5152

52-
override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
53+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = Thread.currentThread() != thread
5354

54-
override fun dispatch(block: Runnable) {
55+
override fun dispatch(context: CoroutineContext, block: Runnable) {
5556
schedule(Dispatch(block))
5657
}
5758

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kotlinx.coroutines.experimental
2+
3+
import java.util.concurrent.Executor
4+
import java.util.concurrent.ExecutorService
5+
import java.util.concurrent.ScheduledExecutorService
6+
import java.util.concurrent.TimeUnit
7+
import kotlin.coroutines.CoroutineContext
8+
9+
/**
10+
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
11+
*/
12+
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
13+
ExecutorCoroutineDispatcher(this)
14+
15+
internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
16+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
17+
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
18+
19+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
20+
(executor as? ScheduledExecutorService ?: scheduledExecutor).scheduleResumeAfterDelay(time, unit, continuation)
21+
}
22+
}
23+
24+
internal fun ExecutorService.scheduleResume(cont: CancellableContinuation<Unit>) {
25+
val future = submit { cont.resume(Unit) }
26+
cont.cancelFutureOnCompletion(future)
27+
}
28+
29+
internal fun ScheduledExecutorService.scheduleResumeAfterDelay(time: Long, unit: TimeUnit, cont: CancellableContinuation<Unit>) {
30+
val timeout = schedule({ cont.resume(Unit) }, time, unit)
31+
cont.cancelFutureOnCompletion(timeout)
32+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ import java.util.concurrent.ScheduledExecutorService
55
import java.util.concurrent.TimeUnit
66
import java.util.concurrent.atomic.AtomicInteger
77
import kotlin.concurrent.thread
8-
import kotlin.coroutines.ContinuationInterceptor
98
import kotlin.coroutines.CoroutineContext
109

1110
/**
12-
* Creates new coroutine execution context with the a single thread and built-in [delay] support.
11+
* Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
1312
* All continuations are dispatched immediately when invoked inside the thread of this context.
1413
* Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
1514
* The specified [name] defines the name of the new thread.
@@ -19,16 +18,16 @@ fun newSingleThreadContext(name: String, parent: Job? = null): CoroutineContext
1918
newFixedThreadPoolContext(1, name, parent)
2019

2120
/**
22-
* Creates new coroutine execution context with the fixed-size thread-pool and built-in [delay] support.
21+
* Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
2322
* All continuations are dispatched immediately when invoked inside the threads of this context.
2423
* Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
2524
* The specified [name] defines the names of the threads.
2625
* An optional [parent] job may be specified upon creation.
2726
*/
2827
fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
2928
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
30-
val lifetime = Job(parent)
31-
return lifetime + ThreadPoolDispatcher(nThreads, name, lifetime)
29+
val job = Job(parent)
30+
return job + ThreadPoolDispatcher(nThreads, name, job)
3231
}
3332

3433
private val thisThreadContext = ThreadLocal<ThreadPoolDispatcher>()
@@ -37,7 +36,7 @@ private class ThreadPoolDispatcher(
3736
nThreads: Int,
3837
name: String,
3938
val job: Job
40-
) : CoroutineDispatcher(), ContinuationInterceptor, Yield, Delay {
39+
) : CoroutineDispatcher(), Yield, Delay {
4140
val threadNo = AtomicInteger()
4241
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
4342
thread(start = false, isDaemon = true,
@@ -51,16 +50,15 @@ private class ThreadPoolDispatcher(
5150
job.onCompletion { executor.shutdown() }
5251
}
5352

54-
override fun isDispatchNeeded(): Boolean = thisThreadContext.get() != this
53+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = thisThreadContext.get() != this
5554

56-
override fun dispatch(block: Runnable) = executor.execute(block)
55+
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
5756

5857
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
59-
executor.execute { continuation.resume(Unit) }
58+
executor.scheduleResume(continuation)
6059
}
6160

6261
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
63-
val timeout = executor.schedule({ continuation.resume(Unit) }, time, unit)
64-
continuation.cancelFutureOnCompletion(timeout)
62+
executor.scheduleResumeAfterDelay(time, unit, continuation)
6563
}
6664
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import kotlin.coroutines.ContinuationInterceptor
44

55
/**
66
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that
7-
* natively support [yield] function.
7+
* natively support [yield] function. It shall be implemented only by [CoroutineDispatcher]
8+
* classes with non-trivial [CoroutineDispatcher.isDispatchNeeded] implementations.
89
*/
910
public interface Yield {
1011
/**
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package kotlinx.coroutines.experimental
2+
3+
import org.junit.After
4+
import org.junit.Before
5+
import org.junit.Test
6+
import java.util.concurrent.Executors
7+
8+
class ExecutorsTest {
9+
fun threadNames(): Set<String> {
10+
val arrayOfThreads = Array<Thread?>(Thread.activeCount()) { null }
11+
val n = Thread.enumerate(arrayOfThreads)
12+
val names = hashSetOf<String>()
13+
for (i in 0 until n)
14+
names.add(arrayOfThreads[i]!!.name)
15+
return names
16+
}
17+
18+
lateinit var threadNamesBefore: Set<String>
19+
20+
@Before
21+
fun before() {
22+
threadNamesBefore = threadNames()
23+
}
24+
25+
@After
26+
fun after() {
27+
// give threads some time to shutdown
28+
val waitTill = System.currentTimeMillis() + 1000L
29+
var diff: Set<String>
30+
do {
31+
val threadNamesAfter = threadNames()
32+
diff = threadNamesAfter - threadNamesBefore
33+
if (diff.isEmpty()) break
34+
} while (System.currentTimeMillis() <= waitTill)
35+
diff.forEach { println("Lost thread '$it'") }
36+
check(diff.isEmpty()) { "Lost ${diff.size} threads"}
37+
}
38+
39+
fun checkThreadName(prefix: String) {
40+
val name = Thread.currentThread().name
41+
check(name.startsWith(prefix)) { "Expected thread name to start with '$prefix', found: '$name'" }
42+
}
43+
44+
@Test
45+
fun testSingleThread() {
46+
val context = newSingleThreadContext("TestThread")
47+
runBlocking(context) {
48+
checkThreadName("TestThread")
49+
}
50+
context[Job]!!.cancel()
51+
}
52+
53+
@Test
54+
fun testFixedThreadPool() {
55+
val context = newFixedThreadPoolContext(2, "TestPool")
56+
runBlocking(context) {
57+
checkThreadName("TestPool")
58+
}
59+
context[Job]!!.cancel()
60+
}
61+
62+
@Test
63+
fun testToExecutor() {
64+
val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
65+
runBlocking(executor.toCoroutineDispatcher()) {
66+
checkThreadName("TestExecutor")
67+
}
68+
executor.shutdown()
69+
}
70+
71+
@Test
72+
fun testTwoThreads() {
73+
val ctx1 = newSingleThreadContext("Ctx1")
74+
val ctx2 = newSingleThreadContext("Ctx2")
75+
runBlocking(ctx1) {
76+
checkThreadName("Ctx1")
77+
run(ctx2) {
78+
checkThreadName("Ctx2")
79+
}
80+
checkThreadName("Ctx1")
81+
}
82+
ctx1[Job]!!.cancel()
83+
ctx2[Job]!!.cancel()
84+
}
85+
}

kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.coroutines.experimental.*
1111
import kotlinx.coroutines.experimental.javafx.JavaFx.delay
1212
import java.util.concurrent.CopyOnWriteArrayList
1313
import java.util.concurrent.TimeUnit
14+
import kotlin.coroutines.CoroutineContext
1415

1516

1617
/**
@@ -21,8 +22,8 @@ object JavaFx : CoroutineDispatcher(), Yield, Delay {
2122
PulseTimer().apply { start() }
2223
}
2324

24-
override fun isDispatchNeeded(): Boolean = !Platform.isFxApplicationThread()
25-
override fun dispatch(block: Runnable) = Platform.runLater(block)
25+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = !Platform.isFxApplicationThread()
26+
override fun dispatch(context: CoroutineContext, block: Runnable) = Platform.runLater(block)
2627

2728
/**
2829
* Suspends coroutine until next JavaFx pulse and returns time of the pulse on resumption.

0 commit comments

Comments
 (0)