Skip to content

Commit f27d176

Browse files
committed
CoroutineScheduler parking strategy rework
* WorkQueue.trySteal reports not only whether the steal was successful, but also a waiting time until a task becomes stealable * CoroutineScheduler.trySteal attempts to steal from all the workers (starting from the random position) per iteration to have deterministic stealing * Parking mechanics rework. After unsuccessful findTask, worker immediately adds itself to parking stack, then rescans all the queues to avoid missing tryUnparks and only then parks itself (parking duration depends on WorkQueue.trySteal result), terminating later * Excessive spinning and parking are completely eliminated, significantly (x3) reducing CPU-consumption and making CoroutineScheduler on-par with FJP and FTP on Ktor-like workloads * Downside of aggressive parking is the cost of slow-path unpark paid by external submitters that can be shown in degraded DispatchersContextSwitchBenchmark. Follow-up commits will fix that problem * Retry on tryStealLastScheduled failures to avoid potential starvation * Merge available CPU permits with controlState to simplify reasoning about pool state and make all state transitions atomic * Get rid of synthetic accessors
1 parent 5202a8b commit f27d176

File tree

8 files changed

+190
-182
lines changed

8 files changed

+190
-182
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
# Kotlin
6-
version=1.3.2-SNAPSHOT
6+
version=1.3.2-sched10
77
group=org.jetbrains.kotlinx
88
kotlin_version=1.3.60
99

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 149 additions & 152 deletions
Large diffs are not rendered by default.

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ internal const val BUFFER_CAPACITY_BASE = 7
1212
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE
1313
internal const val MASK = BUFFER_CAPACITY - 1 // 127 by default (BUFFER_CAPACITY is 128)
1414

15+
internal const val TASK_STOLEN = -1L
16+
internal const val NOTHING_TO_STEAL = -2L
17+
1518
/**
1619
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
1720
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
@@ -49,10 +52,7 @@ internal class WorkQueue {
4952
* This is in general harmless because steal will be blocked by timer
5053
*/
5154
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
52-
53-
// TODO replace with inlined array when atomicfu will support it
5455
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
55-
5656
private val lastScheduledTask = atomic<Task?>(null)
5757

5858
private val producerIndex = atomic(0)
@@ -93,30 +93,43 @@ internal class WorkQueue {
9393
/**
9494
* Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case of current queue overflow.
9595
*
96-
* @return whether any task was stolen
96+
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least one task was stolen
97+
* or a positive value of how many nanoseconds should pass until the head of the [victim] queue becomes available to steal.
9798
*/
98-
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean {
99+
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Long {
99100
if (victim.stealBatch { task -> add(task, globalQueue) }) {
100-
return true
101+
return TASK_STOLEN
101102
}
102103
return tryStealLastScheduled(victim, globalQueue)
103104
}
104105

106+
/**
107+
* Contract on return value is the same as for [trySteal]
108+
*/
105109
private fun tryStealLastScheduled(
106110
victim: WorkQueue,
107111
globalQueue: GlobalQueue
108-
): Boolean {
109-
val lastScheduled = victim.lastScheduledTask.value ?: return false
110-
val time = schedulerTimeSource.nanoTime()
111-
if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) {
112-
return false
113-
}
112+
): Long {
113+
while (true) {
114+
val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
115+
// TODO time wraparound ?
116+
val time = schedulerTimeSource.nanoTime()
117+
val staleness = time - lastScheduled.submissionTime
118+
if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
119+
return WORK_STEALING_TIME_RESOLUTION_NS - staleness
120+
}
114121

115-
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
116-
add(lastScheduled, globalQueue)
117-
return true
122+
/*
123+
* If CAS has failed, either someone else had stolen this task or the owner executed this task
124+
* and dispatched another one. In the latter case we should retry to avoid missing task.
125+
*/
126+
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
127+
add(lastScheduled, globalQueue)
128+
return TASK_STOLEN
129+
}
130+
continue
118131
}
119-
return false
132+
120133
}
121134

122135
internal fun size(): Int = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package kotlinx.coroutines.scheduling
66

77
import kotlinx.coroutines.*
88
import org.junit.*
9+
import org.junit.Test
910
import java.util.concurrent.atomic.*
11+
import kotlin.test.*
1012

1113
class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
1214
private val concurrentWorkers = AtomicInteger(0)
@@ -33,30 +35,27 @@ class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
3335
}
3436
}
3537
}
36-
3738
tasks.forEach { it.await() }
3839
}
39-
4040
checkPoolThreadsCreated(2..4)
4141
}
4242

4343
@Test
4444
fun testPingPongThreadsCount() = runBlocking {
4545
corePoolSize = CORES_COUNT
4646
val iterations = 100_000 * stressTestMultiplier
47-
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
47+
val completed = AtomicInteger(0)
4848
for (i in 1..iterations) {
4949
val tasks = (1..2).map {
5050
async(dispatcher) {
5151
// Useless work
5252
concurrentWorkers.incrementAndGet()
5353
concurrentWorkers.decrementAndGet()
54+
completed.incrementAndGet()
5455
}
5556
}
56-
5757
tasks.forEach { it.await() }
5858
}
59-
60-
checkPoolThreadsCreated(CORES_COUNT)
59+
assertEquals(2 * iterations, completed.get())
6160
}
6261
}

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,4 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
221221
fun testZeroParallelism() {
222222
blockingDispatcher(0)
223223
}
224-
}
224+
}

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44

55
package kotlinx.coroutines.scheduling
66

7-
import kotlinx.coroutines.TestBase
8-
import org.junit.Test
7+
import kotlinx.coroutines.*
8+
import org.junit.*
99
import java.lang.Runnable
1010
import java.util.concurrent.*
11-
import java.util.concurrent.CountDownLatch
1211
import kotlin.coroutines.*
1312

1413
class CoroutineSchedulerTest : TestBase() {

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class WorkQueueStressTest : TestBase() {
9191
val myQueue = WorkQueue()
9292
startLatch.await()
9393
while (stolen.size != offerIterations) {
94-
if (!myQueue.trySteal(producerQueue, stolen)) {
94+
if (myQueue.trySteal(producerQueue, stolen) != NOTHING_TO_STEAL) {
9595
stolen.addAll(myQueue.drain().map { task(it) })
9696
}
9797
}

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ class WorkQueueTest : TestBase() {
6262
timeSource.step(3)
6363

6464
val stealer = WorkQueue()
65-
require(stealer.trySteal(victim, globalQueue))
65+
assertEquals(TASK_STOLEN, stealer.trySteal(victim, globalQueue))
6666
assertEquals(arrayListOf(1L), stealer.drain())
6767

68-
require(stealer.trySteal(victim, globalQueue))
68+
assertEquals(TASK_STOLEN, stealer.trySteal(victim, globalQueue))
6969
assertEquals(arrayListOf(2L), stealer.drain())
7070
}
7171
}

0 commit comments

Comments
 (0)