Skip to content

Commit ffc61ae

Browse files
committed
Dump pool threads on BroadcastChannelMultiReceiveStressTest failure;
Move thread test utils to different file
1 parent 6f5bd3f commit ffc61ae

File tree

17 files changed

+67
-98
lines changed

17 files changed

+67
-98
lines changed

core/kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -27,54 +27,6 @@ import java.util.concurrent.ConcurrentHashMap
2727
import java.util.concurrent.TimeUnit
2828
import java.util.concurrent.locks.LockSupport
2929

30-
private const val WAIT_LOST_THREADS = 10_000L // 10s
31-
private val ignoreLostThreads = mutableSetOf<String>()
32-
33-
fun ignoreLostThreads(vararg s: String) { ignoreLostThreads += s }
34-
35-
fun currentThreads(): Set<Thread> {
36-
var estimate = 0
37-
while (true) {
38-
estimate = estimate.coerceAtLeast(Thread.activeCount() + 1)
39-
val arrayOfThreads = Array<Thread?>(estimate) { null }
40-
val n = Thread.enumerate(arrayOfThreads)
41-
if (n >= estimate) {
42-
estimate = n + 1
43-
continue // retry with a better size estimate
44-
}
45-
val threads = hashSetOf<Thread>()
46-
for (i in 0 until n)
47-
threads.add(arrayOfThreads[i]!!)
48-
return threads
49-
}
50-
}
51-
52-
fun checkTestThreads(threadsBefore: Set<Thread>) {
53-
// give threads some time to shutdown
54-
val waitTill = System.currentTimeMillis() + WAIT_LOST_THREADS
55-
var diff: List<Thread>
56-
do {
57-
val threadsAfter = currentThreads()
58-
diff = (threadsAfter - threadsBefore).filter { thread ->
59-
ignoreLostThreads.none { prefix -> thread.name.startsWith(prefix) }
60-
}
61-
if (diff.isEmpty()) break
62-
} while (System.currentTimeMillis() <= waitTill)
63-
ignoreLostThreads.clear()
64-
if (diff.isEmpty()) return
65-
val message = "Lost threads ${diff.map { it.name }}"
66-
println("!!! $message")
67-
println("=== Dumping lost thread stack traces")
68-
diff.forEach { thread ->
69-
println("Thread \"${thread.name}\" ${thread.state}")
70-
val trace = thread.stackTrace
71-
for (t in trace) println("\tat ${t.className}.${t.methodName}(${t.fileName}:${t.lineNumber})")
72-
println()
73-
}
74-
println("===")
75-
error(message)
76-
}
77-
7830
fun trackTask(block: Runnable) = timeSource.trackTask(block)
7931

