Skip to content

Commit b23729e

Browse files
committed
Introduce support for blocking tasks and dynamic scheduler sizing
API to mark tasks as blocking, giving the scheduler a hint that more threads should be provided to fully utilize the CPU. Concept of a CPU token: only workers holding a CPU token can execute external CPU-bound tasks and adaptively spin; workers without a token process only their local queue and then queue themselves for termination. Write design rationale. Probabilistically poll the global queue to avoid queue starvation. Worker termination protocol to dynamically shrink the pool.
1 parent 94c587f commit b23729e

File tree

15 files changed

+1653
-284
lines changed

15 files changed

+1653
-284
lines changed

benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package benchmarks.actors
22

3-
import benchmarks.ParametrizedDispatcherBase
4-
import kotlinx.coroutines.experimental.channels.Channel
5-
import kotlinx.coroutines.experimental.channels.SendChannel
6-
import kotlinx.coroutines.experimental.channels.actor
7-
import kotlinx.coroutines.experimental.runBlocking
3+
import benchmarks.*
4+
import kotlinx.coroutines.experimental.*
5+
import kotlinx.coroutines.experimental.channels.*
86
import org.openjdk.jmh.annotations.*
9-
import java.util.concurrent.TimeUnit
10-
import kotlin.coroutines.experimental.CoroutineContext
7+
import java.util.concurrent.*
8+
import kotlin.coroutines.experimental.*
119

1210
/*
1311
* Benchmark (dispatcher) Mode Cnt Score Error Units
@@ -32,7 +30,7 @@ import kotlin.coroutines.experimental.CoroutineContext
3230
open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
3331
data class Letter(val message: Any?, val sender: SendChannel<Letter>)
3432

35-
@Param("experimental")
33+
@Param("experimental", "fjp", "ftp_1", "ftp_8")
3634
override var dispatcher: String = "fjp"
3735

3836
@Benchmark

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 590 additions & 114 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,143 @@
11
package kotlinx.coroutines.experimental.scheduling
22

3+
import kotlinx.atomicfu.*
34
import kotlinx.coroutines.experimental.*
4-
import java.io.Closeable
5-
import java.util.concurrent.TimeUnit
6-
import kotlin.coroutines.experimental.CoroutineContext
5+
import java.io.*
6+
import java.util.concurrent.*
7+
import kotlin.coroutines.experimental.*
78

9+
class ExperimentalCoroutineDispatcher(corePoolSize: Int = Runtime.getRuntime().availableProcessors(), maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
810

9-
class ExperimentalCoroutineDispatcher(threads: Int = Runtime.getRuntime().availableProcessors()) : CoroutineDispatcher(), Delay, Closeable {
11+
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1012

11-
private val coroutineScheduler = CoroutineScheduler(threads)
13+
/**
14+
* TODO: yield doesn't work as expected
15+
*/
16+
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
1217

13-
override fun dispatch(context: CoroutineContext, block: Runnable) {
14-
coroutineScheduler.dispatch(block)
15-
}
16-
17-
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
18+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
1819
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
1920

2021
override fun close() = coroutineScheduler.close()
22+
2123
override fun toString(): String {
2224
return "${super.toString()}[scheduler = $coroutineScheduler]"
2325
}
2426

