Skip to content

Commit 1a016bd

Browse files
committed
More orderly shutdown of executors in tests
1 parent 65afd41 commit 1a016bd

File tree

3 files changed

+32
-23
lines changed

3 files changed

+32
-23
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import java.util.concurrent.ExecutorService
20-
import java.util.concurrent.Executors
21-
import java.util.concurrent.RejectedExecutionException
22-
import java.util.concurrent.TimeUnit
19+
import java.util.concurrent.*
2320
import java.util.concurrent.atomic.AtomicInteger
2421
import kotlin.coroutines.experimental.CoroutineContext
2522

@@ -33,7 +30,7 @@ object CommonPool : CoroutineDispatcher() {
3330
private var usePrivatePool = false
3431

3532
@Volatile
36-
private var _pool: ExecutorService? = null
33+
private var _pool: Executor? = null
3734

3835
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
3936

@@ -59,7 +56,7 @@ object CommonPool : CoroutineDispatcher() {
5956
private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
6057

6158
@Synchronized
62-
private fun getOrCreatePoolSync(): ExecutorService =
59+
private fun getOrCreatePoolSync(): Executor =
6360
_pool ?: createPool().also { _pool = it }
6461

6562
override fun dispatch(context: CoroutineContext, block: Runnable) =
@@ -69,20 +66,29 @@ object CommonPool : CoroutineDispatcher() {
6966
// used for tests
7067
@Synchronized
7168
internal fun usePrivatePool() {
72-
shutdownAndRelease(0)
69+
shutdown(0)
7370
usePrivatePool = true
71+
_pool = null
7472
}
7573

7674
// used for tests
7775
@Synchronized
78-
internal fun shutdownAndRelease(timeout: Long) {
79-
_pool?.apply {
76+
internal fun shutdown(timeout: Long) {
77+
(_pool as? ExecutorService)?.apply {
8078
shutdown()
8179
if (timeout > 0)
8280
awaitTermination(timeout, TimeUnit.MILLISECONDS)
83-
_pool = null
81+
shutdownNow().forEach { defaultExecutor.execute(it) }
8482
}
83+
_pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
84+
}
85+
86+
// used for tests
87+
@Synchronized
88+
internal fun restore() {
89+
shutdown(0)
8590
usePrivatePool = false
91+
_pool = null
8692
}
8793

8894
override fun toString(): String = "CommonPool"

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,11 @@ private fun getOrCreateExecutorSync(): ScheduledExecutorService =
5050

5151
// used for tests
5252
@Synchronized
53-
internal fun defaultExecutorShutdownNow() {
54-
_executor?.shutdownNow()
55-
}
56-
57-
@Synchronized
58-
internal fun defaultExecutorShutdownNowAndRelease() {
53+
internal fun shutdownDefaultExecutor(timeout: Long) {
5954
_executor?.apply {
60-
shutdownNow()
55+
shutdown()
56+
awaitTermination(timeout, TimeUnit.MILLISECONDS)
57+
shutdownNow() // ignore all remaining
6158
_executor = null
6259
}
6360
}

kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.junit.Assert.assertTrue
2222
import java.io.ByteArrayInputStream
2323
import java.io.ByteArrayOutputStream
2424
import java.io.PrintStream
25+
import java.util.concurrent.TimeUnit
2526

2627
fun test(block: () -> Unit): List<String> {
2728
val oldOut = System.out
@@ -42,24 +43,29 @@ fun test(block: () -> Unit): List<String> {
4243
// capture output
4344
bytes = bytesOut.toByteArray()
4445
// the shutdown
45-
defaultExecutorShutdownNow()
46-
shutdownDispatcherPools()
47-
CommonPool.shutdownAndRelease(10000L) // wait at most 10 sec
48-
defaultExecutorShutdownNowAndRelease()
46+
val timeout = 5000L // 5 sec at most to wait
47+
CommonPool.shutdown(timeout)
48+
shutdownDispatcherPools(timeout)
49+
shutdownDefaultExecutor(timeout) // the last man standing -- kill it too
50+
CommonPool.restore()
4951
System.setOut(oldOut)
5052
System.setErr(oldErr)
5153

5254
}
5355
return ByteArrayInputStream(bytes).bufferedReader().readLines()
5456
}
5557

56-
private fun shutdownDispatcherPools() {
58+
private fun shutdownDispatcherPools(timeout: Long) {
5759
val threads = arrayOfNulls<Thread>(Thread.activeCount())
5860
val n = Thread.enumerate(threads)
5961
for (i in 0 until n) {
6062
val thread = threads[i]
6163
if (thread is PoolThread)
62-
thread.dispatcher.executor.shutdownNow()
64+
thread.dispatcher.executor.apply {
65+
shutdown()
66+
awaitTermination(timeout, TimeUnit.MILLISECONDS)
67+
shutdownNow().forEach { defaultExecutor.execute(it) }
68+
}
6369
}
6470
}
6571

0 commit comments

Comments
 (0)