Skip to content

Commit e4bbb4b

Browse files
committed
Avoid termination races by checking blocking tasks quiescence
1 parent 673c859 commit e4bbb4b

File tree

1 file changed

+17
-8
lines changed
  • core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling

1 file changed

+17
-8
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,8 @@ internal class CoroutineScheduler(
636636
parkedWorkersStack.push(this)
637637
}
638638

639+
if (!blockingQuiescence()) return
640+
639641
terminationState.value = ALLOWED
640642
val time = System.nanoTime()
641643
LockSupport.parkNanos(IDLE_WORKER_KEEP_ALIVE_NS)
@@ -649,14 +651,6 @@ internal class CoroutineScheduler(
649651
* Stops execution of current thread and removes it from [createdWorkers]
650652
*/
651653
private fun terminateWorker() {
652-
// Last ditch polling: try to find blocking task before termination
653-
val task = globalWorkQueue.pollBlockingMode()
654-
if (task != null) {
655-
localQueue.add(task, globalWorkQueue)
656-
return
657-
}
658-
659-
660654
synchronized(workers) {
661655
// Someone else terminated, bail out
662656
if (createdWorkers <= corePoolSize) {
@@ -671,6 +665,9 @@ internal class CoroutineScheduler(
671665
return
672666
}
673667

668+
// Last ditch polling: try to find blocking task before termination
669+
if (!blockingQuiescence()) return
670+
674671
/*
675672
* See tryUnpark for state reasoning.
676673
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate
@@ -690,6 +687,18 @@ internal class CoroutineScheduler(
690687
state = WorkerState.FINISHED
691688
}
692689

690+
/**
691+
* Method checks whether new blocking tasks arrived to pool when worker decided
692+
* it can go to deep park/termination and puts recently arrived task to its local queue
693+
*/
694+
private fun blockingQuiescence(): Boolean {
695+
globalWorkQueue.pollBlockingMode()?.let {
696+
localQueue.add(it, globalWorkQueue)
697+
return false
698+
}
699+
return true
700+
}
701+
693702
private fun idleReset(mode: TaskMode) {
694703
if (state == WorkerState.PARKING) {
695704
assert(mode == TaskMode.PROBABLY_BLOCKING)

0 commit comments

Comments
 (0)