Skip to content

Commit ede2923

Browse files
elizarovqwwdfsad
authored andcommitted
Fixed race in a new Job with listeners between start & cancel
1 parent bca98f4 commit ede2923

File tree

2 files changed

+104
-31
lines changed

2 files changed

+104
-31
lines changed

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,10 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
373373
onStartInternal()
374374
return TRUE
375375
}
376-
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
377-
return state.tryMakeActive().also { result ->
378-
if (result == TRUE) onStartInternal()
379-
}
376+
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
377+
if (!_state.compareAndSet(state, state.list)) return RETRY
378+
onStartInternal()
379+
return TRUE
380380
}
381381
else -> return FALSE // not a new state
382382
}
@@ -486,13 +486,15 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
486486
list.addLastIf(node) { this.state === expect }
487487

488488
private fun promoteEmptyToNodeList(state: Empty) {
489-
// try to promote it to list in new state
490-
_state.compareAndSet(state, NodeList(state.isActive))
489+
// try to promote it to LIST state with the corresponding state
490+
val list = NodeList()
491+
val update = if (state.isActive) list else InactiveNodeList(list)
492+
_state.compareAndSet(state, update)
491493
}
492494

493495
private fun promoteSingleToNodeList(state: JobNode<*>) {
494496
// try to promote it to list (SINGLE+ state)
495-
state.addOneIfEmpty(NodeList(active = true))
497+
state.addOneIfEmpty(NodeList())
496498
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
497499
val list = state.nextNode // either our NodeList or somebody else won the race, updated state
498500
// just attempt converting it to list if state is still the same, then we'll continue lock-free loop
@@ -597,14 +599,13 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
597599
is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
598600
promoteSingleToNodeList(state)
599601
}
600-
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
601-
if (state.isActive) {
602-
if (tryMakeCancelling(state, state.list, cause)) return true
603-
} else {
604-
// cancelling a non-started coroutine makes it immediately cancelled
605-
if (updateStateCancelled(state, cause))
606-
return true
607-
}
602+
is NodeList -> { // LIST -- active list of completion handlers
603+
if (tryMakeCancelling(state, state.list, cause)) return true
604+
}
605+
is InactiveNodeList -> { // LIST -- inactive list of completion handlers
606+
// cancelling a non-started coroutine makes it immediately cancelled
607+
if (updateStateCancelled(state, cause))
608+
return true
608609
}
609610
is Finishing -> { // Completing/Cancelling the job, may cancel
610611
if (state.cancelled != null) {
@@ -1079,31 +1080,30 @@ internal abstract class JobNode<out J : Job>(
10791080
override fun dispose() = (job as JobSupport).removeNode(this)
10801081
}
10811082

1082-
internal class NodeList(
1083-
active: Boolean
1084-
) : LockFreeLinkedListHead(), Incomplete {
1085-
private val _active = atomic(if (active) 1 else 0)
1086-
1087-
override val isActive: Boolean get() = _active.value != 0
1083+
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
1084+
override val isActive: Boolean get() = true
10881085
override val list: NodeList get() = this
10891086

1090-
fun tryMakeActive(): Int {
1091-
if (_active.value != 0) return FALSE
1092-
if (_active.compareAndSet(0, 1)) return TRUE
1093-
return RETRY
1094-
}
1095-
1096-
override fun toString(): String = buildString {
1097-
append("List")
1098-
append(if (isActive) "{Active}" else "{New}")
1099-
append("[")
1087+
fun getString(state: String) = buildString {
1088+
append("List{")
1089+
append(state)
1090+
append("}[")
11001091
var first = true
11011092
this@NodeList.forEach<JobNode<*>> { node ->
11021093
if (first) first = false else append(", ")
11031094
append(node)
11041095
}
11051096
append("]")
11061097
}
1098+
1099+
override fun toString(): String = getString("Active")
1100+
}
1101+
1102+
internal class InactiveNodeList(
1103+
override val list: NodeList
1104+
) : Incomplete {
1105+
override val isActive: Boolean get() = false
1106+
override fun toString(): String = list.getString("New")
11071107
}
11081108

11091109
private class InvokeOnCompletion(
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental
6+
7+
import org.junit.*
8+
import org.junit.Test
9+
import java.util.concurrent.*
10+
import kotlin.test.*
11+
12+
class JobActivationStressTest : TestBase() {
13+
private val N_ITERATIONS = 10_000 * stressTestMultiplier
14+
private val pool = newFixedThreadPoolContext(3, "JobActivationStressTest")
15+
16+
@After
17+
fun tearDown() {
18+
pool.close()
19+
}
20+
21+
/**
22+
* Perform concurrent start & cancel of a job with prior installed completion handlers
23+
*/
24+
@Test
25+
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
26+
fun testActivation() = runTest {
27+
val barrier = CyclicBarrier(3)
28+
val scope = CoroutineScope(pool)
29+
repeat(N_ITERATIONS) {
30+
var wasStarted = false
31+
val d = scope.async(start = CoroutineStart.LAZY) {
32+
wasStarted = true
33+
throw TestException()
34+
}
35+
// need to add on completion handler
36+
val causeHolder = object {
37+
var cause: Throwable? = null
38+
}
39+
// we use synchronization on causeHolder to work around the fact that completion listeners
40+
// are invoked after the job is in the final state, so when "d.join()" completes there is
41+
// no guarantee that this listener was already invoked
42+
d.invokeOnCompletion {
43+
synchronized(causeHolder) {
44+
causeHolder.cause = it ?: Error("Empty cause")
45+
(causeHolder as Object).notifyAll()
46+
}
47+
}
48+
// concurrent cancel
49+
val canceller = scope.launch {
50+
barrier.await()
51+
d.cancel()
52+
}
53+
// concurrent cancel
54+
val starter = scope.launch {
55+
barrier.await()
56+
d.start()
57+
}
58+
barrier.await()
59+
joinAll(d, canceller, starter)
60+
if (wasStarted) {
61+
val exception = d.getCompletionExceptionOrNull()
62+
assertTrue(exception is TestException, "exception=$exception")
63+
val cause = synchronized(causeHolder) {
64+
while (causeHolder.cause == null) (causeHolder as Object).wait()
65+
causeHolder.cause
66+
}
67+
assertTrue(cause is TestException, "cause=$cause")
68+
}
69+
}
70+
}
71+
72+
private class TestException : Exception()
73+
}

0 commit comments

Comments
 (0)