Skip to content

Commit f04f51d

Browse files
committed
Refactored ThreadPoolDispatcher to reuse code of ExecutorCoroutineDispatcher
1 parent 3f87164 commit f04f51d

File tree

2 files changed

+28
-35
lines changed

2 files changed

+28
-35
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,45 +37,45 @@ public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
3737
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
3838
ExecutorCoroutineDispatcher(this)
3939

40-
private class ExecutorCoroutineDispatcher(
41-
private val executor: Executor
42-
) : CoroutineDispatcher(), Delay {
40+
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
41+
42+
internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
43+
abstract val executor: Executor
44+
4345
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
4446

4547
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
46-
val timeout = if (executor is ScheduledExecutorService)
47-
executor.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) else
48-
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
48+
val timeout = (executor as? ScheduledExecutorService)
49+
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
50+
?: scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
4951
continuation.cancelFutureOnCompletion(timeout)
5052
}
5153

5254
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
53-
val timeout = if (executor is ScheduledExecutorService)
54-
executor.schedule(block, time, unit) else
55-
scheduledExecutor.schedule(block, time, unit)
55+
val timeout = (executor as? ScheduledExecutorService)
56+
?.schedule(block, time, unit)
57+
?: scheduledExecutor.schedule(block, time, unit)
5658
return DisposableFutureHandle(timeout)
5759
}
5860

5961
override fun toString(): String = executor.toString()
60-
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcher && other.executor === executor
62+
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
6163
override fun hashCode(): Int = System.identityHashCode(executor)
6264
}
6365

64-
// --- reusing these classes in other places ---
65-
66-
internal class ResumeUndispatchedRunnable(
67-
private val dispatcher: CoroutineDispatcher,
68-
private val continuation: CancellableContinuation<Unit>
66+
internal class ResumeRunnable(
67+
private val continuation: Continuation<Unit>
6968
) : Runnable {
7069
override fun run() {
71-
with(continuation) { dispatcher.resumeUndispatched(Unit) }
70+
continuation.resume(Unit)
7271
}
7372
}
7473

75-
internal class ResumeRunnable(
76-
private val continuation: Continuation<Unit>
74+
private class ResumeUndispatchedRunnable(
75+
private val dispatcher: CoroutineDispatcher,
76+
private val continuation: CancellableContinuation<Unit>
7777
) : Runnable {
7878
override fun run() {
79-
continuation.resume(Unit)
79+
with(continuation) { dispatcher.resumeUndispatched(Unit) }
8080
}
8181
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,26 @@ fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null):
4646
}
4747

4848
internal class PoolThread(
49-
val dispatcher: ThreadPoolDispatcher, // for debugging & tests
49+
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
5050
target: Runnable, name: String
5151
) : Thread(target, name) {
5252
init { isDaemon = true }
5353
}
5454

5555
internal class ThreadPoolDispatcher(
56-
nThreads: Int,
57-
name: String,
58-
val job: Job
59-
) : CoroutineDispatcher(), Delay {
60-
val threadNo = AtomicInteger()
61-
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
56+
private val nThreads: Int,
57+
private val name: String,
58+
job: Job
59+
) : ExecutorCoroutineDispatcherBase() {
60+
private val threadNo = AtomicInteger()
61+
62+
override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
6263
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
6364
}
6465

6566
init {
6667
job.invokeOnCompletion { executor.shutdown() }
6768
}
6869

69-
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
70-
71-
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
72-
val timeout = executor.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
73-
continuation.cancelFutureOnCompletion(timeout)
74-
}
75-
76-
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
77-
DisposableFutureHandle(executor.schedule(block, time, unit))
70+
override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
7871
}

0 commit comments

Comments
 (0)