Skip to content

Commit 1df0be5

Browse files
authored
Introduce CoroutineDispatcher.limitedParallelism and make Dispatchers.IO unbounded for limited parallelism (#2918)
* Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control * Elastic Dispatchers.IO: * Extract Ktor-obsolete API to a separate file for backwards compatibility * Make Dispatchers.IO being a slice of unlimited blocking scheduler * Make Dispatchers.IO.limitParallelism take slices from the same internal scheduler Fixes #2943 Fixes #2919
1 parent 8d1ee7d commit 1df0be5

29 files changed

+755
-347
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {
3030
coroutineContext = when {
3131
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
3232
dispatcher == "scheduler" -> {
33-
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
33+
Dispatchers.Default
3434
}
3535
dispatcher.startsWith("ftp") -> {
3636
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }

benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@ package benchmarks
66

77
import benchmarks.common.*
88
import kotlinx.coroutines.*
9-
import kotlinx.coroutines.channels.Channel
10-
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
11-
import kotlinx.coroutines.sync.Semaphore
12-
import kotlinx.coroutines.sync.withPermit
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.sync.*
1311
import org.openjdk.jmh.annotations.*
14-
import java.util.concurrent.ForkJoinPool
15-
import java.util.concurrent.TimeUnit
12+
import java.util.concurrent.*
1613

1714
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
1815
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@@ -84,7 +81,7 @@ open class SemaphoreBenchmark {
8481

8582
enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
8683
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
87-
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
84+
EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
8885
}
8986

9087
private const val WORK_INSIDE = 80

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,8 @@ import kotlin.coroutines.*
2727
@State(Scope.Benchmark)
2828
open class PingPongWithBlockingContext {
2929

30-
@UseExperimental(InternalCoroutinesApi::class)
31-
private val experimental = ExperimentalCoroutineDispatcher(8)
32-
@UseExperimental(InternalCoroutinesApi::class)
33-
private val blocking = experimental.blocking(8)
30+
private val experimental = Dispatchers.Default
31+
private val blocking = Dispatchers.IO.limitedParallelism(8)
3432
private val threadPool = newFixedThreadPoolContext(8, "PongCtx")
3533

3634
@TearDown

integration/kotlinx-coroutines-play-services/test/TaskTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ class TaskTest : TestBase() {
4545
}
4646

4747
@Test
48-
fun testCancelledAsTask() {
49-
val deferred = GlobalScope.async {
48+
fun testCancelledAsTask() = runTest {
49+
val deferred = async(Dispatchers.Default) {
5050
delay(100)
5151
}.apply { cancel() }
5252

@@ -60,8 +60,8 @@ class TaskTest : TestBase() {
6060
}
6161

6262
@Test
63-
fun testThrowingAsTask() {
64-
val deferred = GlobalScope.async<Int> {
63+
fun testThrowingAsTask() = runTest({ e -> e is TestException }) {
64+
val deferred = async<Int>(Dispatchers.Default) {
6565
throw TestException("Fail")
6666
}
6767

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
156156
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
157157
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
158158
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
159+
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
159160
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
160161
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
161162
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
@@ -447,6 +448,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
447448
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
448449
public fun <init> ()V
449450
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
451+
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
450452
public fun toString ()Ljava/lang/String;
451453
protected final fun toStringInternalImpl ()Ljava/lang/String;
452454
}

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,45 @@ public abstract class CoroutineDispatcher :
6161
*/
6262
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
6363

64+
/**
65+
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
66+
* The resulting view uses the original dispatcher for execution, but with the guarantee that
67+
* no more than [parallelism] coroutines are executed at the same time.
68+
*
69+
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
70+
* each view controls its own parallelism independently with the guarantee that the effective parallelism
71+
* of all views cannot exceed the actual parallelism of the original dispatcher.
72+
*
73+
* ### Limitations
74+
*
75+
* The default implementation of `limitedParallelism` does not support direct dispatchers,
76+
* such as executing the given runnable in place during [dispatch] calls.
77+
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
78+
* For direct dispatchers, it is recommended to override this method
79+
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
80+
*
81+
* ### Example of usage
82+
* ```
83+
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
84+
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
85+
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
86+
* // At most 3 threads will be processing JSON to avoid image processing starvation
87+
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
88+
* // At most 1 thread will be doing IO
89+
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
90+
* ```
91+
* is 6. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
92+
*
93+
* Note that this example was structured in such a way that it illustrates the parallelism guarantees.
94+
* In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
95+
* `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
96+
*/
97+
@ExperimentalCoroutinesApi
98+
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
99+
parallelism.checkParallelism()
100+
return LimitedDispatcher(this, parallelism)
101+
}
102+
64103
/**
65104
* Dispatches execution of a runnable [block] onto another thread in the given [context].
66105
* This method should guarantee that the given [block] will be eventually invoked,

kotlinx-coroutines-core/common/src/EventLoop.common.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() {
115115
}
116116
}
117117

118+
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
119+
parallelism.checkParallelism()
120+
return this
121+
}
122+
118123
open fun shutdown() {}
119124
}
120125

@@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long
525530
internal expect object DefaultExecutor {
526531
public fun enqueue(task: Runnable)
527532
}
528-

kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
8+
79
/**
810
* Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread
911
* and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main].
@@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
5153
*/
5254
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"
5355

56+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
57+
parallelism.checkParallelism()
58+
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
59+
return this
60+
}
61+
5462
/**
5563
* Internal method for more specific [toString] implementations. It returns non-null
5664
* string if this dispatcher is set in the platform as the main one.

kotlinx-coroutines-core/common/src/Unconfined.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ import kotlin.jvm.*
1111
* A coroutine dispatcher that is not confined to any specific thread.
1212
*/
1313
internal object Unconfined : CoroutineDispatcher() {
14+
15+
@ExperimentalCoroutinesApi
16+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
17+
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
18+
}
19+
1420
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
1521

1622
override fun dispatch(context: CoroutineContext, block: Runnable) {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.coroutines.*
9+
import kotlin.jvm.*
10+
11+
/**
12+
* The result of .limitedParallelism(x) call, a dispatcher
13+
* that wraps the given dispatcher, but limits the parallelism level, while
14+
* trying to emulate fairness.
15+
*/
16+
internal class LimitedDispatcher(
17+
private val dispatcher: CoroutineDispatcher,
18+
private val parallelism: Int
19+
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
20+
21+
@Volatile
22+
private var runningWorkers = 0
23+
24+
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
25+
26+
@ExperimentalCoroutinesApi
27+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
28+
parallelism.checkParallelism()
29+
if (parallelism >= this.parallelism) return this
30+
return super.limitedParallelism(parallelism)
31+
}
32+
33+
override fun run() {
34+
var fairnessCounter = 0
35+
while (true) {
36+
val task = queue.removeFirstOrNull()
37+
if (task != null) {
38+
try {
39+
task.run()
40+
} catch (e: Throwable) {
41+
handleCoroutineException(EmptyCoroutineContext, e)
42+
}
43+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
44+
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
45+
// Do "yield" to let other views to execute their runnable as well
46+
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
47+
dispatcher.dispatch(this, this)
48+
return
49+
}
50+
continue
51+
}
52+
53+
@Suppress("CAST_NEVER_SUCCEEDS")
54+
synchronized(this as SynchronizedObject) {
55+
--runningWorkers
56+
if (queue.size == 0) return
57+
++runningWorkers
58+
fairnessCounter = 0
59+
}
60+
}
61+
}
62+
63+
override fun dispatch(context: CoroutineContext, block: Runnable) {
64+
dispatchInternal(block) {
65+
dispatcher.dispatch(this, this)
66+
}
67+
}
68+
69+
@InternalCoroutinesApi
70+
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
71+
dispatchInternal(block) {
72+
dispatcher.dispatchYield(this, this)
73+
}
74+
}
75+
76+
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
77+
// Add task to queue so running workers will be able to see that
78+
if (addAndTryDispatching(block)) return
79+
/*
80+
* Protect against the race when the number of workers is enough,
81+
* but one (because of synchronized serialization) attempts to complete,
82+
* and we just observed the number of running workers smaller than the actual
83+
* number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
84+
*/
85+
if (!tryAllocateWorker()) return
86+
dispatch()
87+
}
88+
89+
private fun tryAllocateWorker(): Boolean {
90+
@Suppress("CAST_NEVER_SUCCEEDS")
91+
synchronized(this as SynchronizedObject) {
92+
if (runningWorkers >= parallelism) return false
93+
++runningWorkers
94+
return true
95+
}
96+
}
97+
98+
private fun addAndTryDispatching(block: Runnable): Boolean {
99+
queue.addLast(block)
100+
return runningWorkers >= parallelism
101+
}
102+
}
103+
104+
// Save a few bytecode ops
105+
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }

0 commit comments

Comments
 (0)