27+
/**
28+
* Creates new coroutine execution context with limited parallelism to execute tasks which may potentially block.
29+
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and piggybacks on the original [ExperimentalCoroutineDispatcher],
30+
* executing tasks in this context, giving original dispatcher hint to adjust its behaviour.
31+
*
32+
* @param parallelism parallelism level, indicating how many threads can execute tasks in given context in parallel.
33+
*/
34+
fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
35+
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
36+
return LimitingBlockingDispatcher(parallelism, TaskMode.PROBABLY_BLOCKING, this)
37+
}
38+
39+
internal fun dispatchBlocking(block: Runnable, context: TaskMode, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair)
40+
}
41+
42+
private class LimitingBlockingDispatcher(val parallelism: Int, val taskContext: TaskMode, val dispatcher: ExperimentalCoroutineDispatcher) : CoroutineDispatcher(), Delay {
43+
44+
private val queue = ConcurrentLinkedQueue<Runnable>()
45+
private val inFlightTasks = atomic(0)
46+
47+
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
48+
49+
private fun dispatch(block: Runnable, fair: Boolean) {
50+
var taskToSchedule = wrap(block)
51+
while (true) {
52+
// Commit in-flight tasks slot
53+
val inFlight = inFlightTasks.incrementAndGet()
54+
55+
// Fast path, if parallelism limit is not reached, dispatch task and return
56+
if (inFlight <= parallelism) {
57+
dispatcher.dispatchBlocking(taskToSchedule, taskContext, fair)
58+
return
59+
}
60+
61+
// Parallelism limit is reached, add task to the queue
62+
queue.add(taskToSchedule)
63+
64+
/*
65+
* We haven't actually scheduled anything, so roll back the committed in-flight task slot:
66+
* If the amount of in-flight tasks is still above the limit, do nothing
67+
* If the number of in-flight tasks is less than parallelism, then
68+
* it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue
69+
* to avoid starvation.
70+
*
71+
* Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
72+
*
73+
* T1: submit task, start execution, R == 1
74+
* T2: commit slot for next task, R == 2
75+
* T1: finish T1, R == 1
76+
* T2: submit next task to local queue, decrement R, R == 0
77+
* Without retries, task from T2 will be stuck in the local queue
78+
*/
79+
if (inFlightTasks.decrementAndGet() >= parallelism) {
80+
return
81+
}
82+
83+
taskToSchedule = queue.poll() ?: return
84+
}
85+
}
86+
87+
override fun toString(): String {
88+
return "${super.toString()}[dispatcher = $dispatcher]"
89+
}
90+
91+
private fun wrap(block: Runnable): Runnable {
92+
return block as? WrappedTask ?: WrappedTask(block)
93+
}
94+
95+
/**
96+
* Tries to dispatch tasks which were blocked due to reaching the parallelism limit, if there are any.
97+
*
98+
* Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
99+
* non-blocking continuations starvation.
100+
* E.g. for
101+
* ```
102+
* foo()
103+
* blocking()
104+
* bar()
105+
* ```
106+
* it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
107+
*/
108+
private fun afterTask() {
109+
var next = queue.poll()
110+
// If we have pending tasks in current blocking context, dispatch first
111+
if (next != null) {
112+
dispatcher.dispatchBlocking(next, taskContext, true)
113+
return
114+
}
115+
inFlightTasks.decrementAndGet()
116+
117+
/*
118+
* Re-poll and try to submit a task if required; otherwise tasks may be stuck in the local queue.
119+
* Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
120+
* T1: submit task, start execution, R == 1
121+
* T2: commit slot for next task, R == 2
122+
* T1: finish T1, poll queue (it's still empty), R == 2
123+
* T2: submit next task to the local queue, decrement R, R == 1
124+
* T1: decrement R, finish. R == 0
125+
*
126+
* The task from T2 is stuck in the local queue
127+
*/
128+
next = queue.poll() ?: return
129+
dispatch(next, true)
130+
}
131+
132+
private inner class WrappedTask(val runnable: Runnable) : Runnable {
133+
override fun run() {
134+
try {
135+
runnable.run()
136+
} finally {
137+
afterTask()
138+
}
139+
}
140+
}
141+
142+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) = dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
25143
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,29 @@ import java.util.*
55
internal typealias Task = TimedTask
66
internal typealias GlobalQueue = Queue<Task>
77

8-
// 100us is default resolution
8+
// 100us as default
99
internal val WORK_STEALING_TIME_RESOLUTION_NS = readFromSystemProperties(
10-
"kotlinx.coroutines.scheduler.resolution.ns", 100000L, String::toLongOrNull)
10+
"kotlinx.coroutines.scheduler.resolution.ns", 100000L)
1111

