Skip to content

Commit 33b2a9a

Browse files
authored
Fix limitedParallelism doing dispatches when it has no tasks (#3672)
To support fairness w.r.t. the tasks that want to run on the wrapped dispatcher outside of its view, `LimitedDispatcher` redispatches the work loop periodically. This can happen even when there are no more tasks in `LimitedDispatcher`'s queue. Typically, this behavior is completely benign: after being redistpatched, the new runner just checks the queue, sees that there is nothing there, and leaves. However, with closable dispatchers, this affects correctness, as this redispatch may happen even after all the dispatched tasks were run and the dispatcher was closed.
1 parent 81baf9c commit 33b2a9a

File tree

3 files changed

+100
-51
lines changed

3 files changed

+100
-51
lines changed

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

Lines changed: 62 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package kotlinx.coroutines.internal
77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
99
import kotlin.coroutines.*
10-
import kotlin.jvm.*
1110

1211
/**
1312
* The result of .limitedParallelism(x) call, a dispatcher
@@ -27,7 +26,7 @@ import kotlin.jvm.*
2726
internal class LimitedDispatcher(
2827
private val dispatcher: CoroutineDispatcher,
2928
private val parallelism: Int
30-
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
29+
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {
3130

3231
// Atomic is necessary here for the sake of K/N memory ordering,
3332
// there is no need in atomic operations for this property
@@ -45,61 +44,37 @@ internal class LimitedDispatcher(
4544
return super.limitedParallelism(parallelism)
4645
}
4746

48-
override fun run() {
49-
var fairnessCounter = 0
50-
while (true) {
51-
val task = queue.removeFirstOrNull()
52-
if (task != null) {
53-
try {
54-
task.run()
55-
} catch (e: Throwable) {
56-
handleCoroutineException(EmptyCoroutineContext, e)
57-
}
58-
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
59-
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
60-
// Do "yield" to let other views to execute their runnable as well
61-
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
62-
dispatcher.dispatch(this, this)
63-
return
64-
}
65-
continue
66-
}
67-
68-
synchronized(workerAllocationLock) {
69-
runningWorkers.decrementAndGet()
70-
if (queue.size == 0) return
71-
runningWorkers.incrementAndGet()
72-
fairnessCounter = 0
73-
}
74-
}
75-
}
76-
7747
override fun dispatch(context: CoroutineContext, block: Runnable) {
78-
dispatchInternal(block) {
79-
dispatcher.dispatch(this, this)
48+
dispatchInternal(block) { worker ->
49+
dispatcher.dispatch(this, worker)
8050
}
8151
}
8252

8353
@InternalCoroutinesApi
8454
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
85-
dispatchInternal(block) {
86-
dispatcher.dispatchYield(this, this)
55+
dispatchInternal(block) { worker ->
56+
dispatcher.dispatchYield(this, worker)
8757
}
8858
}
8959

90-
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
60+
/**
61+
* Tries to dispatch the given [block].
62+
* If there are not enough workers, it starts a new one via [startWorker].
63+
*/
64+
private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
9165
// Add task to queue so running workers will be able to see that
92-
if (addAndTryDispatching(block)) return
93-
/*
94-
* Protect against the race when the number of workers is enough,
95-
* but one (because of synchronized serialization) attempts to complete,
96-
* and we just observed the number of running workers smaller than the actual
97-
* number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
98-
*/
66+
queue.addLast(block)
67+
if (runningWorkers.value >= parallelism) return
68+
// allocation may fail if some workers were launched in parallel or a worker temporarily decreased
69+
// `runningWorkers` when they observed an empty queue.
9970
if (!tryAllocateWorker()) return
100-
dispatch()
71+
val task = obtainTaskOrDeallocateWorker() ?: return
72+
startWorker(Worker(task))
10173
}
10274

