Skip to content

Commit 6c63aea

Browse files
committed
LockFreeLinkedList refactored
1 parent 187eace commit 6c63aea

File tree

2 files changed

+82
-62
lines changed

2 files changed

+82
-62
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 81 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
55

66
private typealias Node = LockFreeLinkedListNode
77

8+
internal const val UNDECIDED = 0
9+
internal const val SUCCESS = 1
10+
internal const val FAILURE = 2
11+
812
/**
913
* Doubly-linked concurrent list node with remove support.
1014
* Based on paper
@@ -41,24 +45,21 @@ internal open class LockFreeLinkedListNode {
4145
private fun removed(): Removed =
4246
removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
4347

44-
abstract class CondAdd {
45-
internal lateinit var newNode: Node
46-
internal lateinit var oldNext: Node
48+
abstract class CondAdd(val newNode: Node) {
49+
lateinit var oldNext: Node
4750
@Volatile
4851
private var consensus: Int = UNDECIDED // status of operation
52+
4953
abstract fun isCondition(): Boolean
5054

5155
private companion object {
5256
@JvmStatic
5357
val CONSENSUS: AtomicIntegerFieldUpdater<CondAdd> =
54-
AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
55-
56-
const val UNDECIDED = 0
57-
const val SUCCESS = 1
58-
const val FAILURE = 2
58+
AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
5959
}
6060

61-
fun completeAdd(node: Node): Boolean {
61+
// returns either SUCCESS or FAILURE
62+
fun completeAdd(node: Node): Int {
6263
// make decision on status
6364
var consensus: Int
6465
while (true) {
@@ -75,10 +76,14 @@ internal open class LockFreeLinkedListNode {
7576
// only the thread the makes this update actually finishes add operation
7677
if (success) newNode.finishAdd(oldNext)
7778
}
78-
return success
79+
return consensus
7980
}
8081
}
8182

83+
private inline fun makeCondAdd(node: Node, crossinline condition: () -> Boolean): CondAdd = object : CondAdd(node) {
84+
override fun isCondition(): Boolean = condition()
85+
}
86+
8287
val isRemoved: Boolean get() = _next is Removed
8388

8489
private val isFresh: Boolean get() = _next === this && prev === this
@@ -101,36 +106,31 @@ internal open class LockFreeLinkedListNode {
101106

102107
// ------ addFirstXXX ------
103108

104-
private fun addFirstCC(node: Node, condAdd: CondAdd?): Boolean {
105-
require(node.isFresh)
106-
condAdd?.newNode = node
109+
/**
110+
* Adds first item to this list.
111+
*/
112+
fun addFirst(node: Node) {
107113
while (true) { // lock-free loop on next
108114
val next = this.next as Node // this sentinel node is never removed
109-
PREV.lazySet(node, this)
110-
NEXT.lazySet(node, next)
111-
condAdd?.oldNext = next
112-
if (NEXT.compareAndSet(this, next, condAdd ?: node)) {
113-
// added successfully (linearized add) -- fixup the list
114-
return condAdd?.completeAdd(this) ?: run { node.finishAdd(next); true }
115-
}
115+
if (addNext(node, next)) return
116116
}
117117
}
118118

119-
/**
120-
* Adds first item to this list.
121-
*/
122-
fun addFirst(node: Node) { addFirstCC(node, null) }
123-
124119
/**
125120
* Adds first item to this list atomically if the [condition] is true.
126121
*/
127-
inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean =
128-
addFirstCC(node, object : CondAdd() {
129-
override fun isCondition(): Boolean = condition()
130-
})
122+
inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean {
123+
val condAdd = makeCondAdd(node, condition)
124+
while (true) { // lock-free loop on next
125+
val next = this.next as Node // this sentinel node is never removed
126+
when (tryCondAddNext(node, next, condAdd)) {
127+
SUCCESS -> return true
128+
FAILURE -> return false
129+
}
130+
}
131+
}
131132

