5
5
package kotlinx.coroutines.internal
6
6
7
7
import kotlinx.atomicfu.*
8
+ import java.util.*
8
9
import java.util.concurrent.atomic.*
9
10
10
- private typealias Core <E > = LockFreeMPSCQueueCore <E >
11
+ private typealias Core <E > = LockFreeTaskQueueCore <E >
11
12
12
13
/* *
13
- * Lock-free Multiply-Producer Single-Consumer Queue.
14
- * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
14
+ * Lock-free Multiply-Producer xxx-Consumer Queue for task scheduling purposes.
15
15
*
16
+ * **Note 1: This queue is NOT linearizable. It provides only quiescent consistency for its operations.**
17
+ * However, this guarantee is strong enough for task-scheduling purposes.
16
18
* In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
17
19
*
18
20
* ```
19
21
* Thread 1: addLast(1) = true, removeFirstOrNull() = null
20
22
* Thread 2: addLast(2) = 2 // this operation is concurrent with both operations in the first thread
21
23
* ```
24
+ *
25
+ * **Note 2: When this queue is used with multiple consumers (`singleConsumer == false`) this it is NOT lock-free.**
26
+ * In particular, consumer spins until producer finishes its operation in the case of near-empty queue.
27
+ * It is a very short window that could manifest itself rarely and only under specific load conditions,
28
+ * but it still deprives this algorithm of its lock-freedom.
22
29
*/
23
- internal class LockFreeMPSCQueue <E : Any > {
24
- private val _cur = atomic(Core <E >(Core .INITIAL_CAPACITY ))
30
+ internal open class LockFreeTaskQueue <E : Any >(
31
+ singleConsumer : Boolean // true when there is only a single consumer (slightly faster & lock-free)
32
+ ) {
33
+ private val _cur = atomic(Core <E >(Core .INITIAL_CAPACITY , singleConsumer))
25
34
26
35
// Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
27
36
val isEmpty: Boolean get() = _cur .value.isEmpty
37
+ val size: Int get() = _cur .value.size
28
38
29
39
fun close () {
30
40
_cur .loop { cur ->
@@ -44,22 +54,29 @@ internal class LockFreeMPSCQueue<E : Any> {
44
54
}
45
55
46
56
@Suppress(" UNCHECKED_CAST" )
47
- fun removeFirstOrNull (): E ? {
57
+ fun removeFirstOrNull (): E ? = removeFirstOrNullIf { true }
58
+
59
+ @Suppress(" UNCHECKED_CAST" )
60
+ inline fun removeFirstOrNullIf (predicate : (E ) -> Boolean ): E ? {
48
61
_cur .loop { cur ->
49
- val result = cur.removeFirstOrNull( )
62
+ val result = cur.removeFirstOrNullIf(predicate )
50
63
if (result != = Core .REMOVE_FROZEN ) return result as E ?
51
64
_cur .compareAndSet(cur, cur.next())
52
65
}
53
66
}
67
+
68
+ // Used for validation in tests only
69
+ fun <R > map (transform : (E ) -> R ): List <R > = _cur .value.map(transform)
54
70
}
55
71
56
72
/* *
57
- * Lock-free Multiply-Producer Single-Consumer Queue core.
58
- * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
59
- *
60
- * @see LockFreeMPSCQueue
73
+ * Lock-free Multiply-Producer xxx-Consumer Queue core.
74
+ * @see LockFreeTaskQueue
61
75
*/
62
- internal class LockFreeMPSCQueueCore <E : Any >(private val capacity : Int ) {
76
+ internal class LockFreeTaskQueueCore <E : Any >(
77
+ private val capacity : Int ,
78
+ private val singleConsumer : Boolean // true when there is only a single consumer (slightly faster)
79
+ ) {
63
80
private val mask = capacity - 1
64
81
private val _next = atomic<Core <E >? > (null )
65
82
private val _state = atomic(0L )
@@ -72,6 +89,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
72
89
73
90
// Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
74
91
val isEmpty: Boolean get() = _state .value.withState { head, tail -> head == tail }
92
+ val size: Int get() = _state .value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }
75
93
76
94
fun close (): Boolean {
77
95
_state .update { state ->
@@ -87,9 +105,23 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
87
105
_state .loop { state ->
88
106
if (state and (FROZEN_MASK or CLOSED_MASK ) != 0L ) return state.addFailReason() // cannot add
89
107
state.withState { head, tail ->
90
- // there could be one REMOVE element beyond head that we cannot stump up ,
108
+ // If queue is Single-Consumer than there could be one element beyond head that we cannot overwrite ,
91
109
// so we check for full queue with an extra margin of one element
92
110
if ((tail + 2 ) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
111
+ // If queue is Multi-Consumer then the consumer could still have not cleared element
112
+ // despite the above check for one free slot.
113
+ if (! singleConsumer && array[tail and mask] != null ) {
114
+ // There are two options in this situation
115
+ // 1. Spin-wait until consumer clears the slot
116
+ // 2. Freeze & resize to avoid spinning
117
+ // We use heuristic here to avoid memory-overallocation
118
+ // Freeze & reallocate when queue is small or more than half of the queue is used
119
+ if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1 ) {
120
+ return ADD_FROZEN
121
+ }
122
+ // otherwise spin
123
+ return @loop
124
+ }
93
125
val newTail = (tail + 1 ) and MAX_CAPACITY_MASK
94
126
if (_state .compareAndSet(state, state.updateTail(newTail))) {
95
127
// successfully added
@@ -127,23 +159,38 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
127
159
return null
128
160
}
129
161
130
- // SINGLE CONSUMER
131
162
// REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
132
- fun removeFirstOrNull (): Any? {
163
+ fun removeFirstOrNull (): Any? = removeFirstOrNullIf { true }
164
+
165
+ // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
166
+ inline fun removeFirstOrNullIf (predicate : (E ) -> Boolean ): Any? {
133
167
_state .loop { state ->
134
168
if (state and FROZEN_MASK != 0L ) return REMOVE_FROZEN // frozen -- cannot modify
135
169
state.withState { head, tail ->
136
170
if ((tail and mask) == (head and mask)) return null // empty
137
- // because queue is Single Consumer, then element == null|Placeholder can only be when add has not finished yet
138
- val element = array[head and mask] ? : return null
139
- if (element is Placeholder ) return null // same story -- consider it not added yet
171
+ val element = array[head and mask]
172
+ if (element == null ) {
173
+ // If queue is Single-Consumer, then element == null only when add has not finished yet
174
+ if (singleConsumer) return null // consider it not added yet
175
+ // retry (spin) until consumer adds it
176
+ return @loop
177
+ }
178
+ // element == Placeholder can only be when add has not finished yet
179
+ if (element is Placeholder ) return null // consider it not added yet
180
+ // now we tentative know element to remove -- check predicate
181
+ @Suppress(" UNCHECKED_CAST" )
182
+ if (! predicate(element as E )) return null
140
183
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
141
184
val newHead = (head + 1 ) and MAX_CAPACITY_MASK
142
185
if (_state .compareAndSet(state, state.updateHead(newHead))) {
186
+ // Array could have been copied by another thread and it is perfectly fine, since only elements
187
+ // between head and tail were copied and there are no extra steps we should take here
143
188
array[head and mask] = null // now can safely put null (state was updated)
144
189
return element // successfully removed in fast-path
145
190
}
146
- // Slow-path for remove in case of interference
191
+ // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
192
+ if (! singleConsumer) return @loop
193
+ // Single-consumer queue goes to slow-path for remove in case of interference
147
194
var cur = this
148
195
while (true ) {
149
196
@Suppress(" UNUSED_VALUE" )
@@ -169,7 +216,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
169
216
}
170
217
}
171
218
172
- fun next (): LockFreeMPSCQueueCore <E > = allocateOrGetNextCopy(markFrozen())
219
+ fun next (): LockFreeTaskQueueCore <E > = allocateOrGetNextCopy(markFrozen())
173
220
174
221
private fun markFrozen (): Long =
175
222
_state .updateAndGet { state ->
@@ -185,7 +232,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
185
232
}
186
233
187
234
private fun allocateNextCopy (state : Long ): Core <E > {
188
- val next = LockFreeMPSCQueueCore <E >(capacity * 2 )
235
+ val next = LockFreeTaskQueueCore <E >(capacity * 2 , singleConsumer )
189
236
state.withState { head, tail ->
190
237
var index = head
191
238
while (index and mask != tail and mask) {
@@ -198,44 +245,64 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
198
245
return next
199
246
}
200
247
248
+ // Used for validation in tests only
249
+ fun <R > map (transform : (E ) -> R ): List <R > {
250
+ val res = ArrayList <R >(array.length())
251
+ _state .value.withState { head, tail ->
252
+ var index = head
253
+ while (index and mask != tail and mask) {
254
+ // replace nulls with placeholders on copy
255
+ val element = array[index and mask]
256
+ @Suppress(" UNCHECKED_CAST" )
257
+ if (element != null && element !is Placeholder ) res.add(transform(element as E ))
258
+ index++
259
+ }
260
+ }
261
+ return res
262
+ }
263
+
264
+
201
265
// Instance of this class is placed into array when we have to copy array, but addLast is in progress --
202
266
// it had already reserved a slot in the array (with null) and have not yet put its value there.
203
267
// Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
204
- private class Placeholder (@JvmField val index : Int )
268
+ // Internal because of inlining
269
+ internal class Placeholder (@JvmField val index : Int )
205
270
206
- @Suppress(" PrivatePropertyName" )
271
+ @Suppress(" PrivatePropertyName" , " MemberVisibilityCanBePrivate " )
207
272
internal companion object {
208
- internal const val INITIAL_CAPACITY = 8
273
+ const val INITIAL_CAPACITY = 8
274
+
275
+ const val CAPACITY_BITS = 30
276
+ const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS ) - 1
277
+ const val HEAD_SHIFT = 0
278
+ const val HEAD_MASK = MAX_CAPACITY_MASK .toLong() shl HEAD_SHIFT
279
+ const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
280
+ const val TAIL_MASK = MAX_CAPACITY_MASK .toLong() shl TAIL_SHIFT
209
281
210
- private const val CAPACITY_BITS = 30
211
- private const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS ) - 1
212
- private const val HEAD_SHIFT = 0
213
- private const val HEAD_MASK = MAX_CAPACITY_MASK .toLong() shl HEAD_SHIFT
214
- private const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
215
- private const val TAIL_MASK = MAX_CAPACITY_MASK .toLong() shl TAIL_SHIFT
282
+ const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
283
+ const val FROZEN_MASK = 1L shl FROZEN_SHIFT
284
+ const val CLOSED_SHIFT = FROZEN_SHIFT + 1
285
+ const val CLOSED_MASK = 1L shl CLOSED_SHIFT
216
286
217
- private const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
218
- private const val FROZEN_MASK = 1L shl FROZEN_SHIFT
219
- private const val CLOSED_SHIFT = FROZEN_SHIFT + 1
220
- private const val CLOSED_MASK = 1L shl CLOSED_SHIFT
287
+ const val MIN_ADD_SPIN_CAPACITY = 1024
221
288
222
- @JvmField internal val REMOVE_FROZEN = Symbol (" REMOVE_FROZEN" )
289
+ @JvmField val REMOVE_FROZEN = Symbol (" REMOVE_FROZEN" )
223
290
224
- internal const val ADD_SUCCESS = 0
225
- internal const val ADD_FROZEN = 1
226
- internal const val ADD_CLOSED = 2
291
+ const val ADD_SUCCESS = 0
292
+ const val ADD_FROZEN = 1
293
+ const val ADD_CLOSED = 2
227
294
228
- private infix fun Long.wo (other : Long ) = this and other.inv ()
229
- private fun Long.updateHead (newHead : Int ) = (this wo HEAD_MASK ) or (newHead.toLong() shl HEAD_SHIFT )
230
- private fun Long.updateTail (newTail : Int ) = (this wo TAIL_MASK ) or (newTail.toLong() shl TAIL_SHIFT )
295
+ infix fun Long.wo (other : Long ) = this and other.inv ()
296
+ fun Long.updateHead (newHead : Int ) = (this wo HEAD_MASK ) or (newHead.toLong() shl HEAD_SHIFT )
297
+ fun Long.updateTail (newTail : Int ) = (this wo TAIL_MASK ) or (newTail.toLong() shl TAIL_SHIFT )
231
298
232
- private inline fun <T > Long.withState (block : (head: Int , tail: Int ) -> T ): T {
299
+ inline fun <T > Long.withState (block : (head: Int , tail: Int ) -> T ): T {
233
300
val head = ((this and HEAD_MASK ) shr HEAD_SHIFT ).toInt()
234
301
val tail = ((this and TAIL_MASK ) shr TAIL_SHIFT ).toInt()
235
302
return block(head, tail)
236
303
}
237
304
238
305
// FROZEN | CLOSED
239
- private fun Long.addFailReason (): Int = if (this and CLOSED_MASK != 0L ) ADD_CLOSED else ADD_FROZEN
306
+ fun Long.addFailReason (): Int = if (this and CLOSED_MASK != 0L ) ADD_CLOSED else ADD_FROZEN
240
307
}
241
308
}
0 commit comments