Commit 18a9134

Improved task dispatching:

* Merge blocking and created workers into a single state variable
* Replace the j.u.c. queue of parked workers with an intrusive stack to support shrinking and increase performance
* Replace the j.u.c. global queue with a Michael & Scott queue which supports conditional polling
* Poll for blocking tasks from the global queue when a CPU token is not available
* Make a last-ditch polling effort before thread termination
* Idle reset on requestCpuWorker
1 parent b23729e commit 18a9134
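The Michael & Scott queue mentioned in the message is introduced as LockFreeQueue, one of the 12 changed files, but its source is not among the hunks shown below. As a rough sketch of what "conditional polling" means here — all names in this snippet (Task, isBlocking, MichaelScottQueue) are illustrative assumptions, not the actual kotlinx.coroutines types:

import java.util.concurrent.atomic.AtomicReference

// Hypothetical task model: only the isBlocking flag matters for this sketch.
class Task(val body: Runnable, val isBlocking: Boolean)

class MichaelScottQueue {
    private class Node(val task: Task?) {
        val next = AtomicReference<Node?>(null)
    }

    // The queue always starts with a dummy node whose task is never read.
    private val dummy = Node(null)
    private val head = AtomicReference(dummy)
    private val tail = AtomicReference(dummy)

    fun offer(task: Task) {
        val node = Node(task)
        while (true) {
            val t = tail.get()
            val next = t.next.get()
            if (next != null) {
                tail.compareAndSet(t, next) // help a lagging enqueuer advance tail
            } else if (t.next.compareAndSet(null, node)) {
                tail.compareAndSet(t, node) // failure is fine: someone already helped
                return
            }
        }
    }

    fun poll(): Task? = pollIf { true }

    // Conditional polling: dequeue the head only when it is a blocking task,
    // leaving CPU tasks in place for workers that hold a CPU permit.
    fun pollBlockingMode(): Task? = pollIf { it.isBlocking }

    private fun pollIf(predicate: (Task) -> Boolean): Task? {
        while (true) {
            val h = head.get()
            val t = tail.get()
            val next = h.next.get() ?: return null // empty queue
            if (h == t) {
                tail.compareAndSet(t, next) // tail lags behind, help it forward
                continue
            }
            val task = next.task!! // non-dummy nodes always carry a task
            if (!predicate(task)) return null // head does not qualify, leave it
            if (head.compareAndSet(h, next)) return task // advancing head = dequeue
        }
    }
}

The payoff of the conditional variant shows up in findTask() and terminateWorker() below: a worker that holds no CPU permit can still drain blocking work without stealing CPU tasks from permit holders.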

12 files changed: +395 −173 lines

benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt

Lines changed: 28 additions & 28 deletions
@@ -55,41 +55,41 @@ open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
         me.receive()
     }
 }
+}
 
-private fun pingActorCoroutine(context: CoroutineContext, pingChannel: SendChannel<Letter>,
-                               capacity: Int = 1) = actor<Letter>(context, capacity = capacity) {
-    var initiator: SendChannel<Letter>? = null
-    for (letter in channel) with(letter) {
-        when (message) {
-            is Start -> {
-                initiator = sender
-                pingChannel.send(Letter(Ball(0), channel))
-            }
-            is Ball -> {
-                pingChannel.send(Letter(Ball(message.count + 1), channel))
-            }
-            is Stop -> {
-                initiator!!.send(Letter(Stop(), channel))
-                return@actor
-            }
-            else -> error("Cannot happen $message")
+fun pingActorCoroutine(context: CoroutineContext, pingChannel: SendChannel<PingPongActorBenchmark.Letter>,
+                       capacity: Int = 1) = actor<PingPongActorBenchmark.Letter>(context, capacity = capacity) {
+    var initiator: SendChannel<PingPongActorBenchmark.Letter>? = null
+    for (letter in channel) with(letter) {
+        when (message) {
+            is Start -> {
+                initiator = sender
+                pingChannel.send(PingPongActorBenchmark.Letter(Ball(0), channel))
             }
+            is Ball -> {
+                pingChannel.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
+            }
+            is Stop -> {
+                initiator!!.send(PingPongActorBenchmark.Letter(Stop(), channel))
+                return@actor
+            }
+            else -> error("Cannot happen $message")
         }
     }
+}
 
-private fun pongActorCoroutine(context: CoroutineContext, capacity: Int = 1) = actor<Letter>(context, capacity = capacity) {
-    for (letter in channel) with(letter) {
-        when (message) {
-            is Ball -> {
-                if (message.count >= N_MESSAGES) {
-                    sender.send(Letter(Stop(), channel))
-                    return@actor
-                } else {
-                    sender.send(Letter(Ball(message.count + 1), channel))
-                }
+fun pongActorCoroutine(context: CoroutineContext, capacity: Int = 1) = actor<PingPongActorBenchmark.Letter>(context, capacity = capacity) {
+    for (letter in channel) with(letter) {
+        when (message) {
+            is Ball -> {
+                if (message.count >= N_MESSAGES) {
+                    sender.send(PingPongActorBenchmark.Letter(Stop(), channel))
+                    return@actor
+                } else {
+                    sender.send(PingPongActorBenchmark.Letter(Ball(message.count + 1), channel))
                 }
-                else -> error("Cannot happen $message")
             }
+            else -> error("Cannot happen $message")
         }
     }
 }
benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongWithBlockingContext.kt (new file)

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+package benchmarks.actors
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.scheduling.*
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.*
+import kotlin.coroutines.experimental.*
+
+
+/*
+ * Benchmark                                                   Mode  Cnt    Score     Error  Units
+ * PingPongWithBlockingContext.commonPoolWithContextPingPong   avgt   20  972.662 ± 103.448  ms/op
+ * PingPongWithBlockingContext.limitingDispatcherPingPong      avgt   20  136.167 ±   4.971  ms/op
+ * PingPongWithBlockingContext.withContextPingPong             avgt   20  761.669 ±  41.371  ms/op
+ */
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class PingPongWithBlockingContext {
+
+    private val experimental = ExperimentalCoroutineDispatcher(8)
+    private val blocking = experimental.blocking(8)
+    private val threadPool = newFixedThreadPoolContext(8, "PongCtx")
+
+    @TearDown
+    fun tearDown() {
+        threadPool.close()
+    }
+
+
+    @Benchmark
+    fun limitingDispatcherPingPong() = runBlocking {
+        runPingPongs(experimental, blocking)
+    }
+
+
+    @Benchmark
+    fun withContextPingPong() = runBlocking {
+        runPingPongs(experimental, threadPool)
+    }
+
+    @Benchmark
+    fun commonPoolWithContextPingPong() = runBlocking {
+        runPingPongs(CommonPool, threadPool)
+    }
+
+    private suspend fun runPingPongs(pingContext: CoroutineContext, pongContext: CoroutineContext) {
+        val me = Channel<PingPongActorBenchmark.Letter>()
+        val pong = pongActorCoroutine(pongContext)
+        val ping = pingActorCoroutine(pingContext, pong)
+        ping.send(PingPongActorBenchmark.Letter(Start(), me))
+
+        me.receive()
+    }
+}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 74 additions & 25 deletions
@@ -2,7 +2,6 @@ package kotlinx.coroutines.experimental.scheduling
 
 import kotlinx.atomicfu.*
 import kotlinx.coroutines.experimental.*
-import netscape.security.Privilege.*
 import java.io.*
 import java.util.*
 import java.util.concurrent.*
@@ -52,7 +51,7 @@ internal class CoroutineScheduler(
     private val maxPoolSize: Int = corePoolSize * 128
 ) : Closeable {
 
-    private val globalWorkQueue: GlobalQueue = ConcurrentLinkedQueue<Task>()
+    private val globalWorkQueue: LockFreeQueue = LockFreeQueue()
 
     /*
      * Permits to execute non-blocking (~CPU-intensive) tasks.
@@ -62,8 +61,41 @@ internal class CoroutineScheduler(
      */
     private val cpuPermits = Semaphore(corePoolSize, false)
 
-    // TODO: Make queue intrusive with PoolWorker and [probably] replace with stack
-    private val parkedWorkers = ConcurrentLinkedQueue<PoolWorker>()
+    /*
+     * The stack of parked workers (with an artificial object to make call-sites more understandable).
+     * Every worker registers itself in the stack before parking (if it was not previously registered),
+     * and callers of [requestCpuWorker] try to unpark a thread from the top of the stack.
+     * This is a form of intrusive, garbage-free Treiber stack where PoolWorker is also a stack node.
+     *
+     * The stack is better than a queue (even with contention on top) because it unparks threads
+     * in most-recently-used order, improving both performance and locality.
+     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
+     * the latter half will never be unparked and will terminate itself after [BLOCKING_WORKER_KEEP_ALIVE_NS].
+     */
+    @Suppress("ClassName")
+    private object parkedWorkersStack
+    private val head = atomic<PoolWorker?>(null)
+
+    @Suppress("unused")
+    private fun parkedWorkersStack.push(next: PoolWorker) {
+        head.loop { h ->
+            next.nextParkedWorker = h
+            if (head.compareAndSet(h, next)) return
+        }
+    }
+
+    @Suppress("unused")
+    private fun parkedWorkersStack.pop(): PoolWorker? {
+        // TODO investigate ABA possibility
+        head.loop { h ->
+            if (h == null) return null
+            val next = h.nextParkedWorker
+            if (head.compareAndSet(h, next)) {
+                h.nextParkedWorker = null
+                return h
+            }
+        }
+    }
 
     /**
      * State of worker threads
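The TODO about ABA in pop() is worth spelling out; the classic interleaving for a Treiber stack with reusable nodes goes like this (T1/T2 are illustrative thread names, not code from the commit):

// T1 enters pop(): reads h = A and next = A.nextParkedWorker = B, then stalls.
// T2 pops A, pops B (both unpark and run), then A parks again: push(A).
// The stack is now [A] with A.nextParkedWorker = null, yet head == A once more.
// T1 resumes: head.compareAndSet(A, B) succeeds and "resurrects" B as the top,
// even though B never pushed itself back.

The damage is bounded because tryUnpark() re-checks each popped worker's registeredInParkedQueue flag and skips entries that are no longer registered — presumably why the comment says "investigate" rather than "fix".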
@@ -73,22 +105,21 @@ internal class CoroutineScheduler(
      */
     private val workers: Array<PoolWorker?>
 
-    // TODO describe, asserts?
+    /*
+     * Long describing state of workers in this pool.
+     * Currently includes created and blocking workers.
+     */
     private val controlState = atomic(0L)
     private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
     private val blockingWorkers: Int inline get() = (controlState.value and BLOCKING_MASK shr 21).toInt()
-    private val idleWorkers: Int inline get() = (controlState.value and IDLE_MASK shr 42).toInt()
 
     private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
     private inline fun blockingWorkers(state: Long): Int = (state and BLOCKING_MASK shr 21).toInt()
-    private inline fun idleWorkers(state: Long): Int = (state and IDLE_MASK shr 42).toInt()
 
     private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.addAndGet(1L))
     private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.addAndGet(-1L))
     private inline fun incrementBlockingWorkers(): Int = blockingWorkers(controlState.addAndGet(1L shl 21))
     private inline fun decrementBlockingWorkers(): Int = blockingWorkers(controlState.addAndGet(-(1L shl 21)))
-    private inline fun incrementIdleWorkers(): Int = idleWorkers(controlState.addAndGet(1L shl 42))
-    private inline fun decrementIdleWorkers(): Int = idleWorkers(controlState.addAndGet(-(1L shl 42)))
 
     private val random = Random()
     private val isTerminated = atomic(false)
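With the idle field gone, controlState packs exactly two counters into one Long — created workers in bits 0..20 and blocking workers in bits 21..41 — so both can be read with a single atomic load. A standalone sketch of the encoding, derived directly from the masks in the hunks that follow (plain Kotlin, no scheduler types):

fun main() {
    val createdMask = (1L shl 21) - 1        // bits 0..20, mirrors CREATED_MASK
    val blockingMask = createdMask shl 21    // bits 21..41, mirrors BLOCKING_MASK

    var state = 0L
    state += 1L              // incrementCreatedWorkers: +1 in the low field
    state += 1L              // a second worker is created
    state += 1L shl 21       // incrementBlockingWorkers: +1 in the high field

    // infix `and`/`shr` are left-associative, so this parses as (state and mask) shr 21
    val created = (state and createdMask).toInt()
    val blocking = (state and blockingMask shr 21).toInt()
    println("created=$created blocking=$blocking") // prints: created=2 blocking=1
}

Each field is 21 bits wide, so the scheme accommodates up to 2^21 - 1 workers per field — comfortably above maxPoolSize = corePoolSize * 128 for realistic core counts.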
@@ -106,7 +137,7 @@
             .coerceAtMost(MAX_PARK_TIME_NS)
 
         @JvmStatic
-        private val BLOCKING_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(2)
+        private val BLOCKING_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(5)
 
         // Local queue 'add' results
         private const val ADDED = -1
@@ -119,9 +150,9 @@
         private const val ALLOWED = 0
         private const val TERMINATED = 1
 
+        // Masks of control state
         private const val CREATED_MASK: Long = (1L shl 21) - 1
         private const val BLOCKING_MASK: Long = CREATED_MASK shl 21
-        private const val IDLE_MASK: Long = BLOCKING_MASK shl 21
     }
 
     init {
@@ -144,7 +175,7 @@
     }
 
     /*
-     * Closes current scheduler and waits until all threads will be stopped.
+     * Closes current scheduler and waits until all threads are stopped.
      * This method uses unsafe API (unconditional unparks, ignoring interruptions etc.)
      * and intended to be used only for testing. Invocation has no additional effect if already closed.
     */
@@ -162,7 +193,7 @@
             if (it.isAlive) {
                 // Unparking alive thread is unsafe in general, but acceptable for testing purposes
                 LockSupport.unpark(it)
-                it.join(1_000)
+                it.join(100)
             }
 
             ++finished
@@ -201,22 +232,24 @@
     private fun requestCpuWorker() {
         // No CPU available -- nothing to request
         if (cpuPermits.availablePermits() == 0) {
+            tryUnpark()
             return
         }
 
         /*
-         * Fast path -- we have retired or parked worker, unpark it, and we're done.
+         * Fast path -- we have retired or parked worker, unpark it and we're done.
          * The data race here: when only one permit is available, multiple retired workers
          * can be unparked, but only one will continue execution, so we're overproviding with threads
          * in case of race to avoid spurious starvation
         */
        if (tryUnpark()) return
 
        /*
+        * Create a new thread.
         * It's not preferable to use 'cpuWorkersCounter' here (moreover, it's implicitly here as corePoolSize - cpuPermits.availableTokens),
         * cpuWorkersCounter doesn't take into account threads which are created (and either running or parked), but haven't
         * CPU token: retiring workers, recently unparked workers before `findTask` call, etc.
-        * So if we will use cpuWorkersCounter, we start to overprovision too much.
+        * So if we will use cpuWorkersCounter, we start to overprovide with threads too much.
        */
        val state = controlState.value
        val created = createdWorkers(state)
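Taken together, the dispatch path now reads roughly as follows. This is a condensed paraphrase, not the method body from the commit — the hunk above truncates the creation branch, so the final condition and the createNewWorker() helper are assumptions:

private fun requestCpuWorkerSketch() {
    if (cpuPermits.availablePermits() == 0) {
        // No CPU token to hand out, but a parked worker can still
        // pick up *blocking* work from the global queue.
        tryUnpark()
        return
    }
    if (tryUnpark()) return              // fast path: reuse a parked/retired worker

    // Single atomic read of both packed counters (see controlState above).
    val state = controlState.value
    val created = createdWorkers(state)
    val blocking = blockingWorkers(state)
    if (created - blocking < corePoolSize) {
        createNewWorker()                // hypothetical helper for the truncated branch
    }
}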
@@ -233,7 +266,7 @@
 
     private fun tryUnpark(): Boolean {
         while (true) {
-            val worker = parkedWorkers.poll() ?: return false
+            val worker = parkedWorkersStack.pop() ?: return false
             if (!worker.registeredInParkedQueue.value) {
                 continue // Someone else succeeded
             } else if (!worker.registeredInParkedQueue.compareAndSet(true, false)) {
@@ -394,14 +427,14 @@
             }
         }
 
-        return "${super.toString()}[core pool size = ${workers.size}, " +
+        return "${super.toString()}[core pool size = $corePoolSize, " +
                 "CPU workers = $cpuWorkers, " +
                 "blocking workers = $blockingWorkers, " +
                 "parked workers = $parkedWorkers, " +
                 "retired workers = $retired, " +
                 "finished workers = $finished, " +
-                "running workers queues = $queueSizes, " +
-                "global queue size = ${globalWorkQueue.size}]"
+                "running workers queues = $queueSizes, "+
+                "global queue size = ${globalWorkQueue.size()}]"
     }
 
     // todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
@@ -436,11 +469,12 @@
         val terminationState = atomic(ALLOWED)
 
         /**
-         * Whether worker was added to [parkedWorkers].
+         * Whether worker was added to [parkedWorkersStack].
          * Worker registers itself in this queue once and will stay there until
          * someone will call [Queue.poll] which return it, then this flag is reset.
         */
        val registeredInParkedQueue = atomic(false)
+        var nextParkedWorker: PoolWorker? = null
 
        /**
         * Tries to acquire CPU token if worker doesn't have one
@@ -591,7 +625,7 @@
             }
 
             if (registeredInParkedQueue.compareAndSet(false, true)) {
-                parkedWorkers.add(this)
+                parkedWorkersStack.push(this)
             }
 
             tryReleaseCpu(WorkerState.PARKING)
@@ -603,14 +637,14 @@
         private fun blockingWorkerIdle() {
             tryReleaseCpu(WorkerState.PARKING)
             if (registeredInParkedQueue.compareAndSet(false, true)) {
-                parkedWorkers.add(this)
+                parkedWorkersStack.push(this)
             }
 
             terminationState.value = ALLOWED
             val time = System.nanoTime()
             LockSupport.parkNanos(BLOCKING_WORKER_KEEP_ALIVE_NS)
             // Protection against spurious wakeups of parkNanos
-            if (System.nanoTime() - time > BLOCKING_WORKER_KEEP_ALIVE_NS) {
+            if (System.nanoTime() - time >= BLOCKING_WORKER_KEEP_ALIVE_NS) {
                 terminateWorker()
             }
         }
@@ -619,6 +653,14 @@
          * Stops execution of current thread and removes it from [createdWorkers]
          */
         private fun terminateWorker() {
+            // Last ditch polling: try to find blocking task before termination
+            val task = globalWorkQueue.pollBlockingMode()
+            if (task != null) {
+                localQueue.add(task, globalWorkQueue)
+                return
+            }
+
             synchronized(workers) {
                 // Someone else terminated, bail out
                 if (createdWorkers <= corePoolSize) {
@@ -692,8 +734,15 @@
                 return trySteal()
             }
 
-            // Retiring worker, only local queue polling is allowed
-            return localQueue.poll()
+            /*
+             * If the local queue is empty, try to extract a blocking task from the global queue.
+             * It's helpful for two reasons:
+             * 1) We won't call excess park/unpark here and someone else's CPU token won't be transferred,
+             *    which is a performance win
+             * 2) It helps with a rare race when an external submitter sends dependent blocking tasks
+             *    one by one and one of the requested workers may miss the CPU token
+             */
+            return localQueue.poll() ?: globalWorkQueue.pollBlockingMode()
         }
 
         private fun trySteal(): Task? {