132133
fun addFirstIfEmpty(node: Node): Boolean {
133-
require(node.isFresh)
134134
PREV.lazySet(node, this)
135135
NEXT.lazySet(node, this)
136136
if (!NEXT.compareAndSet(this, this, node)) return false // this is not an empty list!
@@ -141,40 +141,51 @@ internal open class LockFreeLinkedListNode {
141141

142142
// ------ addLastXXX ------
143143

144-
private fun addLastCC(node: Node, condAdd: CondAdd?): Boolean {
145-
require(node.isFresh)
146-
condAdd?.newNode = node
144+
/**
145+
* Adds last item to this list.
146+
*/
147+
fun addLast(node: Node) {
147148
while (true) { // lock-free loop on prev.next
148149
val prev = prevHelper() ?: continue
149-
PREV.lazySet(node, prev)
150-
NEXT.lazySet(node, this)
151-
condAdd?.oldNext = this
152-
if (NEXT.compareAndSet(prev, this, condAdd ?: node)) {
153-
// added successfully (linearized add) -- fixup the list
154-
return condAdd?.completeAdd(prev) ?: run { node.finishAdd(this); true }
155-
}
150+
if (prev.addNext(node, this)) return
156151
}
157152
}
158153

159-
/**
160-
* Adds last item to this list.
161-
*/
162-
fun addLast(node: Node) { addLastCC(node, null) }
163-
164154
/**
165155
* Adds last item to this list atomically if the [condition] is true.
166156
*/
167-
inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean =
168-
addLastCC(node, object : CondAdd() {
169-
override fun isCondition(): Boolean = condition()
170-
})
157+
inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
158+
val condAdd = makeCondAdd(node, condition)
159+
while (true) { // lock-free loop on prev.next
160+
val prev = prevHelper() ?: continue
161+
when (prev.tryCondAddNext(node, this, condAdd)) {
162+
SUCCESS -> return true
163+
FAILURE -> return false
164+
}
165+
}
166+
}
171167

172168
inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
173-
require(node.isFresh)
174169
while (true) { // lock-free loop on prev.next
175170
val prev = prevHelper() ?: continue
176171
if (!predicate(prev)) return false
177-
if (addAfterPrev(node, prev)) return true
172+
if (prev.addNext(node, this)) return true
173+
}
174+
}
175+
176+
inline fun addLastIfPrevAndIf(
177+
node: Node,
178+
predicate: (Node) -> Boolean, // prev node predicate
179+
crossinline condition: () -> Boolean // atomically checked condition
180+
): Boolean {
181+
val condAdd = makeCondAdd(node, condition)
182+
while (true) { // lock-free loop on prev.next
183+
val prev = prevHelper() ?: continue
184+
if (!predicate(prev)) return false
185+
when (prev.tryCondAddNext(node, this, condAdd)) {
186+
SUCCESS -> return true
187+
FAILURE -> return false
188+
}
178189
}
179190
}
180191

@@ -185,15 +196,25 @@ internal open class LockFreeLinkedListNode {
185196
return null
186197
}
187198

188-
private fun addAfterPrev(node: Node, prev: Node): Boolean {
189-
PREV.lazySet(node, prev)
190-
NEXT.lazySet(node, this)
191-
if (NEXT.compareAndSet(prev, this, node)) {
192-
// added successfully (linearized add) -- fixup the list
193-
node.finishAdd(this)
194-
return true
195-
}
196-
return false
199+
// ------ addXXX util ------
200+
201+
private fun addNext(node: Node, next: Node): Boolean {
202+
PREV.lazySet(node, this)
203+
NEXT.lazySet(node, next)
204+
if (!NEXT.compareAndSet(this, next, node)) return false
205+
// added successfully (linearized add) -- fixup the list
206+
node.finishAdd(next)
207+
return true
208+
}
209+
210+
// returns UNDECIDED, SUCCESS or FAILURE
211+
private fun tryCondAddNext(node: Node, next: Node, condAdd: CondAdd): Int {
212+
PREV.lazySet(node, this)
213+
NEXT.lazySet(node, next)
214+
condAdd.oldNext = next
215+
if (!NEXT.compareAndSet(this, next, condAdd)) return UNDECIDED
216+
// added operation successfully (linearized) -- complete it & fixup the list
217+
return condAdd.completeAdd(this)
197218
}
198219

199220
// ------ removeXXX ------

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class RendezvousChannelAtomicityStressTest {
3131
launchReceiver()
3232
val rnd = Random()
3333
while (System.currentTimeMillis() < deadline) {
34-
when (rnd.nextInt(4)) {
34+
when (rnd.nextInt(3)) {
3535
0 -> { // cancel & restart sender
3636
stopSender()
3737
launchSender()
@@ -41,7 +41,6 @@ class RendezvousChannelAtomicityStressTest {
4141
launchReceiver()
4242
}
4343
2 -> yield() // just yield (burn a little time)
44-
3 -> delay(1L) // delay for more a bit
4544
}
4645
}
4746
stopSender()

0 commit comments

Comments
 (0)