diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3430ebadec..25b79b3ec8 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -798,7 +798,22 @@ internal class CoroutineScheduler( */ while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break - tryReleaseCpu(WorkerState.PARKING) + val hadCpu = tryReleaseCpu(WorkerState.PARKING) + if (hadCpu && !globalCpuQueue.isEmpty) { + /* + * Prevents the following race: consider corePoolSize = 1 + * - T_CPU holds the only CPU permit, scans the tasks, doesn't find anything, + * places itself on a stack + * - T_CPU scans again, doesn't find anything again, a thread switch happens at tryPark() + * - T_B (or several workers in BLOCKING mode) also put themselves on the stack, on top of the T_CPU + * - T* (not a worker) dispatches CPU tasks, wakes up T_B + * - T_B can't acquire a CPU permit, scans blocking queue, doesn't find anything, parks + * - T_CPU releases the CPU permit, parks + * - there are tasks in the CPU queue, but all workers are parked, + * so the scheduler won't make progress until there is another dispatch + */ + break + } interrupted() // Cleanup interruptions park() } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt index 07b549f917..6150450636 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt @@ -1,7 +1,6 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import kotlinx.coroutines.scheduling.CoroutineScheduler.Companion.MAX_SUPPORTED_POOL_SIZE import org.junit.* import java.util.concurrent.* diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLostCpuPermitStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLostCpuPermitStressTest.kt new file mode 100644 index 0000000000..a3d153b11e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLostCpuPermitStressTest.kt @@ -0,0 +1,40 @@ +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.testing.* +import java.util.concurrent.* +import kotlin.random.Random +import kotlin.test.* + +class CoroutineSchedulerLostCpuPermitStressTest: TestBase() { + + // See https://github.com/Kotlin/kotlinx.coroutines/issues/4491 + @Test + fun testLostCpuPermit() { + // Sometimes, this test passes after a million iterations with the bug present! + val iterations = 100_000 * stressTestMultiplier + CoroutineScheduler(1, 2, Long.MAX_VALUE).use { scheduler -> + repeat(iterations) { + // Start a CPU worker + val cpuLatch = CountDownLatch(1) + scheduler.dispatch({ + cpuLatch.countDown() + }, NonBlockingContext, false) + val cpuInitiallyAvailable = cpuLatch.await(1, TimeUnit.SECONDS) + assertTrue(cpuInitiallyAvailable, "Failed to start CPU worker on iteration $it") + // The CPU worker has finished its task and put itself on the stack. + // Spawn another worker on top of it. + val ioLatch = CountDownLatch(1) + scheduler.dispatch({ + ioLatch.countDown() + }, BlockingContext, false) + ioLatch.await() + val finalLatch = CountDownLatch(1) + scheduler.dispatch({ + finalLatch.countDown() + }, NonBlockingContext, false) + val cpuAvailableAfterBlocking = finalLatch.await(1, TimeUnit.SECONDS) + assertTrue(cpuAvailableAfterBlocking, "Lost CPU permit on iteration $it") + } + } + } +}