8032
// helper function to dump exception to stdout for ease of debugging failed tests

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import guide.test.checkTestThreads
20-
import guide.test.currentThreads
2119
import org.junit.After
2220
import org.junit.Before
2321
import java.util.concurrent.atomic.AtomicBoolean
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package kotlinx.coroutines.experimental
2+
3+
private const val WAIT_LOST_THREADS = 10_000L // 10s
4+
private val ignoreLostThreads = mutableSetOf<String>()
5+
6+
fun ignoreLostThreads(vararg s: String) { ignoreLostThreads += s }
7+
8+
fun currentThreads(): Set<Thread> {
9+
var estimate = 0
10+
while (true) {
11+
estimate = estimate.coerceAtLeast(Thread.activeCount() + 1)
12+
val arrayOfThreads = Array<Thread?>(estimate) { null }
13+
val n = Thread.enumerate(arrayOfThreads)
14+
if (n >= estimate) {
15+
estimate = n + 1
16+
continue // retry with a better size estimate
17+
}
18+
val threads = hashSetOf<Thread>()
19+
for (i in 0 until n)
20+
threads.add(arrayOfThreads[i]!!)
21+
return threads
22+
}
23+
}
24+
25+
fun List<Thread>.dumpThreads(header: String) {
26+
println("=== $header")
27+
forEach { thread ->
28+
println("Thread \"${thread.name}\" ${thread.state}")
29+
val trace = thread.stackTrace
30+
for (t in trace) println("\tat ${t.className}.${t.methodName}(${t.fileName}:${t.lineNumber})")
31+
println()
32+
}
33+
println("===")
34+
}
35+
36+
fun ThreadPoolDispatcher.dumpThreads(header: String) =
37+
currentThreads().filter { it is PoolThread && it.dispatcher == this@dumpThreads }.dumpThreads(header)
38+
39+
fun checkTestThreads(threadsBefore: Set<Thread>) {
40+
// give threads some time to shutdown
41+
val waitTill = System.currentTimeMillis() + WAIT_LOST_THREADS
42+
var diff: List<Thread>
43+
do {
44+
val threadsAfter = currentThreads()
45+
diff = (threadsAfter - threadsBefore).filter { thread ->
46+
ignoreLostThreads.none { prefix -> thread.name.startsWith(prefix) }
47+
}
48+
if (diff.isEmpty()) break
49+
} while (System.currentTimeMillis() <= waitTill)
50+
ignoreLostThreads.clear()
51+
if (diff.isEmpty()) return
52+
val message = "Lost threads ${diff.map { it.name }}"
53+
println("!!! $message")
54+
diff.dumpThreads("Dumping lost thread stack traces")
55+
error(message)
56+
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class BroadcastChannelMultiReceiveStressTest(
111111
}
112112
} catch (e: Exception) {
113113
println("Failed: $e")
114+
pool.dumpThreads("Threads in pool")
114115
receivers.indices.forEach { index ->
115116
println("lastReceived[$index] = ${lastReceived[index].get()}")
116117
}

integration/kotlinx-coroutines-guava/src/test/kotlin/kotlinx/coroutines/experimental/guava/ListenableFutureTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kotlinx.coroutines.experimental.guava
1818

1919
import com.google.common.util.concurrent.MoreExecutors
2020
import com.google.common.util.concurrent.SettableFuture
21-
import guide.test.ignoreLostThreads
2221
import kotlinx.coroutines.experimental.*
2322
import org.hamcrest.core.IsEqual
2423
import org.hamcrest.core.IsInstanceOf

integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package kotlinx.coroutines.experimental.future
1818

19-
import guide.test.ignoreLostThreads
2019
import kotlinx.coroutines.experimental.*
2120
import org.hamcrest.core.IsEqual
2221
import org.junit.Assert.*

integration/kotlinx-coroutines-quasar/src/test/kotlin/kotlinx/coroutines/experimental/quasar/QuasarTest.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ import co.paralleluniverse.fibers.Fiber
2020
import co.paralleluniverse.fibers.SuspendExecution
2121
import co.paralleluniverse.strands.SuspendableCallable
2222
import co.paralleluniverse.strands.dataflow.Val
23-
import guide.test.ignoreLostThreads
24-
import kotlinx.coroutines.experimental.CompletableDeferred
25-
import kotlinx.coroutines.experimental.TestBase
26-
import kotlinx.coroutines.experimental.launch
27-
import kotlinx.coroutines.experimental.runBlocking
23+
import kotlinx.coroutines.experimental.*
2824
import org.junit.Before
2925
import org.junit.Test
3026
import java.util.concurrent.TimeUnit

reactive/kotlinx-coroutines-reactor/src/test/kotlin/kotlinx/coroutines/experimental/reactor/MonoTest.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.reactor
1818

19-
import guide.test.ignoreLostThreads
20-
import kotlinx.coroutines.experimental.CommonPool
21-
import kotlinx.coroutines.experimental.TestBase
19+
import kotlinx.coroutines.experimental.*
2220
import kotlinx.coroutines.experimental.reactive.awaitFirst
2321
import kotlinx.coroutines.experimental.reactive.awaitLast
2422
import kotlinx.coroutines.experimental.reactive.awaitSingle
25-
import kotlinx.coroutines.experimental.runBlocking
26-
import kotlinx.coroutines.experimental.yield
2723
import org.hamcrest.core.IsEqual
2824
import org.hamcrest.core.IsInstanceOf
2925
import org.junit.Assert

reactive/kotlinx-coroutines-reactor/src/test/kotlin/kotlinx/coroutines/experimental/reactor/SchedulerTest.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.reactor
1818

19-
import guide.test.ignoreLostThreads
20-
import kotlinx.coroutines.experimental.TestBase
21-
import kotlinx.coroutines.experimental.delay
22-
import kotlinx.coroutines.experimental.run
23-
import kotlinx.coroutines.experimental.runBlocking
19+
import kotlinx.coroutines.experimental.*
2420
import org.hamcrest.core.IsEqual
2521
import org.hamcrest.core.IsNot
2622
import org.junit.Assert.assertThat

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/SchedulerTest.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.rx1
1818

19-
import guide.test.ignoreLostThreads
20-
import kotlinx.coroutines.experimental.TestBase
21-
import kotlinx.coroutines.experimental.delay
22-
import kotlinx.coroutines.experimental.run
23-
import kotlinx.coroutines.experimental.runBlocking
19+
import kotlinx.coroutines.experimental.*
2420
import org.hamcrest.core.IsEqual
2521
import org.hamcrest.core.IsNot
2622
import org.junit.Assert.assertThat

0 commit comments

Comments
 (0)