Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,22 @@ internal class CoroutineScheduler(
*/
while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
if (isTerminated || state == WorkerState.TERMINATED) break
tryReleaseCpu(WorkerState.PARKING)
val hadCpu = tryReleaseCpu(WorkerState.PARKING)
if (hadCpu && !globalCpuQueue.isEmpty) {
/*
* Prevents the following race: consider corePoolSize = 1
* - T_CPU holds the only CPU permit, scans the tasks, doesn't find anything,
* places itself on a stack
* - T_CPU scans again, doesn't find anything again, a thread switch happens at tryPark()
* - T_B (or several workers in BLOCKING mode) also put themselves on the stack, on top of the T_CPU
* - T* (not a worker) dispatches CPU tasks, wakes up T_B
* - T_B can't acquire a CPU permit, scans blocking queue, doesn't find anything, parks
* - T_CPU releases the CPU permit, parks
* - there are tasks in the CPU queue, but all workers are parked,
* so the scheduler won't make progress until there is another dispatch
*/
break
}
interrupted() // Cleanup interruptions
park()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.scheduling.CoroutineScheduler.Companion.MAX_SUPPORTED_POOL_SIZE
import org.junit.*
import java.util.concurrent.*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import java.util.concurrent.*
import kotlin.random.Random
import kotlin.test.*

class CoroutineSchedulerLostCpuPermitStressTest: TestBase() {

// See https://github.com/Kotlin/kotlinx.coroutines/issues/4491
@Test
fun testLostCpuPermit() {
// Sometimes, this test passes after a million iterations with the bug present!
val iterations = 100_000 * stressTestMultiplier
CoroutineScheduler(1, 2, Long.MAX_VALUE).use { scheduler ->
repeat(iterations) {
// Start a CPU worker
val cpuLatch = CountDownLatch(1)
scheduler.dispatch({
cpuLatch.countDown()
}, NonBlockingContext, false)
val cpuInitiallyAvailable = cpuLatch.await(1, TimeUnit.SECONDS)
assertTrue(cpuInitiallyAvailable, "Failed to start CPU worker on iteration $it")
// The CPU worker has finished its task and put itself on the stack.
// Spawn another worker on top of it.
val ioLatch = CountDownLatch(1)
scheduler.dispatch({
ioLatch.countDown()
}, BlockingContext, false)
ioLatch.await()
val finalLatch = CountDownLatch(1)
scheduler.dispatch({
finalLatch.countDown()
}, NonBlockingContext, false)
val cpuAvailableAfterBlocking = finalLatch.await(1, TimeUnit.SECONDS)
assertTrue(cpuAvailableAfterBlocking, "Lost CPU permit on iteration $it")
}
}
}
}