Skip to content

Commit bba6e89

Browse files
elizarovqwwdfsad
authored andcommitted
Scheduler cosmetics
Stealing logic cleaned up a bit (mostly style) Update worker thread names when worker threads are terminated to maintain consistency between their index in array and name; Improved debugging info (toString) Merged yield and spin counters, parkTimeNs is in Int
1 parent 1d3b019 commit bba6e89

File tree

7 files changed

+156
-128
lines changed

7 files changed

+156
-128
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.internal.*
1920
import kotlinx.coroutines.experimental.timeunit.TimeUnit
2021
import java.util.concurrent.*
2122
import java.util.concurrent.atomic.*
@@ -34,7 +35,6 @@ import kotlin.coroutines.experimental.*
3435
* privileged actions.
3536
*/
3637
object CommonPool : CoroutineDispatcher() {
37-
3838
/**
3939
* Name of the property that controls default parallelism level of [CommonPool].
4040
* If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead (or `1` for single-core JVM).
@@ -43,18 +43,8 @@ object CommonPool : CoroutineDispatcher() {
4343
*/
4444
public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
4545

46-
private val parallelism = run<Int> {
47-
val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) }
48-
if (property == null) {
49-
(Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
50-
} else {
51-
val parallelism = property.toIntOrNull()
52-
if (parallelism == null || parallelism < 1) {
53-
error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
54-
}
55-
parallelism
56-
}
57-
}
46+
private val parallelism = systemProp(DEFAULT_PARALLELISM_PROPERTY_NAME,
47+
(AVAILABLE_PROCESSORS - 1).coerceAtLeast(1))
5848

5949
// For debug and tests
6050
private var usePrivatePool = false

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import sun.text.normalizer.UTF16.*
19+
import kotlinx.coroutines.experimental.internal.*
2020
import java.util.concurrent.atomic.*
2121
import kotlin.coroutines.experimental.*
2222

@@ -40,9 +40,7 @@ public const val DEBUG_PROPERTY_VALUE_ON = "on"
4040
*/
4141
public const val DEBUG_PROPERTY_VALUE_OFF = "off"
4242

