Skip to content

Commit db0d4fc

Browse files
committed
JobHandlersUpgradeStressTest introduced
1 parent 4b9ae98 commit db0d4fc

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,29 @@ public open class LockFreeLinkedListNode {
189189

190190
// ------ addXXX util ------
191191

192+
/**
193+
* Given:
194+
* ```
195+
* +-----------------------+
196+
* this | node V next
197+
* +---+---+ +---+---+ +---+---+
198+
* ... <-- | P | N | | P | N | | P | N | --> ....
199+
* +---+---+ +---+---+ +---+---+
200+
* ^ |
201+
* +-----------------------+
202+
* ```
203+
* Produces:
204+
* ```
205+
* this node next
206+
* +---+---+ +---+---+ +---+---+
207+
* ... <-- | P | N | ==> | P | N | --> | P | N | --> ....
208+
* +---+---+ +---+---+ +---+---+
209+
* ^ | ^ |
210+
* +---------+ +---------+
211+
* ```
212+
* Where `==>` denotes linearization point.
213+
* Returns `false` if `next` was not following `this` node.
214+
*/
192215
@PublishedApi
193216
internal fun addNext(node: Node, next: Node): Boolean {
194217
node._prev.lazySet(this)
@@ -462,6 +485,28 @@ public open class LockFreeLinkedListNode {
462485

463486
// ------ other helpers ------
464487

488+
/**
489+
* Given:
490+
* ```
491+
*
492+
* prev this next
493+
* +---+---+ +---+---+ +---+---+
494+
* ... <-- | P | N | --> | P | N | --> | P | N | --> ....
495+
* +---+---+ +---+---+ +---+---+
496+
* ^ ^ | |
497+
* | +---------+ |
498+
* +-------------------------+
499+
* ```
500+
* Produces:
501+
* ```
502+
* prev this next
503+
* +---+---+ +---+---+ +---+---+
504+
* ... <-- | P | N | --> | P | N | --> | P | N | --> ....
505+
* +---+---+ +---+---+ +---+---+
506+
* ^ | ^ |
507+
* +---------+ +---------+
508+
* ```
509+
*/
465510
private fun finishAdd(next: Node) {
466511
next._prev.loop { nextPrev ->
467512
if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links
@@ -483,6 +528,7 @@ public open class LockFreeLinkedListNode {
483528
private fun markPrev(): Node {
484529
_prev.loop { prev ->
485530
if (prev is Removed) return prev.ref
531+
check(prev !== this) { "Cannot remove node that was not fully added" }
486532
val removedPrev = (prev as Node).removed()
487533
if (_prev.compareAndSet(prev, removedPrev)) return prev
488534
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlinx.atomicfu.*
20+
import java.util.*
21+
import java.util.concurrent.*
22+
import kotlin.concurrent.*
23+
import kotlin.test.*
24+
25+
class JobHandlersUpgradeStressTest : TestBase() {
26+
private val nSeconds = 3 * stressTestMultiplier
27+
private val nThreads = 4
28+
29+
private val cyclicBarrier = CyclicBarrier(1 + nThreads)
30+
private val threads = mutableListOf<Thread>()
31+
32+
private val inters = atomic(0)
33+
private val removed = atomic(0)
34+
private val fired = atomic(0)
35+
36+
private val sink = atomic(0)
37+
38+
@Volatile
39+
private var done = false
40+
41+
@Volatile
42+
private var job: Job? = null
43+
44+
class State {
45+
val state = atomic(0)
46+
}
47+
48+
@Test
49+
fun testStress() {
50+
println("--- JobHandlersUpgradeStressTest")
51+
threads += thread(name = "creator", start = false) {
52+
val rnd = Random()
53+
while (true) {
54+
job = if (done) null else Job()
55+
cyclicBarrier.await()
56+
val job = job ?: break
57+
// burn some time
58+
repeat(rnd.nextInt(30)) { sink.incrementAndGet() }
59+
// cancel job
60+
job.cancel()
61+
cyclicBarrier.await()
62+
inters.incrementAndGet()
63+
}
64+
}
65+
threads += List(nThreads) { threadId ->
66+
thread(name = "handler-$threadId", start = false) {
67+
val rnd = Random()
68+
while (true) {
69+
val onCancelling = rnd.nextBoolean()
70+
val invokeImmediately: Boolean = rnd.nextBoolean()
71+
cyclicBarrier.await()
72+
val job = job ?: break
73+
val state = State()
74+
// burn some time
75+
repeat(rnd.nextInt(10)) { sink.incrementAndGet() }
76+
val handle =
77+
job.invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = invokeImmediately) {
78+
if (!state.state.compareAndSet(0, 1))
79+
error("Fired more than once or too late: state=${state.state.value}")
80+
}
81+
// burn some time
82+
repeat(rnd.nextInt(10)) { sink.incrementAndGet() }
83+
// dispose
84+
handle.dispose()
85+
cyclicBarrier.await()
86+
val resultingState = state.state.value
87+
when (resultingState) {
88+
0 -> removed.incrementAndGet()
89+
1 -> fired.incrementAndGet()
90+
else -> error("Cannot happen")
91+
}
92+
if (!state.state.compareAndSet(resultingState, 2))
93+
error("Cannot fire late: resultingState=$resultingState")
94+
}
95+
}
96+
}
97+
threads.forEach { it.start() }
98+
repeat(nSeconds) { second ->
99+
Thread.sleep(1000)
100+
println("${second + 1}: ${inters.value} iterations")
101+
}
102+
done = true
103+
threads.forEach { it.join() }
104+
println(" Completed ${inters.value} iterations")
105+
println(" Removed handler ${removed.value} times")
106+
println(" Fired handler ${fired.value} times")
107+
108+
}
109+
}

0 commit comments

Comments
 (0)