75+
/**
76+
* Tries to obtain the permit to start a new worker.
77+
*/
10378
private fun tryAllocateWorker(): Boolean {
10479
synchronized(workerAllocationLock) {
10580
if (runningWorkers.value >= parallelism) return false
@@ -108,9 +83,49 @@ internal class LimitedDispatcher(
10883
}
10984
}
11085

111-
private fun addAndTryDispatching(block: Runnable): Boolean {
112-
queue.addLast(block)
113-
return runningWorkers.value >= parallelism
86+
/**
87+
* Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
88+
*/
89+
private fun obtainTaskOrDeallocateWorker(): Runnable? {
90+
while (true) {
91+
when (val nextTask = queue.removeFirstOrNull()) {
92+
null -> synchronized(workerAllocationLock) {
93+
runningWorkers.decrementAndGet()
94+
if (queue.size == 0) return null
95+
runningWorkers.incrementAndGet()
96+
}
97+
else -> return nextTask
98+
}
99+
}
100+
}
101+
102+
/**
103+
* A worker that polls the queue and runs tasks until there are no more of them.
104+
*
105+
* It always stores the next task to run. This is done in order to prevent the possibility of the fairness
106+
* re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
107+
* actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
108+
* perform any more dispatches.
109+
*/
110+
private inner class Worker(private var currentTask: Runnable) : Runnable {
111+
override fun run() {
112+
var fairnessCounter = 0
113+
while (true) {
114+
try {
115+
currentTask.run()
116+
} catch (e: Throwable) {
117+
handleCoroutineException(EmptyCoroutineContext, e)
118+
}
119+
currentTask = obtainTaskOrDeallocateWorker() ?: return
120+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
121+
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
122+
// Do "yield" to let other views execute their runnable as well
123+
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
124+
dispatcher.dispatch(this@LimitedDispatcher, this)
125+
return
126+
}
127+
}
128+
}
114129
}
115130
}
116131

kotlinx-coroutines-core/concurrent/test/LimitedParallelismConcurrentTest.kt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines
77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.exceptions.*
10+
import kotlin.coroutines.*
1011
import kotlin.test.*
1112

1213
class LimitedParallelismConcurrentTest : TestBase() {
@@ -58,4 +59,37 @@ class LimitedParallelismConcurrentTest : TestBase() {
5859
joinAll(j1, j2)
5960
executor.close()
6061
}
62+
63+
/**
64+
* Tests that, when no tasks are present, the limited dispatcher does not dispatch any tasks.
65+
* This is important for the case when a dispatcher is closeable and the [CoroutineDispatcher.limitedParallelism]
66+
* machinery could trigger a dispatch after the dispatcher is closed.
67+
*/
68+
@Test
69+
fun testNotDoingDispatchesWhenNoTasksArePresent() = runTest {
70+
class NaggingDispatcher: CoroutineDispatcher() {
71+
val closed = atomic(false)
72+
override fun dispatch(context: CoroutineContext, block: Runnable) {
73+
if (closed.value)
74+
fail("Dispatcher was closed, but still dispatched a task")
75+
Dispatchers.Default.dispatch(context, block)
76+
}
77+
fun close() {
78+
closed.value = true
79+
}
80+
}
81+
repeat(stressTestMultiplier * 500_000) {
82+
val dispatcher = NaggingDispatcher()
83+
val view = dispatcher.limitedParallelism(1)
84+
val deferred = CompletableDeferred<Unit>()
85+
val job = launch(view) {
86+
deferred.await()
87+
}
88+
launch(Dispatchers.Default) {
89+
deferred.complete(Unit)
90+
}
91+
job.join()
92+
dispatcher.close()
93+
}
94+
}
6195
}

kotlinx-coroutines-core/jvm/test/TestBase.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
9797

9898
private fun printError(message: String, cause: Throwable) {
9999
setError(cause)
100-
println("$message: $cause")
101-
cause.printStackTrace(System.out)
102-
println("--- Detected at ---")
103-
Throwable().printStackTrace(System.out)
100+
System.err.println("$message: $cause")
101+
cause.printStackTrace(System.err)
102+
System.err.println("--- Detected at ---")
103+
Throwable().printStackTrace(System.err)
104104
}
105105

106106
/**

0 commit comments

Comments
 (0)