43-
internal val DEBUG = run {
44-
val value = try { System.getProperty(DEBUG_PROPERTY_NAME) }
45-
catch (e: SecurityException) { null }
43+
internal val DEBUG = systemProp(DEBUG_PROPERTY_NAME).let { value ->
4644
when (value) {
4745
DEBUG_PROPERTY_VALUE_AUTO, null -> CoroutineId::class.java.desiredAssertionStatus()
4846
DEBUG_PROPERTY_VALUE_ON, "" -> true
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.internal
6+
7+
// number of processors at startup for consistent prop initialization
8+
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
9+
10+
internal fun systemProp(
11+
propertyName: String
12+
): String? =
13+
try {
14+
System.getProperty(propertyName)
15+
} catch (e: SecurityException) {
16+
null
17+
}
18+
19+
internal fun systemProp(
20+
propertyName: String,
21+
defaultValue: Int,
22+
minValue: Int = 1,
23+
maxValue: Int = Int.MAX_VALUE
24+
): Int
25+
= systemProp(propertyName, defaultValue.toLong(), minValue.toLong(), maxValue.toLong()).toInt()
26+
27+
internal fun systemProp(
28+
propertyName: String,
29+
defaultValue: Long,
30+
minValue: Long = 1,
31+
maxValue: Long = Long.MAX_VALUE
32+
): Long {
33+
val value = systemProp(propertyName) ?: return defaultValue
34+
val parsed = value.toLongOrNull()
35+
?: error("System property '$propertyName' has unrecognized value '$value'")
36+
if (parsed !in minValue..maxValue) {
37+
error("System property '$propertyName' should be in range $minValue..$maxValue, but is '$parsed'")
38+
}
39+
return parsed
40+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,14 @@ import java.util.concurrent.locks.*
5454
*/
5555
@Suppress("NOTHING_TO_INLINE")
5656
internal class CoroutineScheduler(
57+
private val schedulerName: String,
5758
private val corePoolSize: Int,
5859
private val maxPoolSize: Int
5960
) : Closeable {
61+
constructor(
62+
corePoolSize: Int,
63+
maxPoolSize: Int
64+
) : this("CoroutineScheduler", corePoolSize, maxPoolSize)
6065

6166
private val globalQueue: GlobalQueue = GlobalQueue()
6267

@@ -161,16 +166,16 @@ internal class CoroutineScheduler(
161166
private val isTerminated = atomic(false)
162167

163168
companion object {
164-
private const val MAX_SPINS = 1000L
165-
private const val MAX_YIELDS = 500L
166-
167-
@JvmStatic
168-
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1)
169+
private const val MAX_SPINS = 1000
170+
private const val MAX_YIELDS = MAX_SPINS + 500
171+
172+
@JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
173+
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt()
169174

170175
@JvmStatic
171176
private val MIN_PARK_TIME_NS = (WORK_STEALING_TIME_RESOLUTION_NS / 4)
172177
.coerceAtLeast(10)
173-
.coerceAtMost(MAX_PARK_TIME_NS)
178+
.coerceAtMost(MAX_PARK_TIME_NS.toLong()).toInt()
174179

175180
// A symbol to mark workers that are not in parkedWorkersStack
176181
private val NOT_IN_STACK = Symbol("NOT_IN_STACK")
@@ -417,14 +422,10 @@ internal class CoroutineScheduler(
417422
var blockingWorkers = 0
418423
var cpuWorkers = 0
419424
var retired = 0
420-
var finished = 0
421-
425+
var terminated = 0
422426
val queueSizes = arrayListOf<String>()
423427
for (worker in workers) {
424-
if (worker == null) {
425-
continue
426-
}
427-
428+
if (worker == null) continue
428429
val queueSize = worker.localQueue.size()
429430
when (worker.state) {
430431
WorkerState.PARKING -> ++parkedWorkers
@@ -440,29 +441,44 @@ internal class CoroutineScheduler(
440441
++retired
441442
if (queueSize > 0) queueSizes += queueSize.toString() + "r" // Retiring
442443
}
443-
WorkerState.FINISHED -> ++finished
444+
WorkerState.TERMINATED -> ++terminated
444445
}
445446
}
446-
447-
return "${super.toString()}[core pool size = $corePoolSize, " +
448-
"CPU workers = $cpuWorkers, " +
449-
"blocking workers = $blockingWorkers, " +
450-
"parked workers = $parkedWorkers, " +
451-
"retired workers = $retired, " +
452-
"finished workers = $finished, " +
447+
val state = controlState.value
448+
return "$schedulerName@$hexAddress[" +
449+
"Pool Size {" +
450+
"core = $corePoolSize, " +
451+
"max = $maxPoolSize}, " +
452+
"Worker States {" +
453+
"CPU = $cpuWorkers, " +
454+
"blocking = $blockingWorkers, " +
455+
"parked = $parkedWorkers, " +
456+
"retired = $retired, " +
457+
"terminated = $terminated}, " +
453458
"running workers queues = $queueSizes, "+
454-
"global queue size = ${globalQueue.size}], " +
455-
"control state: ${controlState.value}"
459+
"global queue size = ${globalQueue.size}, " +
460+
"Control State Workers {" +
461+
"created = ${createdWorkers(state)}, " +
462+
"blocking = ${blockingWorkers(state)}}" +
463+
"]"
456464
}
457465

458-
// todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
459-
internal inner class Worker(sequenceNumber: Int) : Thread("CoroutineScheduler-worker-$sequenceNumber") {
466+
internal inner class Worker private constructor() : Thread() {
460467
init {
461468
isDaemon = true
462469
}
463470

464471
// guarded by scheduler lock
465-
private var indexInArray = sequenceNumber
472+
private var indexInArray = -1
473+
set(index) {
474+
name = "$schedulerName-worker-${if (index < 0) "TERMINATED" else index.toString()}"
475+
field = index
476+
}
477+
478+
constructor(index: Int) : this() {
479+
indexInArray = index
480+
}
481+
466482
val localQueue: WorkQueue = WorkQueue()
467483

468484
/**
@@ -552,15 +568,16 @@ internal class CoroutineScheduler(
552568
private var lastExhaustionTime = 0L
553569

554570
@Volatile // Required for concurrent idleResetBeforeUnpark
555-
private var spins = 0L
556-
private var yields = 0L // TODO replace with IntPair when inline classes arrive
571+
private var spins = 0 // spins until MAX_SPINS, then yields until MAX_YIELDS
557572

573+
// Note: it is concurrently reset by idleResetBeforeUnpark
558574
private var parkTimeNs = MIN_PARK_TIME_NS
575+
559576
private var rngState = random.nextInt()
560577

561578
override fun run() {
562579
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
563-
while (!isTerminated.value && state != WorkerState.FINISHED) {
580+
while (!isTerminated.value && state != WorkerState.TERMINATED) {
564581
val task = findTask()
565582
if (task == null) {
566583
// Wait for a job with potential park
@@ -581,7 +598,7 @@ internal class CoroutineScheduler(
581598
}
582599
}
583600

584-
tryReleaseCpu(WorkerState.FINISHED)
601+
tryReleaseCpu(WorkerState.TERMINATED)
585602
}
586603

587604
private fun runSafely(block: Runnable) {
@@ -655,20 +672,16 @@ internal class CoroutineScheduler(
655672
* The main idea is not to park while it's possible (otherwise throughput on asymmetric workloads suffers due to too frequent
656673
* park/unpark calls and delays between job submission and thread queue checking)
657674
*/
658-
when {
659-
// Volatile read spins, shall be read first
660-
spins < MAX_SPINS -> ++spins
661-
yields <= MAX_YIELDS -> {
662-
++yields
663-
yield()
664-
}
665-
else -> {
666-
if (parkTimeNs < MAX_PARK_TIME_NS) {
667-
parkTimeNs = (parkTimeNs * 3 shr 1).coerceAtMost(MAX_PARK_TIME_NS)
668-
}
669-
tryReleaseCpu(WorkerState.PARKING)
670-
doPark(parkTimeNs)
675+
val spins = this.spins // volatile read
676+
if (spins <= MAX_YIELDS) {
677+
this.spins = spins + 1 // volatile write
678+
if (spins >= MAX_SPINS) yield()
679+
} else {
680+
if (parkTimeNs < MAX_PARK_TIME_NS) {
681+
parkTimeNs = (parkTimeNs * 3 ushr 1).coerceAtMost(MAX_PARK_TIME_NS)
671682
}
683+
tryReleaseCpu(WorkerState.PARKING)
684+
doPark(parkTimeNs.toLong())
672685
}
673686
}
674687

@@ -712,12 +725,14 @@ internal class CoroutineScheduler(
712725
* Now move last worker into an index in array that was previously occupied by this worker.
713726
*/
714727
val lastWorkerIndex = decrementCreatedWorkers()
715-
val worker = workers[lastWorkerIndex]!!
716-
workers[indexInArray] = worker
717-
worker.indexInArray = indexInArray
728+
val lastWorker = workers[lastWorkerIndex]!!
729+
workers[indexInArray] = lastWorker
730+
lastWorker.indexInArray = indexInArray
718731
workers[lastWorkerIndex] = null
732+
// Cleanup index of this worker for debugging purposes
733+
indexInArray = -1
719734
}
720-
state = WorkerState.FINISHED
735+
state = WorkerState.TERMINATED
721736
}
722737

723738
/**
@@ -741,14 +756,12 @@ internal class CoroutineScheduler(
741756
state = WorkerState.BLOCKING
742757
parkTimeNs = MIN_PARK_TIME_NS
743758
}
744-
yields = 0
745759
spins = 0
746760
}
747761

748762
// It is invoked by other thread before this worker is unparked
749763
fun idleResetBeforeUnpark() {
750764
parkTimeNs = MIN_PARK_TIME_NS
751-
yields = 0
752765
spins = 0 // Volatile write, should be written last
753766
}
754767

@@ -816,6 +829,6 @@ internal class CoroutineScheduler(
816829
/**
817830
* Terminal state, will no longer be used
818831
*/
819-
FINISHED
832+
TERMINATED
820833
}
821834
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ class ExperimentalCoroutineDispatcher(
1414
corePoolSize: Int = CORE_POOL_SIZE,
1515
maxPoolSize: Int = MAX_POOL_SIZE
1616
) : CoroutineDispatcher(), Delay, Closeable {
17-
1817
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1918

2019
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
@@ -45,7 +44,11 @@ class ExperimentalCoroutineDispatcher(
4544
internal fun dispatchBlocking(block: Runnable, context: TaskMode, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair)
4645
}
4746

48-
private class LimitingBlockingDispatcher(val parallelism: Int, val taskContext: TaskMode, val dispatcher: ExperimentalCoroutineDispatcher) : CoroutineDispatcher(), Delay {
47+
private class LimitingBlockingDispatcher(
48+
val parallelism: Int,
49+
val taskContext: TaskMode,
50+
val dispatcher: ExperimentalCoroutineDispatcher
51+
) : CoroutineDispatcher(), Delay {
4952

5053
private val queue = ConcurrentLinkedQueue<Runnable>()
5154
private val inFlightTasks = atomic(0)
@@ -145,5 +148,6 @@ private class LimitingBlockingDispatcher(val parallelism: Int, val taskContext:
145148
}
146149
}
147150

148-
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) = dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
151+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
152+
dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
149153
}

0 commit comments

Comments
 (0)