
Commit ab30d72

New deterministic algorithm for working with blocking tasks
Invariants:

* Steal only one task per attempt, to avoid missing steals that could otherwise block progress (a check-park-check sequence may miss tasks that are concurrently being stolen).
* New WorkQueue.add invariant: bufferSize < capacity => add always succeeds.
* Revisited tests that previously exposed a lot of problems.
* Ability to steal from the middle of a work queue (in order to steal blocking tasks) with ABA prevention.

Changes:

* Instead of counting "blocking workers", maintain a "blocking tasks" count that is incremented on each blocking submit and decremented only when the task completes.
* On each work signalling, try to compensate for blocking tasks by enforcing the invariant "created threads == blocking tasks + up to core-size workers".
* Now, if a worker was not spuriously woken up, a task dedicated to it exists and must be found. For that reason, attempt to steal blocking tasks, which may sit in the middle of a work queue. Additionally, instead of scanning the whole global queue, split it in two (one queue for blocking tasks, one for regular tasks).
* Get rid of conditional remove from the global queue.
* Avoid excessive unparks of threads that are not yet able to steal a task due to the work-stealing time resolution: do not add such workers to the parked-workers stack.
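The "blocking tasks" bookkeeping described above can be sketched in isolation. This is a minimal illustration with hypothetical names (`BlockingTaskAccounting`, `onBlockingSubmit`); the real CoroutineScheduler packs these counters into a single atomic control-state word:

    import java.util.concurrent.atomic.AtomicInteger

    // Illustrative sketch of the "blocking tasks" invariant; not the actual scheduler code.
    class BlockingTaskAccounting(private val corePoolSize: Int) {
        private val blockingTasks = AtomicInteger(0)
        private val createdWorkers = AtomicInteger(0)

        // Incremented on each blocking submit...
        fun onBlockingSubmit() { blockingTasks.incrementAndGet() }

        // ...and decremented only when the blocking task completes.
        fun onBlockingCompleted() { blockingTasks.decrementAndGet() }

        // Compensation check on each work signalling: keep
        // "created threads == blocking tasks + up to core-size workers".
        fun shouldCreateWorker(): Boolean =
            createdWorkers.get() < blockingTasks.get() + corePoolSize
    }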
1 parent 4236c8c commit ab30d72

Showing 17 changed files with 511 additions and 576 deletions.

gradle.properties

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 #

 # Kotlin
-version=1.3.2-sched10
+version=1.3.2-SNAPSHOT
 group=org.jetbrains.kotlinx
 kotlin_version=1.3.60

kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt

Lines changed: 3 additions & 11 deletions
@@ -54,12 +54,9 @@ internal open class LockFreeTaskQueue<E : Any>(
     }

     @Suppress("UNCHECKED_CAST")
-    fun removeFirstOrNull(): E? = removeFirstOrNullIf { true }
-
-    @Suppress("UNCHECKED_CAST")
-    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): E? {
+    fun removeFirstOrNull(): E? {
         _cur.loop { cur ->
-            val result = cur.removeFirstOrNullIf(predicate)
+            val result = cur.removeFirstOrNull()
             if (result !== Core.REMOVE_FROZEN) return result as E?
             _cur.compareAndSet(cur, cur.next())
         }
@@ -164,10 +161,7 @@ internal class LockFreeTaskQueueCore<E : Any>(
     }

     // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
-    fun removeFirstOrNull(): Any? = removeFirstOrNullIf { true }
-
-    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
-    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): Any? {
+    fun removeFirstOrNull(): Any? {
         _state.loop { state ->
             if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
             state.withState { head, tail ->
@@ -182,8 +176,6 @@ internal class LockFreeTaskQueueCore<E : Any>(
             // element == Placeholder can only be when add has not finished yet
             if (element is Placeholder) return null // consider it not added yet
             // now we tentative know element to remove -- check predicate
-            @Suppress("UNCHECKED_CAST")
-            if (!predicate(element as E)) return null
             // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
             val newHead = (head + 1) and MAX_CAPACITY_MASK
             if (_state.compareAndSet(state, state.updateHead(newHead))) {
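With the predicate parameter gone, consumers of this queue simply pop the head unconditionally; `removeFirstOrNull` itself already retries over frozen cores, as the diff above shows. A minimal usage sketch (the `drain` helper is illustrative, not part of the codebase):

    // Illustrative helper: drain a queue by repeatedly popping its head.
    private fun drain(queue: GlobalQueue, action: (Task) -> Unit) {
        while (true) {
            val task = queue.removeFirstOrNull() ?: break // null => queue is empty
            action(task)
        }
    }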

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 198 additions & 228 deletions
Large diffs are not rendered by default.
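The scheduler diff itself is not rendered here, but the commit message describes splitting the single global queue in two so that conditional removal is no longer needed. A hedged sketch of that pattern, using the `Task.isBlocking` extension introduced in Tasks.kt below (class and member names are assumptions, not the actual CoroutineScheduler fields):

    // Sketch: one global queue per task kind lets workers pop unconditionally
    // instead of scanning with a predicate. Names are illustrative only.
    class SplitGlobalQueues {
        private val globalCpuQueue = GlobalQueue()
        private val globalBlockingQueue = GlobalQueue()

        fun addToGlobalQueue(task: Task): Boolean =
            if (task.isBlocking) globalBlockingQueue.addLast(task)
            else globalCpuQueue.addLast(task)

        // A worker without a CPU permit may only take blocking tasks.
        fun pollGlobal(hasCpuPermit: Boolean): Task? =
            if (hasCpuPermit) {
                globalCpuQueue.removeFirstOrNull() ?: globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()
            }
    }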

kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt

Lines changed: 5 additions & 11 deletions
@@ -19,11 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
     "kotlinx.coroutines.scheduler.resolution.ns", 100000L
 )

-@JvmField
-internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = systemProp(
-    "kotlinx.coroutines.scheduler.offload.threshold", 96, maxValue = BUFFER_CAPACITY
-)
-
 @JvmField
 internal val BLOCKING_DEFAULT_PARALLELISM = systemProp(
     "kotlinx.coroutines.scheduler.blocking.parallelism", 16
@@ -50,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp(

 @JvmField
 internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
-    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 5L)
+    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 100000L)
 )

 @JvmField
@@ -87,9 +82,11 @@ internal abstract class Task(
     @JvmField var taskContext: TaskContext
 ) : Runnable {
     constructor() : this(0, NonBlockingContext)
-    val mode: TaskMode get() = taskContext.taskMode
+    inline val mode: TaskMode get() = taskContext.taskMode
 }

+internal inline val Task.isBlocking get() = taskContext.taskMode == TaskMode.PROBABLY_BLOCKING
+
 // Non-reusable Task implementation to wrap Runnable instances that do not otherwise implement task
 internal class TaskImpl(
     @JvmField val block: Runnable,
@@ -109,10 +106,7 @@ internal class TaskImpl(
 }

 // Open for tests
-internal open class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false) {
-    public fun removeFirstWithModeOrNull(mode: TaskMode): Task? =
-        removeFirstOrNullIf { it.mode == mode }
-}
+internal class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false)

 internal abstract class TimeSource {
     abstract fun nanoTime(): Long
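A tiny usage sketch of the new `Task.isBlocking` extension (the full `TaskImpl` constructor signature is an assumption here, since the diff truncates it):

    // isBlocking is a plain mode comparison, cheap enough for hot paths.
    val task = TaskImpl(Runnable { /* compute */ }, /* submissionTime = */ 0L, NonBlockingContext)
    check(!task.isBlocking) // NonBlockingContext => not PROBABLY_BLOCKING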

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

Lines changed: 85 additions & 67 deletions
@@ -26,15 +26,13 @@ internal const val NOTHING_TO_STEAL = -2L
 * that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast.
 * E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
 *
-* ### Work offloading
-*
-* When the queue is full, half of existing tasks are offloaded to global queue which is regularly polled by other pool workers.
-* Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occurs only in specific use-cases
-* (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important.
-* As an alternative, offloading directly to some [CoroutineScheduler.Worker] may be used, but then the strategy of selecting any idle worker
-* should be implemented and implementation should be aware multiple producers.
-*
-* @suppress **This is unstable API and it is subject to change.**
+* ### Algorithm and implementation details
+* This is a regular SPMC bounded queue with the additional property that tasks can be removed from the middle of the queue
+* (scheduler workers without a CPU permit steal blocking tasks via this mechanism). Such property enforces us to use CAS in
+* order to properly claim value from the buffer.
+* Moreover, [Task] objects are reusable, so it may seem that this queue is prone to ABA problem.
+* Indeed it formally has ABA-problem, but the whole processing logic is written in the way that such ABA is harmless.
+* "I have discovered a truly marvelous proof of this, which this margin is too narrow to contain"
 */
 internal class WorkQueue {

@@ -58,18 +56,21 @@

     private val producerIndex = atomic(0)
     private val consumerIndex = atomic(0)
+    // Shortcut to avoid scanning queue without blocking tasks
+    private val blockingTasksInBuffer = atomic(0)

     /**
      * Retrieves and removes task from the head of the queue
-     * Invariant: this method is called only by the owner of the queue ([stealBatch] is not)
+     * Invariant: this method is called only by the owner of the queue.
      */
     fun poll(): Task? = lastScheduledTask.getAndSet(null) ?: pollBuffer()

     /**
      * Invariant: Called only by the owner of the queue, returns
      * `null` if task was added, task that wasn't added otherwise.
      */
-    fun add(task: Task): Task? {
+    fun add(task: Task, fair: Boolean = false): Task? {
+        if (fair) return addLast(task)
         val previous = lastScheduledTask.getAndSet(task) ?: return null
         return addLast(previous)
     }
@@ -78,18 +79,20 @@
      * Invariant: Called only by the owner of the queue, returns
      * `null` if task was added, task that wasn't added otherwise.
      */
-    fun addLast(task: Task): Task? {
+    private fun addLast(task: Task): Task? {
+        if (task.isBlocking) blockingTasksInBuffer.incrementAndGet()
         if (bufferSize == BUFFER_CAPACITY - 1) return task
-        val headLocal = producerIndex.value
-        val nextIndex = headLocal and MASK
-
+        val nextIndex = producerIndex.value and MASK
         /*
-         * If current element is not null then we're racing with consumers for the tail. If we skip this check then
-         * the consumer can null out current element and it will be lost. If we're racing for tail then
-         * the queue is close to overflowing => return task
+         * If current element is not null then we're racing with a really slow consumer that committed the consumer index,
+         * but hasn't yet nulled out the slot, effectively preventing us from using it.
+         * Such situations are very rare in practise (although possible) and we decided to give up a progress guarantee
+         * to have a stronger invariant "add to queue with bufferSize == 0 is always successful".
+         * This algorithm can still be wait-free for add, but if and only if tasks are not reusable, otherwise
+         * nulling out the buffer wouldn't be possible.
          */
-        if (buffer[nextIndex] != null) {
-            return task
+        while (buffer[nextIndex] != null) {
+            Thread.yield()
         }
         buffer.lazySet(nextIndex, task)
         producerIndex.incrementAndGet()
@@ -103,18 +106,52 @@
      * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
      */
     fun tryStealFrom(victim: WorkQueue): Long {
-        if (victim.stealBatch { task -> add(task) }) {
+        assert { bufferSize == 0 }
+        val task = victim.pollBuffer()
+        if (task != null) {
+            val notAdded = add(task)
+            assert { notAdded == null }
             return TASK_STOLEN
         }
-        return tryStealLastScheduled(victim)
+        return tryStealLastScheduled(victim, blockingOnly = false)
+    }
+
+    fun tryStealBlockingFrom(victim: WorkQueue): Long {
+        assert { bufferSize == 0 }
+        var start = victim.consumerIndex.value
+        val end = victim.producerIndex.value
+        val buffer = victim.buffer
+
+        while (start != end) {
+            val index = start and MASK
+            if (victim.blockingTasksInBuffer.value == 0) break
+            val value = buffer[index]
+            if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
+                victim.blockingTasksInBuffer.decrementAndGet()
+                add(value)
+                return TASK_STOLEN
+            } else {
+                ++start
+            }
+        }
+        return tryStealLastScheduled(victim, blockingOnly = true)
+    }
+
+    fun offloadAllWorkTo(globalQueue: GlobalQueue) {
+        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
+        while (pollTo(globalQueue)) {
+            // Steal everything
+        }
     }

     /**
      * Contract on return value is the same as for [tryStealFrom]
      */
-    private fun tryStealLastScheduled(victim: WorkQueue): Long {
+    private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
         while (true) {
             val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
+            if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
+
             // TODO time wraparound ?
             val time = schedulerTimeSource.nanoTime()
             val staleness = time - lastScheduled.submissionTime
@@ -134,49 +171,10 @@
         }
     }

-    private fun GlobalQueue.add(task: Task) {
-        /*
-         * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
-         * been already shutdown (with the only exception of the last worker thread that might be performing
-         * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
-         */
-        val added = addLast(task)
-        assert { added }
-    }
-
-    internal fun offloadAllWork(globalQueue: GlobalQueue) {
-        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
-        while (stealBatchTo(globalQueue)) {
-            // Steal everything
-        }
-    }
-
-    /**
-     * Method that is invoked by external workers to steal work.
-     * Half of the buffer (at least 1) is stolen, returns `true` if at least one task was stolen.
-     */
-    private inline fun stealBatch(consumer: (Task) -> Unit): Boolean {
-        val size = bufferSize
-        if (size == 0) return false
-        var toSteal = (size / 2).coerceAtLeast(1)
-        var wasStolen = false
-        while (toSteal-- > 0) {
-            val tailLocal = consumerIndex.value
-            if (tailLocal - producerIndex.value == 0) return wasStolen
-            val index = tailLocal and MASK
-            val element = buffer[index] ?: continue
-            if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
-                // 1) Help GC 2) Signal producer that this slot is consumed and may be used
-                consumer(element)
-                buffer[index] = null
-                wasStolen = true
-            }
-        }
-        return wasStolen
-    }
-
-    private fun stealBatchTo(queue: GlobalQueue): Boolean {
-        return stealBatch { queue.add(it) }
+    private fun pollTo(queue: GlobalQueue): Boolean {
+        val task = pollBuffer() ?: return false
+        queue.add(task)
+        return true
     }

     private fun pollBuffer(): Task? {
@@ -185,8 +183,28 @@
             if (tailLocal - producerIndex.value == 0) return null
             val index = tailLocal and MASK
             if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
-                return buffer.getAndSet(index, null)
+                // Nulls are allowed when blocking tasks are stolen from the middle of the queue.
+                val value = buffer.getAndSet(index, null) ?: continue
+                value.decrementIfBlocking()
+                return value
             }
         }
     }
+
+    private fun Task?.decrementIfBlocking() {
+        if (this != null && isBlocking) {
+            val value = blockingTasksInBuffer.decrementAndGet()
+            assert { value >= 0 }
+        }
+    }
 }
+
+private fun GlobalQueue.add(task: Task) {
+    /*
+     * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
+     * been already shutdown (with the only exception of the last worker thread that might be performing
+     * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
+     */
+    val added = addLast(task)
+    assert { added }
+}
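To see how a worker might use the new stealing entry points, here is an illustrative victim-scan loop for a worker without a CPU permit (the `victims` list and retry policy are assumptions, not the actual Worker code; `TASK_STOLEN` and `NOTHING_TO_STEAL` are the existing WorkQueue constants):

    // Probe each victim for blocking tasks; remember the shortest wait time
    // reported for a head task that is still too fresh to steal.
    fun WorkQueue.tryStealBlockingOnce(victims: List<WorkQueue>): Boolean {
        var minDelay = Long.MAX_VALUE
        for (victim in victims) {
            when (val result = tryStealBlockingFrom(victim)) {
                TASK_STOLEN -> return true                 // task moved into this queue
                NOTHING_TO_STEAL -> {}                     // victim has no blocking work
                else -> minDelay = minOf(minDelay, result) // ns until head is stealable
            }
        }
        // The caller could park for minDelay nanoseconds before rescanning.
        return false
    }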

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt renamed to kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt

Lines changed: 6 additions & 3 deletions
@@ -10,7 +10,11 @@ import org.junit.Test
 import java.util.concurrent.atomic.*
 import kotlin.test.*

-class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
+/**
+ * Test that ensures implementation correctness of [LimitingDispatcher] and
+ * designed to stress its particular implementation details.
+ */
+class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() {
     private val concurrentWorkers = AtomicInteger(0)

     @Before
@@ -29,15 +33,14 @@ class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
             async(limitingDispatcher) {
                 try {
                     val currentlyExecuting = concurrentWorkers.incrementAndGet()
-                    assertEquals(1, currentlyExecuting)
+                    assertEquals(1, currentlyExecuting)
                 } finally {
                     concurrentWorkers.decrementAndGet()
                 }
             }
         }
         tasks.forEach { it.await() }
     }
-        checkPoolThreadsCreated(2..4)
 }

 @Test
