Skip to content

Commit 0f5b8df

Browse files
committed
IJPL-149823 Parallelism compensation for CoroutineDispatchers and runBlocking
1 parent fc26b8e commit 0f5b8df

12 files changed

+569
-21
lines changed

IntelliJ-patches.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,28 @@ We provide a single method `kotlinx.coroutines.internal.intellij.IntellijCorouti
1919
The invariant is that the result of this method is always equal to `coroutineContext` in suspending environment,
2020
and it does not change during the non-suspending execution within the same thread.
2121

22+
23+
## Parallelism compensation for `CoroutineDispatcher`s
24+
25+
If `runBlocking` happens to be invoked on a thread from `CoroutineDispatcher`, it may cause a thread starvation problem
26+
(Kotlin#3983). This happens because `runBlocking` does not release an associated computational permit while it parks the
27+
thread. To fix this, a parallelism compensation mechanism is introduced. When `runBlocking` decides to park a
28+
`CoroutineDispatcher` thread, it first increases the allowed parallelism limit of the `CoroutineDispatcher`. After
29+
the thread unparks, `runBlocking` notifies the dispatcher that the parallelism limit should be lowered back. It is
30+
important that the effective parallelism may temporarily exceed the current allowed parallelism limit. The
31+
`CoroutineDispatcher`'s worker take care of adjusting the effective parallelism if it needs to be decreased.
32+
33+
It is easy to see that this behavior cannot be general for `CoroutineDispatcher`s, at least because it breaks the contract
34+
of `LimitedDispatcher` (one that can be acquired via `.limitedParallelism`). It means that parallelism compensation
35+
cannot work for `LimitedDispatcher`, so `runBlocking` can still cause starvation issues there, but it seems rather
36+
expected.
37+
38+
Parallelism compensation support is internal and is implemented for `Dispatchers.Default` and `Dispatchers.IO`.
39+
To acquire an analogue of `limitedParallelism` dispatcher which supports parallelism compensation, use
40+
`IntellijCoroutines.softLimitedParallelism`. Be advised that not every `.limitedParallelism` call can be substituted
41+
with `.softLimitedParallelism`, e.g., `.limitedParallelism(1)` may be used as a synchronization manager and in this case
42+
exceeding the parallelism limit would eliminate this (likely expected) side effect.
43+
44+
### API
45+
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
46+
parallelism compensation
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.scheduling.ParallelismCompensation
6+
import kotlin.coroutines.*
7+
8+
/**
9+
* Introduced as part of IntelliJ patches.
10+
*
11+
* CoroutineDispatchers may optionally implement this interface to declare an ability to construct [SoftLimitedDispatcher]
12+
* on top of themselves. This is not possible in general case, because the worker of the underlying dispatcher must
13+
* implement [ParallelismCompensation] and properly propagate such requests to the task it is running.
14+
*/
15+
internal interface SoftLimitedParallelism {
16+
fun softLimitedParallelism(parallelism: Int): CoroutineDispatcher
17+
}
18+
19+
/**
20+
* Introduced as part of IntelliJ patches.
21+
*/
22+
internal fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int): CoroutineDispatcher {
23+
if (this is SoftLimitedParallelism) {
24+
return this.softLimitedParallelism(parallelism)
25+
}
26+
// SoftLimitedDispatcher cannot be used on top of LimitedDispatcher, because the latter doesn't propagate compensation requests
27+
throw UnsupportedOperationException("CoroutineDispatcher.softLimitedParallelism cannot be applied to $this")
28+
}
29+
30+
/**
31+
* Introduced as part of IntelliJ patches.
32+
*
33+
* Shamelessly copy-pasted from [LimitedDispatcher], but [ParallelismCompensation] is
34+
* implemented for [Worker] to allow compensation.
35+
*
36+
* [ParallelismCompensation] breaks the contract of [LimitedDispatcher] so a separate class is made to implement a
37+
* dispatcher that mostly behaves as limited, but can temporarily increase parallelism if necessary.
38+
*/
39+
internal class SoftLimitedDispatcher(
40+
private val dispatcher: CoroutineDispatcher,
41+
parallelism: Int
42+
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay), SoftLimitedParallelism {
43+
private val initialParallelism = parallelism
44+
// `parallelism limit - runningWorkers`; may be < 0 if decompensation is expected
45+
private val availablePermits = atomic(parallelism)
46+
47+
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
48+
49+
private val workerAllocationLock = SynchronizedObject()
50+
51+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
52+
return super.limitedParallelism(parallelism)
53+
}
54+
55+
override fun softLimitedParallelism(parallelism: Int): CoroutineDispatcher {
56+
parallelism.checkParallelism()
57+
if (parallelism >= initialParallelism) return this
58+
return SoftLimitedDispatcher(this, parallelism)
59+
}
60+
61+
override fun dispatch(context: CoroutineContext, block: Runnable) {
62+
dispatchInternal(block) { worker ->
63+
dispatcher.dispatch(this, worker)
64+
}
65+
}
66+
67+
@InternalCoroutinesApi
68+
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
69+
dispatchInternal(block) { worker ->
70+
dispatcher.dispatchYield(this, worker)
71+
}
72+
}
73+
74+
/**
75+
* Tries to dispatch the given [block].
76+
* If there are not enough workers, it starts a new one via [startWorker].
77+
*/
78+
private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
79+
queue.addLast(block)
80+
if (availablePermits.value <= 0) return
81+
if (!tryAllocateWorker()) return
82+
val task = obtainTaskOrDeallocateWorker() ?: return
83+
startWorker(Worker(task))
84+
}
85+
86+
/**
87+
* Tries to obtain the permit to start a new worker.
88+
*/
89+
private fun tryAllocateWorker(): Boolean {
90+
synchronized(workerAllocationLock) {
91+
val permits = availablePermits.value
92+
if (permits <= 0) return false
93+
return availablePermits.compareAndSet(permits, permits - 1)
94+
}
95+
}
96+
97+
/**
98+
* Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
99+
*/
100+
private fun obtainTaskOrDeallocateWorker(): Runnable? {
101+
val permits = availablePermits.value
102+
if (permits < 0) { // decompensation
103+
if (availablePermits.compareAndSet(permits, permits + 1)) return null
104+
}
105+
while (true) {
106+
when (val nextTask = queue.removeFirstOrNull()) {
107+
null -> synchronized(workerAllocationLock) {
108+
availablePermits.incrementAndGet()
109+
if (queue.size == 0) return null
110+
availablePermits.decrementAndGet()
111+
}
112+
else -> return nextTask
113+
}
114+
}
115+
}
116+
117+
/**
118+
* Every running Worker holds a permit
119+
*/
120+
private inner class Worker(private var currentTask: Runnable) : Runnable, ParallelismCompensation {
121+
override fun run() {
122+
var fairnessCounter = 0
123+
while (true) {
124+
try {
125+
currentTask.run()
126+
} catch (e: Throwable) {
127+
handleCoroutineException(EmptyCoroutineContext, e)
128+
}
129+
currentTask = obtainTaskOrDeallocateWorker() ?: return
130+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
131+
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@SoftLimitedDispatcher)) {
132+
// Do "yield" to let other views execute their runnable as well
133+
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
134+
dispatcher.dispatch(this@SoftLimitedDispatcher, this)
135+
return
136+
}
137+
}
138+
}
139+
140+
override fun increaseParallelismAndLimit() {
141+
val newTask = obtainTaskOrDeallocateWorker() // either increases the number of permits or we launch a new worker (which holds a permit)
142+
if (newTask != null) {
143+
dispatcher.dispatch(this@SoftLimitedDispatcher, Worker(newTask))
144+
}
145+
(currentTask as? ParallelismCompensation)?.increaseParallelismAndLimit()
146+
}
147+
148+
override fun decreaseParallelismLimit() {
149+
try {
150+
(currentTask as? ParallelismCompensation)?.decreaseParallelismLimit()
151+
} finally {
152+
availablePermits.decrementAndGet()
153+
}
154+
}
155+
}
156+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package kotlinx.coroutines.scheduling
2+
3+
/**
4+
* Introduced as part of IntelliJ patches.
5+
*
6+
* Runnables that are dispatched on [kotlinx.coroutines.CoroutineDispatcher] may optionally implement this interface
7+
* to declare an ability to compensate the associated parallelism resource.
8+
*/
9+
internal interface ParallelismCompensation {
10+
/**
11+
* Should increase both the limit and the effective parallelism.
12+
*/
13+
fun increaseParallelismAndLimit()
14+
15+
/**
16+
* Should only decrease the parallelism limit. The effective parallelism may temporarily stay higher than this limit.
17+
* Runnable should take care of checking whether effective parallelism needs to decrease to meet the desired limit.
18+
*/
19+
fun decreaseParallelismLimit()
20+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package kotlinx.coroutines
66

7-
import java.util.concurrent.locks.*
7+
import kotlinx.coroutines.scheduling.withCompensatedParallelism
88
import kotlin.contracts.*
99
import kotlin.coroutines.*
1010

@@ -95,7 +95,11 @@ private class BlockingCoroutine<T>(
9595
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
9696
// note: process next even may loose unpark flag, so check if completed before parking
9797
if (isCompleted) break
98-
parkNanos(this, parkNanos)
98+
if (parkNanos > 0) {
99+
withCompensatedParallelism {
100+
parkNanos(this, parkNanos)
101+
}
102+
}
99103
}
100104
} finally { // paranoia
101105
eventLoop?.decrementUseCount()

kotlinx-coroutines-core/jvm/src/internal/intellij/intellij.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
*/
44
package kotlinx.coroutines.internal.intellij
55

6+
import kotlinx.coroutines.CoroutineDispatcher
67
import kotlinx.coroutines.InternalCoroutinesApi
8+
import kotlinx.coroutines.internal.softLimitedParallelism as softLimitedParallelismImpl
9+
import kotlinx.coroutines.internal.SoftLimitedDispatcher
10+
import kotlinx.coroutines.Dispatchers
711
import kotlin.coroutines.*
812

913
internal val currentContextThreadLocal : ThreadLocal<CoroutineContext?> = ThreadLocal.withInitial { null }
@@ -14,7 +18,6 @@ internal val currentContextThreadLocal : ThreadLocal<CoroutineContext?> = Thread
1418
*/
1519
@InternalCoroutinesApi
1620
public object IntellijCoroutines {
17-
1821
/**
1922
* IntelliJ Platform would like to introspect coroutine contexts outside the coroutine framework.
2023
* This function is a non-suspend version of [coroutineContext].
@@ -25,4 +28,15 @@ public object IntellijCoroutines {
2528
public fun currentThreadCoroutineContext(): CoroutineContext? {
2629
return currentContextThreadLocal.get()
2730
}
31+
32+
/**
33+
* Constructs a [SoftLimitedDispatcher] from the specified [CoroutineDispatcher].
34+
* [SoftLimitedDispatcher] behaves as [LimitedDispatcher][kotlinx.coroutines.internal.LimitedDispatcher] but allows
35+
* temporarily exceeding the parallelism limit in case [parallelism compensation][kotlinx.coroutines.scheduling.withCompensatedParallelism]
36+
* was requested (e.g., by [kotlinx.coroutines.runBlocking]).
37+
*
38+
* This extension can only be used on instances of [Dispatchers.Default], [Dispatchers.IO] and also on what this extension
39+
* has returned. Throws [UnsupportedOperationException] if [this] does not support parallelism compensation mechanism.
40+
*/
41+
public fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int): CoroutineDispatcher = softLimitedParallelismImpl(parallelism)
2842
}

0 commit comments

Comments
 (0)