Skip to content

Commit d77c17c

Browse files
committed
Improve performance of task acquisition
* Do not push worker to the stack during second pass of "min duration scanning" * Locally cache whether local queue has any work to execute to save atomic getAndSet and a bunch of atomic loads * More precise rendezvous for parking * Long-scanning stealing to emulate spinning * Properly handle interference of termination sequence and protection against spurious wakeups * Documentation improvements, naming, proper spurious wakeup check
1 parent ab30d72 commit d77c17c

File tree

9 files changed

+91
-71
lines changed

9 files changed

+91
-71
lines changed

benchmarks/src/jmh/kotlin/benchmarks/scheduler/DispatchersContextSwitchBenchmark.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,10 @@ open class DispatchersContextSwitchBenchmark {
6464
repeat(nCoroutines) {
6565
launch(dispatcher) {
6666
repeat(nRepeatDelay) {
67-
delayOrYield()
67+
delay(delayTimeMs)
6868
}
6969
}
7070
}
7171
}
72+
}
7273

73-
private suspend fun delayOrYield() {
74-
delay(delayTimeMs)
75-
}
76-
}

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/CycledActorsBenchmark.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ import java.util.concurrent.*
2929
* CycledActorsBenchmark.cycledActors 262144 experimental avgt 14 1804.146 ± 57.275 ms/op
3030
*/
3131
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
32-
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
33-
@Fork(value = 3)
32+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
33+
@Fork(value = 1)
3434
@BenchmarkMode(Mode.AverageTime)
3535
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3636
@State(Scope.Benchmark)
@@ -43,7 +43,7 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() {
4343
@Param("fjp", "ftp_1", "scheduler")
4444
override var dispatcher: String = "fjp"
4545

46-
@Param("524288")
46+
@Param("1", "1024")
4747
var actorStateSize = 1
4848

4949
@Benchmark

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongActorBenchmark.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import java.util.concurrent.*
2727
*/
2828
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
2929
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
30-
@Fork(value = 2)
30+
@Fork(value = 1)
3131
@BenchmarkMode(Mode.AverageTime)
3232
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3333
@State(Scope.Benchmark)

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import kotlin.coroutines.*
2020
* PingPongWithBlockingContext.withContextPingPong avgt 20 761.669 ± 41.371 ms/op
2121
*/
2222
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
23-
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
24-
@Fork(value = 2)
23+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
24+
@Fork(value = 1)
2525
@BenchmarkMode(Mode.AverageTime)
2626
@OutputTimeUnit(TimeUnit.MILLISECONDS)
2727
@State(Scope.Benchmark)

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/StatefulActorBenchmark.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import java.util.concurrent.*
3333
*/
3434
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
3535
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
36-
@Fork(value = 2)
36+
@Fork(value = 1)
3737
@BenchmarkMode(Mode.AverageTime)
3838
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3939
@State(Scope.Benchmark)

kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ internal class LockFreeTaskQueueCore<E : Any>(
175175
}
176176
// element == Placeholder can only be when add has not finished yet
177177
if (element is Placeholder) return null // consider it not added yet
178-
// now we tentative know element to remove -- check predicate
179178
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
180179
val newHead = (head + 1) and MAX_CAPACITY_MASK
181180
if (_state.compareAndSet(state, state.updateHead(newHead))) {

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 80 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import kotlin.random.*
2525
* ### Structural overview
2626
*
2727
* Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
28-
* to execute blocking tasks. Every worker a has local queue in addition to a global scheduler queue and the global queue
28+
* to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
2929
* has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
3030
* Work-stealing is implemented on top of that queues to provide even load distribution and illusion of centralized run queue.
3131
*
@@ -245,7 +245,7 @@ internal class CoroutineScheduler(
245245
*/
246246
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
247247
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
248-
private val availableCpuPermits: Int inline get() = (controlState.value and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
248+
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
249249

250250
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
251251
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
@@ -261,14 +261,11 @@ internal class CoroutineScheduler(
261261
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
262262
}
263263

264-
private inline fun tryAcquireCpuPermit(): Boolean {
265-
while (true) {
266-
val state = controlState.value
267-
val available = availableCpuPermits(state)
268-
if (available == 0) return false
269-
val update = state - (1L shl CPU_PERMITS_SHIFT)
270-
if (controlState.compareAndSet(state, update)) return true
271-
}
264+
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
265+
val available = availableCpuPermits(state)
266+
if (available == 0) return false
267+
val update = state - (1L shl CPU_PERMITS_SHIFT)
268+
if (controlState.compareAndSet(state, update)) return true
272269
}
273270

274271
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
@@ -283,9 +280,12 @@ internal class CoroutineScheduler(
283280
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
284281

285282
// Worker termination states
286-
private const val FORBIDDEN = -1
287-
private const val ALLOWED = 0
283+
private const val TERMINATION_FORBIDDEN = -1
284+
private const val TERMINATION_ALLOWED = 0
288285
private const val TERMINATED = 1
286+
// Worker parking states
287+
private const val PARKING_FORBIDDEN = -1
288+
private const val PARKING_ALLOWED = 0
289289
private const val PARKED = 1
290290

291291
// Masks of control state
@@ -334,7 +334,7 @@ internal class CoroutineScheduler(
334334
globalCpuQueue.close()
335335
// Finish processing tasks from globalQueue and/or from this worker's local queue
336336
while (true) {
337-
val task = currentWorker?.findTask()
337+
val task = currentWorker?.findTask(true)
338338
?: globalCpuQueue.removeFirstOrNull()
339339
?: globalBlockingQueue.removeFirstOrNull()
340340
?: break
@@ -419,7 +419,7 @@ internal class CoroutineScheduler(
419419
private fun tryUnpark(): Boolean {
420420
while (true) {
421421
val worker = parkedWorkersStackPop() ?: return false
422-
if (!worker.parkingState.compareAndSet(ALLOWED, FORBIDDEN)) {
422+
if (!worker.parkingState.compareAndSet(PARKING_ALLOWED, PARKING_FORBIDDEN)) {
423423
LockSupport.unpark(worker)
424424
}
425425
if (worker.tryForbidTermination()) return true
@@ -469,10 +469,10 @@ internal class CoroutineScheduler(
469469
*/
470470
if (worker.state === WorkerState.TERMINATED) return task
471471
// Do not add CPU tasks in local queue if we are not able to execute it
472-
// TODO discuss: maybe add it to the local queue and offload back in the global queue iff permit wasn't acquired?
473472
if (task.mode == TaskMode.NON_BLOCKING && worker.isBlocking) {
474473
return task
475474
}
475+
worker.mayHaveLocalTasks = true
476476
return worker.localQueue.add(task, fair = fair)
477477
}
478478

@@ -525,7 +525,7 @@ internal class CoroutineScheduler(
525525
"CPU = $cpuWorkers, " +
526526
"blocking = $blockingWorkers, " +
527527
"parked = $parkedWorkers, " +
528-
"retired = $dormant, " +
528+
"dormant = $dormant, " +
529529
"terminated = $terminated}, " +
530530
"running workers queues = $queueSizes, "+
531531
"global CPU queue size = ${globalCpuQueue.size}, " +
@@ -581,16 +581,16 @@ internal class CoroutineScheduler(
581581
/**
582582
* Small state machine for termination.
583583
* Followed states are allowed:
584-
* [ALLOWED] -- worker can wake up and terminate itself
585-
* [FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
584+
* [TERMINATION_ALLOWED] -- worker can wake up and terminate itself
585+
* [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
586586
* [TERMINATED] -- final state, thread is terminating and cannot be resurrected
587587
*
588588
* Allowed transitions:
589-
* [ALLOWED] -> [FORBIDDEN]
590-
* [ALLOWED] -> [TERMINATED]
591-
* [FORBIDDEN] -> [ALLOWED]
589+
* [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN]
590+
* [TERMINATION_ALLOWED] -> [TERMINATED]
591+
* [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED]
592592
*/
593-
private val terminationState = atomic(ALLOWED)
593+
private val terminationState = atomic(TERMINATION_ALLOWED)
594594

595595
/**
596596
* It is set to the termination deadline when started doing [park] and it reset
@@ -610,22 +610,22 @@ internal class CoroutineScheduler(
610610
* The delay until at least one task in other worker queues will become stealable.
611611
*/
612612
private var minDelayUntilStealableTaskNs = 0L
613-
// ALLOWED | PARKED | FORBIDDEN
614-
val parkingState = atomic(ALLOWED)
613+
// PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED
614+
val parkingState = atomic(PARKING_ALLOWED)
615615

616616
private var rngState = Random.nextInt()
617617
/**
618-
* Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
618+
* Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails.
619619
* This attempt may fail either because worker terminated itself or because someone else
620620
* claimed this worker (though this case is rare, because require very bad timings)
621621
*/
622622
fun tryForbidTermination(): Boolean =
623623
when (val state = terminationState.value) {
624624
TERMINATED -> false // already terminated
625-
FORBIDDEN -> false // already forbidden, someone else claimed this worker
626-
ALLOWED -> terminationState.compareAndSet(
627-
ALLOWED,
628-
FORBIDDEN
625+
TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker
626+
TERMINATION_ALLOWED -> terminationState.compareAndSet(
627+
TERMINATION_ALLOWED,
628+
TERMINATION_FORBIDDEN
629629
)
630630
else -> error("Invalid terminationState = $state")
631631
}
@@ -658,61 +658,77 @@ internal class CoroutineScheduler(
658658
}
659659

660660
override fun run() = runWorker()
661+
@JvmField
662+
var mayHaveLocalTasks = false
661663

662664
private fun runWorker() {
663665
var rescanned = false
664666
while (!isTerminated && state != WorkerState.TERMINATED) {
665-
val task = findTask()
667+
val task = findTask(mayHaveLocalTasks)
666668
// Task found. Execute and repeat
667669
if (task != null) {
668670
rescanned = false
669671
minDelayUntilStealableTaskNs = 0L
670672
executeTask(task)
671673
continue
674+
} else {
675+
mayHaveLocalTasks = false
672676
}
673677
/*
674678
* No tasks were found:
675679
* 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
676680
* Then its deadline is stored in [minDelayUntilStealableTask]
677681
*
678682
* Then just park for that duration (ditto re-scanning).
679-
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
683+
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
680684
* excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
681685
* it with "spinning via scans" mechanism.
686+
* NB: this short potential parking does not interfere with `tryUnpark`
682687
*/
683688
if (minDelayUntilStealableTaskNs != 0L) {
684689
if (!rescanned) {
685690
rescanned = true
686-
continue
687691
} else {
692+
rescanned = false
688693
tryReleaseCpu(WorkerState.PARKING)
694+
interrupted()
689695
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
690696
minDelayUntilStealableTaskNs = 0L
691697
}
698+
continue
692699
}
693700
/*
694-
* 2) No tasks available, time to park and, potentially, shut down the thread.
695-
*
701+
* 2) Or no tasks available, time to park and, potentially, shut down the thread.
696702
* Add itself to the stack of parked workers, re-scans all the queues
697703
* to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
698704
*/
699-
parkingState.value = ALLOWED
700-
if (parkedWorkersStackPush(this)) {
701-
continue
702-
} else {
703-
assert { localQueue.size == 0 }
705+
tryPark()
706+
}
707+
tryReleaseCpu(WorkerState.TERMINATED)
708+
}
709+
710+
// Counterpart to "tryUnpark"
711+
private fun tryPark() {
712+
if (!inStack()) {
713+
parkingState.value = PARKING_ALLOWED
714+
}
715+
if (parkedWorkersStackPush(this)) {
716+
return
717+
} else {
718+
assert { localQueue.size == 0 }
719+
// Failed to get a parking permit => we are not in the stack
720+
while (inStack()) {
721+
if (isTerminated || state == WorkerState.TERMINATED) break
722+
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
723+
return
724+
}
704725
tryReleaseCpu(WorkerState.PARKING)
705726
interrupted() // Cleanup interruptions
706-
while (inStack()) { // Prevent spurious wakeups
707-
if (isTerminated) break
708-
if (!parkingState.compareAndSet(ALLOWED, PARKED)) {
709-
break
710-
}
727+
if (inStack()) {
711728
park()
712729
}
713730
}
714731
}
715-
tryReleaseCpu(WorkerState.TERMINATED)
716732
}
717733

718734
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
@@ -763,7 +779,7 @@ internal class CoroutineScheduler(
763779
}
764780

765781
private fun park() {
766-
terminationState.value = ALLOWED
782+
terminationState.value = TERMINATION_ALLOWED
767783
// set termination deadline the first time we are here (it is reset in idleReset)
768784
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
769785
// actually park
@@ -789,7 +805,7 @@ internal class CoroutineScheduler(
789805
* See tryUnpark for state reasoning.
790806
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
791807
*/
792-
if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
808+
if (!terminationState.compareAndSet(TERMINATION_ALLOWED, TERMINATED)) return
793809
/*
794810
* At this point this thread is no longer considered as usable for scheduling.
795811
* We need multi-step choreography to reindex workers.
@@ -841,22 +857,30 @@ internal class CoroutineScheduler(
841857
}
842858
}
843859

844-
fun findTask(): Task? {
845-
if (tryAcquireCpuPermit()) return findAnyTask()
860+
fun findTask(scanLocalQueue: Boolean): Task? {
861+
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
846862
// If we can't acquire a CPU permit -- attempt to find blocking task
847-
val task = localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
863+
val task = if (scanLocalQueue) {
864+
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
865+
} else {
866+
globalBlockingQueue.removeFirstOrNull()
867+
}
848868
return task ?: trySteal(blockingOnly = true)
849869
}
850870

851-
private fun findAnyTask(): Task? {
871+
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
852872
/*
853873
* Anti-starvation mechanism: probabilistically poll either local
854874
* or global queue to ensure progress for both external and internal tasks.
855875
*/
856-
val globalFirst = nextInt(2 * corePoolSize) == 0
857-
if (globalFirst) pollGlobalQueues()?.let { return it }
858-
localQueue.poll()?.let { return it }
859-
if (!globalFirst) pollGlobalQueues()?.let { return it }
876+
if (scanLocalQueue) {
877+
val globalFirst = nextInt(2 * corePoolSize) == 0
878+
if (globalFirst) pollGlobalQueues()?.let { return it }
879+
localQueue.poll()?.let { return it }
880+
if (!globalFirst) pollGlobalQueues()?.let { return it }
881+
} else {
882+
pollGlobalQueues()?.let { return it }
883+
}
860884
return trySteal(blockingOnly = false)
861885
}
862886

@@ -880,7 +904,7 @@ internal class CoroutineScheduler(
880904

881905
var currentIndex = nextInt(created)
882906
var minDelay = Long.MAX_VALUE
883-
repeat(created) {
907+
repeat(workers.length()) {
884908
++currentIndex
885909
if (currentIndex > created) currentIndex = 1
886910
val worker = workers[currentIndex]

kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp(
4545

4646
@JvmField
4747
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
48-
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 100000L)
48+
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
4949
)
5050

5151
@JvmField

kotlinx-coroutines-core/jvm/test/scheduling/BlockingIOTerminationStressTest.kt renamed to kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.junit.*
99
import java.util.*
1010
import java.util.concurrent.*
1111

12-
class BlockingIOTerminationStressTest : TestBase() {
12+
class BlockingCoroutineDispatcherTerminationStressTest : TestBase() {
1313
private val baseDispatcher = ExperimentalCoroutineDispatcher(
1414
2, 20,
1515
TimeUnit.MILLISECONDS.toNanos(10)

0 commit comments

Comments
 (0)