1212
internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = readFromSystemProperties(
13-
"kotlinx.coroutines.scheduler.offload.threshold", 96L, String::toLongOrNull)
13+
"kotlinx.coroutines.scheduler.offload.threshold", 96L)
14+
15+
internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
16+
"kotlinx.coroutines.scheduler.blocking.parallelism", 16L).toInt()
17+
18+
internal val MAX_POOL_SIZE = readFromSystemProperties(
19+
"kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128L).toInt()
1420

1521
internal var schedulerTimeSource: TimeSource = NanoTimeSource
1622

17-
internal data class TimedTask(val submissionTime: Long, val task: Runnable)
23+
internal enum class TaskMode {
24+
// Marker indicating that task is CPU-bound and will not block
25+
NON_BLOCKING,
26+
// Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
27+
PROBABLY_BLOCKING,
28+
}
29+
30+
internal data class TimedTask(val task: Runnable, val submissionTime: Long, val mode: TaskMode)
1831

1932
internal abstract class TimeSource {
2033
abstract fun nanoTime(): Long
@@ -24,13 +37,16 @@ internal object NanoTimeSource : TimeSource() {
2437
override fun nanoTime() = System.nanoTime()
2538
}
2639

27-
private fun <T> readFromSystemProperties(propertyName: String, defaultValue: T, parser: (String) -> T?): T {
40+
private fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
2841
val value = try {
2942
System.getProperty(propertyName)
3043
} catch (e: SecurityException) {
3144
null
3245
} ?: return defaultValue
3346

34-
val parsed = parser(value)
35-
return parsed ?: error("System property '$propertyName' has unrecognized value '$value'")
47+
val parsed = value.toLongOrNull() ?: error("System property '$propertyName' has unrecognized value '$value'")
48+
if (parsed <= 0) {
49+
error("System property '$propertyName' should be positive, but is '$parsed'")
50+
}
51+
return parsed
3652
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package kotlinx.coroutines.experimental.scheduling
22

3-
import kotlinx.atomicfu.atomic
4-
import java.util.concurrent.atomic.AtomicReferenceArray
3+
import kotlinx.atomicfu.*
4+
import java.util.concurrent.atomic.*
55

66
internal const val BUFFER_CAPACITY_BASE = 7
77
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE
@@ -15,7 +15,7 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
1515
*
1616
* Fairness
1717
* [WorkQueue] provides semi-FIFO order, but with priority for most recently submitted task assuming
18-
* that these two (current and submitted) are communicating and sharing state thus making such communication extremely fast.
18+
* that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast.
1919
* E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
2020
*
2121
* Work offloading
@@ -27,8 +27,16 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
2727
*/
2828
internal class WorkQueue {
2929

30+
// todo: There is non-atomicity in computing bufferSize (indices update separately).
31+
// todo: It can lead to arbitrary values of resulting bufferSize.
32+
// todo: Consider merging both indices into a single Long.
33+
// todo: Alternatively, prove that a sporadic arbitrary result here is OK (does not seem to be the case now)
3034
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
35+
36+
// todo: AtomicReferenceArray has an extra memory indirection.
37+
// todo: In the future (long-term) atomicfu shall support efficient atomic arrays in a platform-specific way (Unsafe or VarHandles)
3138
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
39+
3240
private val lastScheduledTask = atomic<Task?>(null)
3341

3442
private val producerIndex = atomic(0)
@@ -49,16 +57,25 @@ internal class WorkQueue {
4957
* @param globalQueue fallback queue which is used when the local queue is overflown
5058
* @return true if no offloading happened, false otherwise
5159
*/
52-
fun offer(task: Task, globalQueue: GlobalQueue): Boolean {
53-
while (true) {
54-
val previous = lastScheduledTask.value
55-
if (lastScheduledTask.compareAndSet(previous, task)) {
56-
if (previous != null) {
57-
return addLast(previous, globalQueue)
58-
}
59-
return true
60-
}
60+
fun add(task: Task, globalQueue: GlobalQueue): Boolean {
61+
val previous = lastScheduledTask.getAndSet(task) ?: return true
62+
return addLast(previous, globalQueue)
63+
}
64+
65+
// Called only by the owner
66+
fun addLast(task: Task, globalQueue: GlobalQueue): Boolean {
67+
var addedToGlobalQueue = false
68+
69+
/*
70+
* We need the loop here because a race is possible not only on a full queue,
71+
* but also on queue with one element during stealing
72+
*/
73+
while (!tryAddLast(task)) {
74+
offloadWork(globalQueue)
75+
addedToGlobalQueue = true
6176
}
77+
78+
return !addedToGlobalQueue
6279
}
6380

6481
/**
@@ -74,7 +91,7 @@ internal class WorkQueue {
7491
}
7592

7693
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
77-
offer(lastScheduled, globalQueue)
94+
add(lastScheduled, globalQueue)
7895
return true
7996
}
8097

@@ -90,12 +107,20 @@ internal class WorkQueue {
90107
val task = victim.pollExternal { time - it.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS || victim.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD }
91108
?: return@repeat
92109
stolen = true
93-
offer(task, globalQueue)
110+
add(task, globalQueue)
94111
}
95112

96113
return stolen
97114
}
98115

116+
internal fun size(): Int {
117+
if (lastScheduledTask.value != null) {
118+
return bufferSize + 1
119+
}
120+
121+
return bufferSize
122+
}
123+
99124
/**
100125
* Offloads half of the current buffer to [target]
101126
*/
@@ -126,22 +151,6 @@ internal class WorkQueue {
126151
}
127152
}
128153

129-
// Called only by the owner
130-
private fun addLast(task: Task, globalQueue: GlobalQueue): Boolean {
131-
var addedToGlobalQueue = false
132-
133-
/*
134-
* We need the loop here because race possible not only on full queue,
135-
* but also on queue with one element during stealing
136-
*/
137-
while (!tryAddLast(task)) {
138-
offloadWork(globalQueue)
139-
addedToGlobalQueue = true
140-
}
141-
142-
return !addedToGlobalQueue
143-
}
144-
145154
// Called only by the owner
146155
private fun tryAddLast(task: Task): Boolean {
147156
if (bufferSize == BUFFER_CAPACITY - 1) return false

0 commit comments

Comments
 (0)