Skip to content

Commit e50fb89

Browse files
committed
Scheduler improvements:
Fix (one more) ABA problem in tryUnpark; global TaskContext to speed up CPU tasks; documentation improvements; stress test constants tuned to be more robust and faster on a local machine (and when run from IDEA); style fixes
1 parent 5c7454b commit e50fb89

File tree

11 files changed

+78
-59
lines changed

11 files changed

+78
-59
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPMCQueue.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal open class LockFreeMPMCQueue<T : LockFreeMPMCQueueNode<T>> {
2323
atomic(LockFreeMPMCQueueNode<T>() as T) // sentinel
2424

2525
private val tail = atomic(head.value)
26+
internal val headValue: T get() = head.value
2627

2728
public fun addLast(node: T): Boolean {
2829
tail.loop { curTail ->
@@ -47,8 +48,7 @@ internal open class LockFreeMPMCQueue<T : LockFreeMPMCQueueNode<T>> {
4748
}
4849
}
4950

50-
@PublishedApi internal val headValue: T get() = head.value
51-
@PublishedApi internal fun headCas(curHead: T, update: T) = head.compareAndSet(curHead, update)
51+
fun headCas(curHead: T, update: T) = head.compareAndSet(curHead, update)
5252

5353
public inline fun removeFistOrNullIf(predicate: (T) -> Boolean): T? {
5454
while (true) {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import java.util.concurrent.*
99
import java.util.concurrent.locks.*
1010

1111
/**
12-
* Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutine over worker threads,
12+
* Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines over worker threads,
1313
* including both CPU-intensive and blocking tasks.
1414
*
1515
* Current scheduler implementation has two optimization targets:
@@ -27,7 +27,7 @@ import java.util.concurrent.locks.*
2727
*
2828
* When a coroutine is dispatched from within scheduler worker, it's placed into the head of worker run queue.
2929
* If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy,
30-
* it couples communicating coroutines into one and eliminates scheduling latency that arises from placing task in the end of the queue.
30+
* it effectively couples communicating coroutines into one and eliminates scheduling latency that arises from placing task to the end of the queue.
3131
* Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack.
3232
* When a coroutine is dispatched from an external thread, it's put into the global queue.
3333
*
@@ -70,7 +70,7 @@ internal class CoroutineScheduler(
7070
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
7171
}
7272
require(idleWorkerKeepAliveNs > 0) {
73-
"Idle worker keep alive time $idleWorkerKeepAliveNs must be postiive"
73+
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
7474
}
7575
}
7676

@@ -107,18 +107,21 @@ internal class CoroutineScheduler(
107107
*
108108
* Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
109109
*/
110-
private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
111-
require(oldIndex > 0 && newIndex >= 0)
110+
private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
112111
parkedWorkersStack.loop { top ->
113112
val index = (top and PARKED_INDEX_MASK).toInt()
114113
val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
115114
val updIndex = if (index == oldIndex) {
116-
if (newIndex == 0)
115+
if (newIndex == 0) {
117116
parkedWorkersStackNextIndex(workers[oldIndex]!!)
118-
else
117+
}
118+
else {
119119
newIndex
120-
} else
120+
}
121+
} else {
121122
index // no change to index, but update version
123+
}
124+
122125
if (updIndex < 0) return@loop // retry
123126
if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
124127
}
@@ -132,7 +135,6 @@ internal class CoroutineScheduler(
132135
* See [Worker.doPark].
133136
*/
134137
private fun parkedWorkersStackPush(worker: Worker) {
135-
// assert(worker.isParked)
136138
if (worker.nextParkedWorker !== NOT_IN_STACK) return // already in stack, bail out
137139
/*
138140
* The below loop can be entered only if this worker was not in the stack and, since no other thread
@@ -172,7 +174,7 @@ internal class CoroutineScheduler(
172174
* Successful CAS of the stack top completes successful pop.
173175
*/
174176
if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
175-
/**
177+
/*
176178
* We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
177179
* currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
178180
* adding itself again. It does not matter, since we are going to invoke unpark on the thread
@@ -213,8 +215,11 @@ internal class CoroutineScheduler(
213215
* [workers] is array of lazily created workers up to [maxPoolSize] workers.
214216
* [createdWorkers] is count of already created workers (worker with index lesser than [createdWorkers] exists).
215217
* [blockingWorkers] is count of running workers which are executing [TaskMode.PROBABLY_BLOCKING] task.
218+
* All mutations of array's content are guarded by lock.
216219
*
217-
* **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value).
220+
* **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so
221+
* workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination
222+
* works properly
218223
*/
219224
private val workers: Array<Worker?> = arrayOfNulls(maxPoolSize + 1)
220225

@@ -323,7 +328,7 @@ internal class CoroutineScheduler(
323328
* @param taskContext concurrency context of given [block]
324329
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
325330
*/
326-
fun dispatch(block: Runnable, taskContext: TaskContext? = null, fair: Boolean = false) {
331+
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
327332
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
328333
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
329334
// try to submit the task to the local queue and act depending on the result
@@ -455,7 +460,6 @@ internal class CoroutineScheduler(
455460
if (worker.scheduler !== this) return NOT_ADDED // different scheduler's worker (!!!)
456461

457462
var result = ADDED
458-
459463
if (task.mode == TaskMode.NON_BLOCKING) {
460464
/*
461465
* If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one of the following reasons:
@@ -602,7 +606,7 @@ internal class CoroutineScheduler(
602606
* It is set to the termination deadline when started doing [blockingWorkerIdle] and it is reset
603607
* when there is a task. It serves as protection against spurious wakeups of parkNanos.
604608
*/
605-
var terminationDeadline = 0L
609+
private var terminationDeadline = 0L
606610

607611
/**
608612
* Reference to the next worker in the [parkedWorkersStack].
@@ -613,17 +617,17 @@ internal class CoroutineScheduler(
613617
var nextParkedWorker: Any? = NOT_IN_STACK
614618

615619
/**
616-
* Tries to set [terminationState] to [FORBIDDEN] and returns `false` if the worker is
617-
* already [TERMINATED].
620+
* Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
621+
* This attempt may fail either because worker terminated itself or because someone else
622+
* claimed this worker (though this case is rare, because it requires very bad timing)
618623
*/
619-
fun tryForbidTermination(): Boolean {
620-
terminationState.loop {state ->
621-
when(state) {
622-
TERMINATED -> return false // already terminated
623-
FORBIDDEN -> return true // already forbidden
624-
ALLOWED -> if (terminationState.compareAndSet(ALLOWED, FORBIDDEN)) return true
625-
else -> error("Invalid terminationState = $state")
626-
}
624+
fun tryForbidTermination(): Boolean {
625+
val state = terminationState.value
626+
return when (state) {
627+
TERMINATED -> false // already terminated
628+
FORBIDDEN -> false // already forbidden, someone else claimed this worker
629+
ALLOWED -> terminationState.compareAndSet(ALLOWED, FORBIDDEN)
630+
else -> error("Invalid terminationState = $state")
627631
}
628632
}
629633

@@ -730,7 +734,6 @@ internal class CoroutineScheduler(
730734
}
731735
}
732736

733-
734737
private fun afterTask(task: Task) {
735738
if (task.mode != TaskMode.NON_BLOCKING) {
736739
decrementBlockingWorkers()
@@ -852,9 +855,9 @@ internal class CoroutineScheduler(
852855
}
853856

854857
/**
855-
* Method checks whether new blocking tasks arrived to pool when worker decided
858+
* Checks whether new blocking tasks arrived to the pool when worker decided
856859
* it can go to deep park/termination and puts recently arrived task to its local queue.
857-
* Returns `true` if there is no blocking task in queue.
860+
* Returns `true` if there are no blocking tasks in the queue.
858861
*/
859862
private fun blockingQuiescence(): Boolean {
860863
globalQueue.removeFirstBlockingModeOrNull()?.let {
@@ -914,6 +917,8 @@ internal class CoroutineScheduler(
914917
val created = createdWorkers
915918
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
916919
if (created < 2) return null
920+
921+
// TODO to guarantee quiescence it's probably worth to do a full scan
917922
var stealIndex = lastStealIndex
918923
if (stealIndex == 0) stealIndex = nextInt(created) // start with random steal index
919924
stealIndex++ // then go sequentially
@@ -930,22 +935,26 @@ internal class CoroutineScheduler(
930935
}
931936

932937
enum class WorkerState {
933-
/*
934-
* Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to steal one (~in busy wait).
938+
/**
939+
* Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to steal one
935940
*/
936941
CPU_ACQUIRED,
942+
937943
/**
938944
* Executing task with [TaskMode.PROBABLY_BLOCKING].
939945
*/
940946
BLOCKING,
947+
941948
/**
942949
* Currently parked.
943950
*/
944951
PARKING,
952+
945953
/**
946954
* Tries to execute its local work and then goes to infinite sleep as no longer needed worker.
947955
*/
948956
RETIRING,
957+
949958
/**
950959
* Terminal state, will no longer be used
951960
*/

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ExperimentalCoroutineDispatcher(
5454
return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
5555
}
5656

57-
internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
57+
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
5858
coroutineScheduler.dispatch(block, context, fair)
5959

6060
// for tests only

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,15 @@ internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
4242
internal var schedulerTimeSource: TimeSource = NanoTimeSource
4343

4444
internal enum class TaskMode {
45-
// Marker indicating that task is CPU-bound and will not block
45+
46+
/**
47+
* Marker indicating that task is CPU-bound and will not block
48+
*/
4649
NON_BLOCKING,
47-
// Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
50+
51+
/**
52+
* Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
53+
*/
4854
PROBABLY_BLOCKING,
4955
}
5056

@@ -53,22 +59,26 @@ internal interface TaskContext {
5359
fun afterTask()
5460
}
5561

62+
internal object NonBlockingContext : TaskContext {
63+
override val taskMode: TaskMode = TaskMode.NON_BLOCKING
64+
65+
override fun afterTask() {
66+
// Nothing for non-blocking context
67+
}
68+
}
69+
5670
internal class Task(
5771
@JvmField val block: Runnable,
5872
@JvmField val submissionTime: Long,
59-
@JvmField val taskContext: TaskContext?
73+
@JvmField val taskContext: TaskContext
6074
) : Runnable, LockFreeMPMCQueueNode<Task>() {
61-
val mode: TaskMode get() = taskContext?.taskMode ?: TaskMode.NON_BLOCKING
75+
val mode: TaskMode get() = taskContext.taskMode
6276

6377
override fun run() {
64-
if (taskContext == null) {
78+
try {
6579
block.run()
66-
} else {
67-
try {
68-
block.run()
69-
} finally {
70-
taskContext.afterTask()
71-
}
80+
} finally {
81+
taskContext.afterTask()
7282
}
7383
}
7484

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ internal class WorkQueue {
8787
}
8888

8989
/**
90-
* Tries stealing from [victim] queue into this queue, using [globalQueue] to offload on overflow.
91-
*
90+
* Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case of current queue overflow.
91+
*
9292
* @return whether any task was stolen
9393
*/
9494
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean {
@@ -119,6 +119,7 @@ internal class WorkQueue {
119119
if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) {
120120
return false
121121
}
122+
122123
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
123124
add(lastScheduled, globalQueue)
124125
return true

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,22 @@ package kotlinx.coroutines.experimental.scheduling
22

33
import kotlinx.coroutines.experimental.*
44
import org.junit.*
5+
import java.util.concurrent.*
56
import java.util.concurrent.atomic.*
67

78
class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
89
private val concurrentWorkers = AtomicInteger(0)
910

11+
@Before
12+
fun setUp() {
13+
// In case of starvation test will hang
14+
idleWorkerKeepAliveNs = Long.MAX_VALUE
15+
}
16+
1017
@Test
1118
fun testAddPollRace() = runBlocking {
1219
val limitingDispatcher = blockingDispatcher(1)
13-
val iterations = 100_000 * stressTestMultiplier
20+
val iterations = 25_000 * stressTestMultiplier
1421
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
1522
for (i in 1..iterations) {
1623
val tasks = (1..2).map {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherStressTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.junit.*
55
import java.util.concurrent.*
66
import java.util.concurrent.atomic.*
77

8-
class BlockingCoroutineDispatcherStressTest() : SchedulerTestBase() {
8+
class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() {
99

1010
init {
1111
corePoolSize = CORES_COUNT
@@ -84,7 +84,7 @@ class BlockingCoroutineDispatcherStressTest() : SchedulerTestBase() {
8484
@Test
8585
fun testBlockingTasksStarvation() = runBlocking {
8686
corePoolSize = 2 // Easier to reproduce race with unparks
87-
val iterations = 50_000 * stressTestMultiplier
87+
val iterations = 10_000 * stressTestMultiplier
8888
val blockingLimit = 4 // CORES_COUNT * 3
8989
val blocking = blockingDispatcher(blockingLimit)
9090

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class CoroutineSchedulerStressTest : TestBase() {
1313

1414
private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()
1515
private val observedThreads = ConcurrentHashMap<Thread, Long>()
16-
private val tasksNum = 4_000_000 * stressTestMultiplier
16+
private val tasksNum = 2_000_000 * stressTestMultiplier
1717
private val processed = AtomicInteger(0)
1818
private val finishLatch = CountDownLatch(1)
1919

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,6 @@ abstract class SchedulerTestBase : TestBase() {
3737
require(threads in range) { "Expected threads in $range interval, but has $threads" }
3838
}
3939

40-
/**
41-
* Asserts that [expectedThreadsCount] of pool worker threads exists at the time of method invocation
42-
*/
43-
fun checkPoolThreadsExist(expectedThreadsCount: Int = CORES_COUNT) {
44-
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }.count()
45-
require(threads == expectedThreadsCount) { "Expected $expectedThreadsCount threads, but has $threads" }
46-
}
47-
4840
private fun maxSequenceNumber(): Int? {
4941
return Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }
5042
.map { sequenceNumber(it.name) }.max()

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import kotlin.test.*
1111
class WorkQueueStressTest : TestBase() {
1212

1313
private val threads = mutableListOf<Thread>()
14-
private val offerIterations = 100_000 * stressTestMultiplier
14+
private val offerIterations = 100_000 * stressTestMultiplierSqrt // memory pressure, not CPU time
1515
private val stealersCount = 6
1616
private val stolenTasks = Array(stealersCount) { Queue() }
1717
private val globalQueue = Queue() // only producer will use it

0 commit comments

Comments
 (0)