Skip to content

Commit 4236c8c

Browse files
committed
New work stealing and unparking strategies
* Work stealing: get rid of global queue for offloading during stealing because it never happens in fact * Guard all critical invariants related to work-stealing with asserts * New work signalling strategy that guarantees complete liveness in the face of "accidentally-blocking" CPU tasks * Advanced double-phase unparking mechanism that mitigates the most expensive part of signalling an additional work * Current limitation: blocking tasks are not yet properly signalled
1 parent f27d176 commit 4236c8c

File tree

7 files changed

+199
-253
lines changed

7 files changed

+199
-253
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 53 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.internal.*
1010
import java.io.*
11+
import java.lang.AssertionError
1112
import java.util.concurrent.*
1213
import java.util.concurrent.atomic.*
1314
import java.util.concurrent.locks.*
@@ -272,12 +273,6 @@ internal class CoroutineScheduler(
272273
@JvmField
273274
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
274275

275-
// Local queue 'add' results
276-
private const val ADDED = -1
277-
// Added to the local queue, but pool requires additional worker to keep up
278-
private const val ADDED_REQUIRES_HELP = 0
279-
private const val NOT_ADDED = 1
280-
281276
// Worker termination states
282277
private const val FORBIDDEN = -1
283278
private const val ALLOWED = 0
@@ -351,18 +346,13 @@ internal class CoroutineScheduler(
351346
trackTask() // this is needed for virtual time support
352347
val task = createTask(block, taskContext)
353348
// try to submit the task to the local queue and act depending on the result
354-
when (submitToLocalQueue(task, fair)) {
355-
ADDED -> return
356-
NOT_ADDED -> {
357-
// try to offload task to global queue
358-
if (!globalQueue.addLast(task)) {
359-
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
360-
throw RejectedExecutionException("$schedulerName was terminated")
361-
}
362-
requestCpuWorker()
349+
if (!submitToLocalQueue(task, fair)) {
350+
if (!globalQueue.addLast(task)) {
351+
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
352+
throw RejectedExecutionException("$schedulerName was terminated")
363353
}
364-
else -> requestCpuWorker() // ask for help
365354
}
355+
requestCpuWorker()
366356
}
367357

368358
internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
@@ -420,40 +410,12 @@ internal class CoroutineScheduler(
420410
private fun tryUnpark(): Boolean {
421411
while (true) {
422412
val worker = parkedWorkersStackPop() ?: return false
423-
/*
424-
* If we successfully took the worker out of the queue, it could be in the following states:
425-
* 1) Worker is parked. Just wake up it and reset its termination deadline to avoid
426-
* "termination during tryUnpark" race.
427-
* 2) Worker is not parked and is rescanning the queue before actual parking.
428-
* Worker state may be CPU_ACQUIRED or BLOCKING (has no permit, wants to terminate).
429-
* 3) Worker is executing some task. We can't really distinguish it from the previous case, so just proceed.
430-
* 4) Worker is terminated, proceed and try to find another one.
431-
*
432-
*
433-
* Check that the thread we've found in the queue was indeed in parking state, before we
434-
* actually try to unpark it.
435-
*/
436-
val wasParking = worker.isParking
437-
/*
438-
* Send unpark signal anyway, because the thread may have made decision to park but have not yet set its
439-
* state to parking and this could be the last thread we have (unparking random thread would not harm).
440-
*/
441-
LockSupport.unpark(worker)
442-
/*
443-
* If worker was parking, then we can be sure that our signal is not lost.
444-
* Otherwise it could be a thread in state "3", so let's try ti find another thread.
445-
*/
446-
if (!wasParking) continue
447-
/*
448-
* Terminating worker could be selected.
449-
* If it's already TERMINATED or we cannot forbid it from terminating, then try find another worker.
450-
*/
451-
if (!worker.tryForbidTermination()) continue
452-
/*
453-
* Here we've successfully unparked a thread that was parked and had forbidden it from making
454-
* decision to terminate, so we are now sure we've got some help.
455-
*/
456-
return true
413+
val time = worker.minDelayUntilStealableTask // TODO explain
414+
worker.parkingAllowed = false
415+
if (worker.signallingAllowed && time == 0L) {
416+
LockSupport.unpark(worker)
417+
}
418+
if (time == 0L && worker.tryForbidTermination()) return true
457419
}
458420
}
459421

@@ -489,57 +451,24 @@ internal class CoroutineScheduler(
489451
}
490452

491453
/**
492-
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
454+
* Returns `true` if added, `false` otherwise
493455
*/
494-
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
495-
val worker = currentWorker() ?: return NOT_ADDED
456+
private fun submitToLocalQueue(task: Task, fair: Boolean): Boolean {
457+
val worker = currentWorker() ?: return false
496458

497459
/*
498460
* This worker could have been already terminated from this thread by close/shutdown and it should not
499461
* accept any more tasks into its local queue.
500462
*/
501-
if (worker.state === WorkerState.TERMINATED) return NOT_ADDED
502-
503-
var result = ADDED
504-
if (task.mode == TaskMode.NON_BLOCKING) {
505-
/*
506-
* If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
507-
* 1) Blocking worker is finishing its block and resumes non-blocking continuation
508-
* 2) Blocking worker starts to create non-blocking jobs
509-
*
510-
* First use-case is expected (as recommended way of using blocking contexts),
511-
* so we add non-blocking task to local queue, but also request CPU worker to mitigate second case
512-
*/
513-
if (worker.isBlocking) {
514-
result = ADDED_REQUIRES_HELP
515-
} else {
516-
/*
517-
* If thread is not blocking, then it's just tries to finish its
518-
* local work in order to park (or grab another blocking task), do not add non-blocking tasks
519-
* to its local queue if it can't acquire CPU
520-
*/
521-
val hasPermit = worker.tryAcquireCpuPermit()
522-
if (!hasPermit) {
523-
return NOT_ADDED
524-
}
525-
}
526-
}
527-
528-
val noOffloadingHappened = if (fair) {
529-
worker.localQueue.addLast(task, globalQueue)
530-
} else {
531-
worker.localQueue.add(task, globalQueue)
532-
}
533-
534-
if (noOffloadingHappened) {
535-
// When we're close to queue capacity, wake up anyone to steal work
536-
// Note: non-atomic bufferSize here is Ok (it is just a performance optimization)
537-
if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) {
538-
return ADDED_REQUIRES_HELP
539-
}
540-
return result
541-
}
542-
return ADDED_REQUIRES_HELP
463+
if (worker.state === WorkerState.TERMINATED) return false
464+
if (task.mode == TaskMode.NON_BLOCKING && worker.isBlocking) {
465+
return false
466+
}
467+
val notAdded = with(worker.localQueue) {
468+
if (fair) addLast(task) else add(task)
469+
} ?: return true // Forgive me, Father, for this formatting
470+
globalQueue.addLast(notAdded)
471+
return true
543472
}
544473

545474
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
@@ -563,7 +492,7 @@ internal class CoroutineScheduler(
563492
val queueSizes = arrayListOf<String>()
564493
for (index in 1 until workers.length()) {
565494
val worker = workers[index] ?: continue
566-
val queueSize = worker.localQueue.size()
495+
val queueSize = worker.localQueue.size
567496
when (worker.state) {
568497
WorkerState.PARKING -> ++parkedWorkers
569498
WorkerState.BLOCKING -> {
@@ -668,6 +597,10 @@ internal class CoroutineScheduler(
668597
*/
669598
@Volatile
670599
var nextParkedWorker: Any? = NOT_IN_STACK
600+
@Volatile // TODO ughm don't ask
601+
var parkingAllowed = false
602+
@Volatile
603+
var signallingAllowed = false
671604

672605
/**
673606
* Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
@@ -719,8 +652,12 @@ internal class CoroutineScheduler(
719652
private var lastExhaustionTime = 0L
720653

721654
private var rngState = Random.nextInt()
722-
// The delay until at least one task in other worker queues will become stealable
723-
private var minDelayUntilStealableTask = 0L
655+
/*
656+
* The delay until at least one task in other worker queues will become stealable.
657+
* Volatile to avoid benign data-race
658+
*/
659+
@Volatile
660+
public var minDelayUntilStealableTask = 0L
724661

725662
override fun run() = runWorker()
726663

@@ -744,15 +681,21 @@ internal class CoroutineScheduler(
744681
*
745682
* Park duration depends on the possible state: either this is the idleWorkerKeepAliveNs or stealing deadline.
746683
*/
684+
parkingAllowed = true
747685
if (parkedWorkersStackPush(this)) {
748686
continue
749687
} else {
750-
tryReleaseCpu(WorkerState.PARKING)
751-
if (minDelayUntilStealableTask > 0) {
752-
LockSupport.parkNanos(minDelayUntilStealableTask) // No spurious wakeup check here
753-
} else {
754-
park()
688+
signallingAllowed = true
689+
if (parkingAllowed) {
690+
tryReleaseCpu(WorkerState.PARKING)
691+
if (minDelayUntilStealableTask > 0) {
692+
LockSupport.parkNanos(minDelayUntilStealableTask) // No spurious wakeup check here
693+
} else {
694+
assert { localQueue.size == 0 }
695+
park()
696+
}
755697
}
698+
signallingAllowed = false
756699
}
757700
}
758701
tryReleaseCpu(WorkerState.TERMINATED)
@@ -800,7 +743,7 @@ internal class CoroutineScheduler(
800743
val currentState = state
801744
// Shutdown sequence of blocking dispatcher
802745
if (currentState !== WorkerState.TERMINATED) {
803-
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
746+
assert { (currentState == WorkerState.BLOCKING).also { if (!it) throw AssertionError("AAA: $currentState") } } // "Expected BLOCKING state, but has $currentState"
804747
state = WorkerState.RETIRING
805748
}
806749
}
@@ -910,10 +853,12 @@ internal class CoroutineScheduler(
910853
* Checks whether new blocking tasks arrived to the pool when worker decided
911854
* it can go to deep park/termination and puts recently arrived task to its local queue.
912855
* Returns `true` if there is no blocking tasks in the queue.
856+
* Invariant: invoked only with empty local queue
913857
*/
914858
private fun blockingQuiescence(): Boolean {
859+
assert { localQueue.size == 0 }
915860
globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)?.let {
916-
localQueue.add(it, globalQueue)
861+
localQueue.add(it)
917862
return false
918863
}
919864
return true
@@ -960,6 +905,7 @@ internal class CoroutineScheduler(
960905
}
961906

962907
private fun trySteal(): Task? {
908+
assert { localQueue.size == 0 }
963909
val created = createdWorkers
964910
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
965911
if (created < 2) {
@@ -973,7 +919,8 @@ internal class CoroutineScheduler(
973919
if (currentIndex > created) currentIndex = 1
974920
val worker = workers[currentIndex]
975921
if (worker !== null && worker !== this) {
976-
val stealResult = localQueue.trySteal(worker.localQueue, globalQueue)
922+
assert { localQueue.size == 0 }
923+
val stealResult = localQueue.tryStealFrom(victim = worker.localQueue)
977924
if (stealResult == TASK_STOLEN) {
978925
return localQueue.poll()
979926
} else if (stealResult > 0) {

0 commit comments

Comments
 (0)