Skip to content

Commit 9483301

Browse files
committed
Actually fixed startup/shutdown of DefaultExecutor for tests to prevent
thread leak. Includes test-case.
1 parent 62ee4cd commit 9483301

File tree

3 files changed

+84
-43
lines changed

3 files changed

+84
-43
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,39 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
4343
@Volatile
4444
private var debugStatus: Int = FRESH
4545

46+
private val isShutdownRequested: Boolean get() {
47+
val debugStatus = debugStatus
48+
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
49+
}
50+
4651
override fun run() {
4752
timeSource.registerTimeLoopThread()
4853
try {
4954
var shutdownNanos = Long.MAX_VALUE
50-
if (notifyStartup()) {
51-
runLoop@ while (true) {
52-
Thread.interrupted() // just reset interruption flag
53-
var parkNanos = processNextEvent()
54-
if (parkNanos == Long.MAX_VALUE) {
55-
// nothing to do, initialize shutdown timeout
56-
if (shutdownNanos == Long.MAX_VALUE) {
57-
val now = timeSource.nanoTime()
58-
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
59-
val tillShutdown = shutdownNanos - now
60-
if (tillShutdown <= 0) break@runLoop // shut thread down
61-
parkNanos = parkNanos.coerceAtMost(tillShutdown)
62-
} else
63-
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
64-
}
65-
if (parkNanos > 0) {
66-
// check if shutdown was requested and bail out in this case
67-
if (debugStatus == SHUTDOWN_REQ) {
68-
acknowledgeShutdown()
69-
break@runLoop
70-
} else {
71-
timeSource.parkNanos(this, parkNanos)
72-
}
73-
}
55+
if (!notifyStartup()) return
56+
while (true) {
57+
Thread.interrupted() // just reset interruption flag
58+
var parkNanos = processNextEvent()
59+
if (parkNanos == Long.MAX_VALUE) {
60+
// nothing to do, initialize shutdown timeout
61+
if (shutdownNanos == Long.MAX_VALUE) {
62+
val now = timeSource.nanoTime()
63+
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
64+
val tillShutdown = shutdownNanos - now
65+
if (tillShutdown <= 0) return // shut thread down
66+
parkNanos = parkNanos.coerceAtMost(tillShutdown)
67+
} else
68+
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
69+
}
70+
if (parkNanos > 0) {
71+
// check if shutdown was requested and bail out in this case
72+
if (isShutdownRequested) return
73+
timeSource.parkNanos(this, parkNanos)
7474
}
7575
}
7676
} finally {
7777
_thread = null // this thread is dead
78+
acknowledgeShutdownIfNeeded()
7879
timeSource.unregisterTimeLoopThread()
7980
// recheck if queues are empty after _thread reference was set to null (!!!)
8081
if (!isEmpty) thread() // recreate thread if it is needed
@@ -110,32 +111,31 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
110111

111112
@Synchronized
112113
private fun notifyStartup(): Boolean {
113-
if (debugStatus == SHUTDOWN_REQ) return false
114+
if (isShutdownRequested) return false
114115
debugStatus = ACTIVE
115116
(this as Object).notifyAll()
116117
return true
117118
}
118119

119120
// used for tests
120121
@Synchronized
121-
internal fun shutdown(timeout: Long) {
122-
if (_thread != null) {
123-
val deadline = System.currentTimeMillis() + timeout
124-
if (debugStatus == ACTIVE || debugStatus == FRESH) debugStatus = SHUTDOWN_REQ
125-
unpark()
126-
// loop while there is anything to do immediately or deadline passes
127-
while (debugStatus != SHUTDOWN_ACK && _thread != null) {
128-
val remaining = deadline - System.currentTimeMillis()
129-
if (remaining <= 0) break
130-
(this as Object).wait(timeout)
131-
}
122+
fun shutdown(timeout: Long) {
123+
val deadline = System.currentTimeMillis() + timeout
124+
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
125+
// loop while there is anything to do immediately or deadline passes
126+
while (debugStatus != SHUTDOWN_ACK && _thread != null) {
127+
_thread?.let { timeSource.unpark(it) } // wake up thread if present
128+
val remaining = deadline - System.currentTimeMillis()
129+
if (remaining <= 0) break
130+
(this as Object).wait(timeout)
132131
}
133132
// restore fresh status
134133
debugStatus = FRESH
135134
}
136135

137136
@Synchronized
138-
private fun acknowledgeShutdown() {
137+
private fun acknowledgeShutdownIfNeeded() {
138+
if (!isShutdownRequested) return
139139
debugStatus = SHUTDOWN_ACK
140140
resetAll() // clear queues
141141
(this as Object).notifyAll()

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,13 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
288288
}
289289

290290
final override fun dispose() = synchronized(this) {
291-
when (state) {
292-
DELAYED -> _delayed.value?.remove(this)
293-
RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
294-
else -> return
291+
when (state) {
292+
DELAYED -> _delayed.value?.remove(this)
293+
RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
294+
else -> return
295+
}
296+
state = REMOVED
295297
}
296-
state = REMOVED
297-
}
298298

299299
override fun toString(): String = "Delayed[nanos=$nanoTime]"
300300
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.junit.*
20+
21+
class TestBaseTest : TestBase() {
22+
@Test
23+
fun testThreadsShutdown() {
24+
val SHUTDOWN_TIMEOUT = 1_000L
25+
repeat(1000 * stressTestMultiplier) { i ->
26+
CommonPool.usePrivatePool()
27+
val threadsBefore = currentThreads()
28+
runBlocking {
29+
val sub = launch(DefaultDispatcher) {
30+
delay(10000000L)
31+
}
32+
sub.cancel()
33+
sub.join()
34+
}
35+
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
36+
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
37+
checkTestThreads(threadsBefore)
38+
}
39+
40+
}
41+
}

0 commit comments

Comments
 (0)