Skip to content

Commit 94c587f

Browse files
committed
Adaptive spinning, parking and load balancing mechanism:
Spin and park adaptively to reduce CPU consumption. Provide a mechanism for offloading work when a worker detects that it is overloaded. Implement a hand-rolled thread-local xorshift random, which appears to be 15% faster than ThreadLocal<Random> (ThreadLocalRandom is unavailable on Java 1.6). Use queue size as a metric for work stealing in addition to task deadline.
1 parent e03221a commit 94c587f

File tree

13 files changed

+431
-145
lines changed

13 files changed

+431
-145
lines changed

benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
package benchmarks
22

33
import kotlinx.coroutines.experimental.launch
4-
import kotlinx.coroutines.experimental.scheduling.ForkedMarker
54
import org.openjdk.jmh.annotations.*
65
import java.util.concurrent.CyclicBarrier
76
import java.util.concurrent.TimeUnit
87

98
/*
109
* Benchmark to measure scheduling overhead in comparison with FJP.
11-
* LaunchBenchmark.massiveLaunch experimental avgt 30 187.342 ± 20.244 us/op
10+
* LaunchBenchmark.massiveLaunch experimental avgt 30 328.662 ± 52.789 us/op
1211
* LaunchBenchmark.massiveLaunch fjp avgt 30 179.762 ± 3.931 us/op
1312
*/
1413
@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
15-
@Measurement(iterations = 100000, time = 1, timeUnit = TimeUnit.SECONDS)
16-
@Fork(value = 3, jvmArgsAppend = ["-XX:+PreserveFramePointer"])
14+
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
15+
@Fork(value = 2)
1716
@BenchmarkMode(Mode.AverageTime)
1817
@OutputTimeUnit(TimeUnit.MICROSECONDS)
1918
@State(Scope.Benchmark)
2019
open class LaunchBenchmark : ParametrizedDispatcherBase() {
2120

22-
@Param("experimental")
21+
@Param("experimental", "fjp")
2322
override var dispatcher: String = "fjp"
2423

2524
private val jobsToLaunch = 100
@@ -31,7 +30,7 @@ open class LaunchBenchmark : ParametrizedDispatcherBase() {
3130
@Benchmark
3231
fun massiveLaunch() {
3332
repeat(submitters) {
34-
launch(benchmarkContext + ForkedMarker) {
33+
launch(benchmarkContext) {
3534
// Wait until all cores are occupied
3635
allLaunched.await()
3736
allLaunched.reset()

benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ import kotlin.coroutines.experimental.CoroutineContext
1818
abstract class ParametrizedDispatcherBase {
1919

2020
abstract var dispatcher: String
21-
lateinit var benchmarkContext: CoroutineContext // coroutinesContext clashes with scope parameter
21+
lateinit var benchmarkContext: CoroutineContext // coroutineContext clashes with scope parameter
2222
var closeable: Closeable? = null
2323

2424
@Setup
2525
open fun setup() {
2626
benchmarkContext = when {
2727
dispatcher == "fjp" -> CommonPool
2828
dispatcher == "experimental" -> {
29-
ExperimentalCoroutineDispatcher(CORES_COUNT)
29+
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
3030
}
3131
dispatcher.startsWith("ftp") -> {
3232
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }

benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import kotlin.coroutines.experimental.CoroutineContext
1111

1212
/*
1313
* Benchmark (dispatcher) Mode Cnt Score Error Units
14+
* PingPongActorBenchmark.coresCountPingPongs experimental avgt 10 185.066 ± 21.692 ms/op
1415
* PingPongActorBenchmark.coresCountPingPongs fjp avgt 10 200.581 ± 22.669 ms/op
1516
* PingPongActorBenchmark.coresCountPingPongs ftp_1 avgt 10 494.334 ± 27.450 ms/op
1617
* PingPongActorBenchmark.coresCountPingPongs ftp_2 avgt 10 498.754 ± 27.743 ms/op
1718
* PingPongActorBenchmark.coresCountPingPongs ftp_8 avgt 10 804.498 ± 69.826 ms/op
1819
*
20+
* PingPongActorBenchmark.singlePingPong experimental avgt 10 45.521 ± 3.281 ms/op
1921
* PingPongActorBenchmark.singlePingPong fjp avgt 10 217.005 ± 18.693 ms/op
2022
* PingPongActorBenchmark.singlePingPong ftp_1 avgt 10 57.632 ± 1.835 ms/op
2123
* PingPongActorBenchmark.singlePingPong ftp_2 avgt 10 112.723 ± 5.280 ms/op
@@ -30,7 +32,7 @@ import kotlin.coroutines.experimental.CoroutineContext
3032
open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
3133
data class Letter(val message: Any?, val sender: SendChannel<Letter>)
3234

33-
@Param("fjp", "ftp_1", "ftp_2", "ftp_8", "experimental")
35+
@Param("experimental")
3436
override var dispatcher: String = "fjp"
3537

3638
@Benchmark
Lines changed: 180 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package kotlinx.coroutines.experimental.scheduling
22

3+
import kotlinx.atomicfu.atomic
34
import kotlinx.coroutines.experimental.Runnable
45
import java.io.Closeable
56
import java.util.*
67
import java.util.concurrent.ConcurrentLinkedQueue
78
import java.util.concurrent.Executor
9+
import java.util.concurrent.TimeUnit
810
import java.util.concurrent.locks.LockSupport
911

1012
/**
@@ -13,9 +15,27 @@ import java.util.concurrent.locks.LockSupport
1315
class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
1416

1517
private val workers: Array<PoolWorker>
16-
private val globalWorkQueue: Queue<Task> = ConcurrentLinkedQueue<Task>()
18+
private val globalWorkQueue = ConcurrentLinkedQueue<Task>()
19+
private val parkedWorkers = atomic(0)
20+
private val random = Random()
21+
1722
@Volatile
18-
private var isClosed = false
23+
private var isTerminated = false
24+
25+
companion object {
26+
private const val STEAL_ATTEMPTS = 4
27+
private const val MAX_SPINS = 1000L
28+
private const val MAX_YIELDS = 500L
29+
@JvmStatic
30+
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1)
31+
@JvmStatic
32+
private val MIN_PARK_TIME_NS = (WORK_STEALING_TIME_RESOLUTION_NS / 4).coerceAtLeast(10).coerceAtMost(MAX_PARK_TIME_NS)
33+
34+
// Local queue 'offer' results
35+
private const val ADDED = -1
36+
private const val ADDED_WITH_OFFLOADING = 0 // Added to the local queue, but pool requires additional worker to keep up
37+
private const val NOT_ADDED = 1
38+
}
1939

2040
init {
2141
require(corePoolSize >= 1, { "Expected positive core pool size, but was $corePoolSize" })
@@ -26,41 +46,111 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
2646
override fun execute(command: Runnable) = dispatch(command)
2747

2848
override fun close() {
29-
isClosed = true
49+
isTerminated = true
3050
}
3151

32-
fun dispatch(command: Runnable, intensive: Boolean = false) {
33-
val task = TimedTask(System.nanoTime(), command)
34-
if (!submitToLocalQueue(task, intensive)) {
52+
fun dispatch(command: Runnable) {
53+
val task = TimedTask(schedulerTimeSource.nanoTime(), command)
54+
55+
val offerResult = submitToLocalQueue(task)
56+
if (offerResult == ADDED) {
57+
return
58+
}
59+
60+
if (offerResult == NOT_ADDED) {
3561
globalWorkQueue.add(task)
3662
}
63+
64+
unparkIdleWorker()
3765
}
3866

39-
private fun submitToLocalQueue(task: Task, intensive: Boolean): Boolean {
40-
val worker = Thread.currentThread() as? PoolWorker ?: return false
41-
if (intensive && worker.localQueue.bufferSize > FORKED_TASK_OFFLOAD_THRESHOLD) return false
42-
worker.localQueue.offer(task, globalWorkQueue)
43-
return true
67+
private fun unparkIdleWorker() {
68+
// If no threads are parked don't try to wake anyone
69+
val parked = parkedWorkers.value
70+
if (parked == 0) {
71+
return
72+
}
73+
74+
// Try to wake one worker
75+
repeat(STEAL_ATTEMPTS) {
76+
val victim = workers[random.nextInt(workers.size)]
77+
if (victim.isParking) {
78+
/*
79+
* Benign data race, victim can wake up after this check, but before 'unpark' call succeeds,
80+
* making first 'park' in next idle period a no-op
81+
*/
82+
LockSupport.unpark(victim)
83+
return
84+
}
85+
}
86+
}
87+
88+
89+
private fun submitToLocalQueue(task: Task): Int {
90+
val worker = Thread.currentThread() as? PoolWorker ?: return NOT_ADDED
91+
if (worker.localQueue.offer(task, globalWorkQueue)) {
92+
// We're close to queue capacity, wake up a parked worker so it can steal
93+
if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) {
94+
return ADDED_WITH_OFFLOADING
95+
}
96+
97+
return ADDED
98+
}
99+
100+
return ADDED_WITH_OFFLOADING
101+
}
102+
103+
/**
104+
* Returns a string identifying state of this scheduler for nicer debugging
105+
*/
106+
override fun toString(): String {
107+
var parkedWorkers = 0
108+
val queueSizes = arrayListOf<Int>()
109+
for (worker in workers) {
110+
if (worker.isParking) {
111+
++parkedWorkers
112+
} else {
113+
queueSizes += worker.localQueue.bufferSize
114+
}
115+
}
116+
117+
return "${super.toString()}[core pool size = ${workers.size}, " +
118+
"parked workers = $parkedWorkers, " +
119+
"active workers buffer sizes = $queueSizes, " +
120+
"global queue size = ${globalWorkQueue.size}]"
44121
}
45122

46-
private inner class PoolWorker(index: Int) : Thread("CoroutinesScheduler-worker-$index") {
123+
124+
internal inner class PoolWorker(index: Int) : Thread("CoroutinesScheduler-worker-$index") {
47125
init {
48126
isDaemon = true
49127
}
50128

51129
val localQueue: WorkQueue = WorkQueue()
130+
/**
131+
* Time of the last call to [unparkIdleWorker] made because a task missed its deadline.
132+
* Used as throttling mechanism to avoid unparking multiple threads when it's not really necessary.
133+
*/
134+
private var lastExhaustionTime = 0L
52135

53136
@Volatile
54-
var yields = 0
137+
var isParking = false
138+
@Volatile
139+
private var spins = 0L
140+
private var yields = 0L
141+
private var parkTimeNs = MIN_PARK_TIME_NS
142+
private var rngState = random.nextInt()
55143

56144
override fun run() {
57-
while (!isClosed) {
145+
while (!isTerminated) {
58146
try {
59147
val job = findTask()
60148
if (job == null) {
61-
awaitWork()
149+
// Wait for a job with potential park
150+
idle()
62151
} else {
63-
yields = 0
152+
idleReset()
153+
checkExhaustion(job)
64154
job.task.run()
65155
}
66156
} catch (e: Throwable) {
@@ -69,15 +159,77 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
69159
}
70160
}
71161

72-
private fun awaitWork() {
73-
// Temporary solution
74-
if (++yields > 100000) {
75-
LockSupport.parkNanos(WORK_STEALING_TIME_RESOLUTION / 2)
162+
private fun checkExhaustion(job: Task) {
163+
val parked = parkedWorkers.value
164+
if (parked == 0) {
165+
return
166+
}
167+
168+
// Check last exhaustion time to avoid the race between steal and next task execution
169+
val now = schedulerTimeSource.nanoTime()
170+
if (now - job.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS && now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5) {
171+
lastExhaustionTime = now
172+
unparkIdleWorker()
173+
}
174+
}
175+
176+
/*
177+
* Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
178+
* ThreadLocalRandom cannot be used because Android must be supported, and ThreadLocal<Random> is up to 15% slower on ktor benchmarks
179+
*/
180+
internal fun nextInt(upperBound: Int): Int {
181+
rngState = rngState xor (rngState shl 13)
182+
rngState = rngState xor (rngState shr 17)
183+
rngState = rngState xor (rngState shl 5)
184+
val mask = upperBound - 1
185+
// Fast path for power of two bound
186+
if (mask and upperBound == 0) {
187+
return rngState and mask
76188
}
189+
190+
return (rngState and Int.MAX_VALUE) % upperBound
191+
}
192+
193+
private fun idle() {
194+
/*
195+
* Simple adaptive await of work:
196+
* Spin on the volatile field with an empty loop in hope that new work will arrive,
197+
* then start yielding to reduce CPU pressure, and finally start adaptive parking.
198+
*
199+
* The main idea is to avoid parking for as long as possible (otherwise throughput on asymmetric workloads suffers due to too frequent
200+
* park/unpark calls and delays between job submission and thread queue checking)
201+
*/
202+
when {
203+
spins < MAX_SPINS -> ++spins
204+
++yields <= MAX_YIELDS -> Thread.yield()
205+
else -> {
206+
if (!isParking) {
207+
isParking = true
208+
parkedWorkers.incrementAndGet()
209+
}
210+
211+
if (parkTimeNs < MAX_PARK_TIME_NS) {
212+
parkTimeNs = (parkTimeNs * 1.5).toLong().coerceAtMost(MAX_PARK_TIME_NS)
213+
}
214+
215+
LockSupport.parkNanos(parkTimeNs)
216+
}
217+
}
218+
}
219+
220+
private fun idleReset() {
221+
if (isParking) {
222+
isParking = false
223+
parkTimeNs = MIN_PARK_TIME_NS
224+
parkedWorkers.decrementAndGet()
225+
}
226+
227+
spins = 0
228+
yields = 0
77229
}
78230

79231
private fun findTask(): Task? {
80-
// TODO explain, probabilistic check with park counter?
232+
// TODO probabilistic check if thread is not idle?
81233
var task: Task? = globalWorkQueue.poll()
82234
if (task != null) return task
83235

@@ -92,16 +244,17 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
92244
return null
93245
}
94246

95-
while (true) {
96-
val worker = workers[RANDOM_PROVIDER().nextInt(workers.size)]
247+
// Probe a couple of workers
248+
repeat(STEAL_ATTEMPTS) {
249+
val worker = workers[nextInt(workers.size)]
97250
if (worker !== this) {
98-
worker.localQueue.offloadWork(true) {
99-
localQueue.offer(it, globalWorkQueue)
251+
if (localQueue.trySteal(worker.localQueue, globalWorkQueue)) {
252+
return@repeat
100253
}
101-
102-
return localQueue.poll()
103254
}
104255
}
256+
257+
return localQueue.poll()
105258
}
106259
}
107260
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,23 @@ package kotlinx.coroutines.experimental.scheduling
33
import kotlinx.coroutines.experimental.*
44
import java.io.Closeable
55
import java.util.concurrent.TimeUnit
6-
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
76
import kotlin.coroutines.experimental.CoroutineContext
87

9-
/**
10-
* Unstable API and subject to change.
11-
* Context marker which gives scheduler a hint that submitted jobs can be distributed among cores aggressively.
12-
* Usually it's useful for massive jobs submission produced by single coroutine, e.g. data intensive fork-join tasks
13-
* or fan-out notifications for a large number of listeners.
14-
*/
15-
object ForkedMarker : AbstractCoroutineContextElement(ForkedKey)
16-
17-
private object ForkedKey : CoroutineContext.Key<ForkedMarker>
188

199
class ExperimentalCoroutineDispatcher(threads: Int = Runtime.getRuntime().availableProcessors()) : CoroutineDispatcher(), Delay, Closeable {
2010

2111
private val coroutineScheduler = CoroutineScheduler(threads)
2212

2313
override fun dispatch(context: CoroutineContext, block: Runnable) {
24-
coroutineScheduler.dispatch(block, context[ForkedKey] != null)
14+
coroutineScheduler.dispatch(block)
2515
}
2616

2717
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
2818
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
2919

3020
override fun close() = coroutineScheduler.close()
21+
override fun toString(): String {
22+
return "${super.toString()}[scheduler = $coroutineScheduler]"
23+
}
24+
3125
}

0 commit comments

Comments
 (0)