Skip to content

Commit e8986b8

Browse files
committed
Fixed startup/shutdown of DefaultExecutor for tests to prevent thread leak
1 parent 9a0d8ac commit e8986b8

File tree

1 file changed

+29
-25
lines changed
  • core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental

1 file changed

+29
-25
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,32 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
4444
private var debugStatus: Int = FRESH
4545

4646
override fun run() {
47-
var shutdownNanos = Long.MAX_VALUE
4847
timeSource.registerTimeLoopThread()
49-
notifyStartup()
5048
try {
51-
runLoop@ while (true) {
52-
Thread.interrupted() // just reset interruption flag
53-
var parkNanos = processNextEvent()
54-
if (parkNanos == Long.MAX_VALUE) {
55-
// nothing to do, initialize shutdown timeout
56-
if (shutdownNanos == Long.MAX_VALUE) {
57-
val now = timeSource.nanoTime()
58-
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
59-
val tillShutdown = shutdownNanos - now
60-
if (tillShutdown <= 0) break@runLoop // shut thread down
61-
parkNanos = parkNanos.coerceAtMost(tillShutdown)
62-
} else
63-
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
64-
}
65-
if (parkNanos > 0) {
66-
// check if shutdown was requested and bail out in this case
67-
if (debugStatus == SHUTDOWN_REQ) {
68-
acknowledgeShutdown()
69-
break@runLoop
70-
} else {
71-
timeSource.parkNanos(this, parkNanos)
49+
var shutdownNanos = Long.MAX_VALUE
50+
if (notifyStartup()) {
51+
runLoop@ while (true) {
52+
Thread.interrupted() // just reset interruption flag
53+
var parkNanos = processNextEvent()
54+
if (parkNanos == Long.MAX_VALUE) {
55+
// nothing to do, initialize shutdown timeout
56+
if (shutdownNanos == Long.MAX_VALUE) {
57+
val now = timeSource.nanoTime()
58+
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
59+
val tillShutdown = shutdownNanos - now
60+
if (tillShutdown <= 0) break@runLoop // shut thread down
61+
parkNanos = parkNanos.coerceAtMost(tillShutdown)
62+
} else
63+
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
64+
}
65+
if (parkNanos > 0) {
66+
// check if shutdown was requested and bail out in this case
67+
if (debugStatus == SHUTDOWN_REQ) {
68+
acknowledgeShutdown()
69+
break@runLoop
70+
} else {
71+
timeSource.parkNanos(this, parkNanos)
72+
}
7273
}
7374
}
7475
}
@@ -101,23 +102,26 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
101102
@Synchronized
102103
internal fun ensureStarted() {
103104
assert(_thread == null) // ensure we are at a clean state
105+
assert(debugStatus == FRESH || debugStatus == SHUTDOWN_ACK)
104106
debugStatus = FRESH
105107
createThreadSync() // create fresh thread
106108
while (debugStatus == FRESH) (this as Object).wait()
107109
}
108110

109111
@Synchronized
110-
private fun notifyStartup() {
112+
private fun notifyStartup(): Boolean {
113+
if (debugStatus == SHUTDOWN_REQ) return false
111114
debugStatus = ACTIVE
112115
(this as Object).notifyAll()
116+
return true
113117
}
114118

115119
// used for tests
116120
@Synchronized
117121
internal fun shutdown(timeout: Long) {
118122
if (_thread != null) {
119123
val deadline = System.currentTimeMillis() + timeout
120-
if (debugStatus == ACTIVE) debugStatus = SHUTDOWN_REQ
124+
if (debugStatus == ACTIVE || debugStatus == FRESH) debugStatus = SHUTDOWN_REQ
121125
unpark()
122126
// loop while there is anything to do immediately or deadline passes
123127
while (debugStatus != SHUTDOWN_ACK && _thread != null) {

0 commit comments

Comments
 (0)