Skip to content

Commit e9f6449

Browse files
committed
Fixed IllegalStateException in select that concurrently selects, which
was due a problem with Job concurrent cancel and dispose of handle. The bug was introduced by addition of Cancelling state for a Job. Original stack trace in select was: java.lang.IllegalStateException: Check failed. at kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode.remove(LockFreeLinkedList.kt:236) at kotlinx.coroutines.experimental.JobSupport.removeNode$kotlinx_coroutines_core(Job.kt:734) at kotlinx.coroutines.experimental.JobNode.dispose(Job.kt:995) at kotlinx.coroutines.experimental.selects.SelectBuilderImpl.initCancellability(Select.kt:294) at kotlinx.coroutines.experimental.selects.SelectBuilderImpl.getResult(Select.kt:273)
1 parent a047a11 commit e9f6449

File tree

2 files changed

+101
-2
lines changed

2 files changed

+101
-2
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,14 @@ public open class LockFreeLinkedListNode {
227227
// ------ removeXXX ------
228228

229229
/**
230-
* Removes this node from the list. Returns `true` when removed successfully.
230+
* Removes this node from the list. Returns `true` when removed successfully, or `false` if the node was already
231+
* removed or if it was not added to any list in the first place.
231232
*/
232233
public open fun remove(): Boolean {
233234
while (true) { // lock-free loop on next
234235
val next = this.next
235236
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
236-
check(next !== this) // sanity check -- can be true for sentinel nodes only, but they are never removed
237+
if (next === this) return false // was not even added
237238
val removed = (next as Node).removed()
238239
if (NEXT.compareAndSet(this, next, removed)) {
239240
// was removed successfully (linearized remove) -- fixup the list
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.junit.Test
20+
import kotlin.concurrent.thread
21+
22+
/**
23+
* Tests concurrent cancel & dispose of the jobs.
24+
*/
25+
class JobDisposeTest: TestBase() {
26+
private val TEST_DURATION = 3 * stressTestMultiplier // seconds
27+
28+
@Volatile
29+
private var done = false
30+
@Volatile
31+
private var job: TestJob? = null
32+
@Volatile
33+
private var handle: DisposableHandle? = null
34+
35+
@Volatile
36+
private var exception: Throwable? = null
37+
38+
fun testThread(name: String, block: () -> Unit): Thread =
39+
thread(start = false, name = name, block = block).apply {
40+
setUncaughtExceptionHandler { t, e ->
41+
exception = e
42+
println("Exception in ${t.name}: $e")
43+
e.printStackTrace()
44+
}
45+
}
46+
47+
@Test
48+
fun testConcurrentDispose() {
49+
// create threads
50+
val threads = mutableListOf<Thread>()
51+
threads += testThread("creator") {
52+
while (!done) {
53+
val job = TestJob()
54+
val handle = job.invokeOnCancellation { /* nothing */ }
55+
this.job = job // post job to cancelling thread
56+
this.handle = handle // post handle to concurrent disposer thread
57+
handle.dispose() // dispose of handle from this thread (concurrently with other disposer)
58+
}
59+
}
60+
threads += testThread("canceller") {
61+
var prevJob: Job? = null
62+
while (!done) {
63+
val job = this.job ?: continue
64+
val result = job.cancel()
65+
if (job != prevJob) {
66+
check(result) // must have returned true
67+
prevJob = job
68+
} else
69+
check(!result) // must have returned false
70+
}
71+
}
72+
threads += testThread("disposer") {
73+
while (!done) {
74+
handle?.dispose()
75+
}
76+
}
77+
// start threads
78+
threads.forEach { it.start() }
79+
// wait
80+
for (i in 1..TEST_DURATION) {
81+
println("$i: Running")
82+
Thread.sleep(1000)
83+
if (exception != null) break
84+
}
85+
// done
86+
done = true
87+
// join threads
88+
threads.forEach { it.join() }
89+
// rethrow exception if any
90+
exception?.let { throw it }
91+
}
92+
93+
class TestJob : JobSupport(active = true) {
94+
// The bug was triggering only with cancelling state
95+
override val hasCancellingState: Boolean
96+
get() = true
97+
}
98+
}

0 commit comments

Comments
 (0)