Skip to content

Commit c5bc9fe

Browse files
elizarovqwwdfsad
authored andcommitted
Restore the logic of having at least two CPU workers when corePoolSize > 1
1 parent db22e40 commit c5bc9fe

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -366,9 +366,16 @@ internal class CoroutineScheduler(
366366
val created = createdWorkers(state)
367367
val blocking = blockingWorkers(state)
368368
val cpuWorkers = created - blocking
369-
// If most of created workers are blocking, we should create one more thread to handle non-blocking work
370-
if (cpuWorkers < corePoolSize && createNewWorker()) {
371-
return
369+
/*
370+
* We check how many threads are there to handle non-blocking work,
371+
* and create one more if we have not enough of them.
372+
*/
373+
if (cpuWorkers < corePoolSize) {
374+
val newCpuWorkers = createNewWorker()
375+
// If we've created the first cpu worker and corePoolSize > 1 then create
376+
// one more (second) cpu worker, so that stealing between them is operational
377+
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
378+
if (newCpuWorkers > 0) return
372379
}
373380
// Try unpark again in case there was race between permit release and parking
374381
tryUnpark()
@@ -418,7 +425,11 @@ internal class CoroutineScheduler(
418425
}
419426
}
420427

421-
private fun createNewWorker(): Boolean {
428+
/*
429+
* Returns the number of CPU workers after this function (including new worker) or
430+
* 0 if no worker was created.
431+
*/
432+
private fun createNewWorker(): Int {
422433
synchronized(workers) {
423434
// for test purposes make sure we're not trying to resurrect terminated scheduler
424435
require(!isTerminated.value) { "This scheduler was terminated "}
@@ -427,14 +438,14 @@ internal class CoroutineScheduler(
427438
val blocking = blockingWorkers(state)
428439
val cpuWorkers = created - blocking
429440
// Double check for overprovision
430-
if (cpuWorkers >= corePoolSize) return false
431-
if (created >= maxPoolSize || cpuPermits.availablePermits() == 0) return false
441+
if (cpuWorkers >= corePoolSize) return 0
442+
if (created >= maxPoolSize || cpuPermits.availablePermits() == 0) return 0
432443
// start & register new worker
433444
val newIndex = incrementCreatedWorkers()
434445
require(newIndex > 0 && workers[newIndex] == null)
435446
val worker = Worker(newIndex).apply { start() }
436447
workers[newIndex] = worker
437-
return true
448+
return cpuWorkers + 1
438449
}
439450
}
440451

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ abstract class SchedulerTestBase : TestBase() {
9090
return _dispatcher!!.blocking(parallelism) + handler
9191
}
9292

93+
fun initialPoolSize() = corePoolSize.coerceAtMost(2)
94+
9395
@After
9496
fun after() {
9597
runBlocking {

0 commit comments

Comments
 (0)