Skip to content

Commit e7d5ad8

Browse files
committed
Make scheduler more configurable
1 parent 37e45cd commit e7d5ad8

File tree

5 files changed

+26
-14
lines changed

5 files changed

+26
-14
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import java.util.concurrent.atomic.AtomicLong
20-
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
21-
import kotlin.coroutines.experimental.ContinuationInterceptor
22-
import kotlin.coroutines.experimental.CoroutineContext
19+
import sun.text.normalizer.UTF16.*
20+
import java.util.concurrent.atomic.*
21+
import kotlin.coroutines.experimental.*
2322

2423
/**
2524
* Name of the property that control coroutine debugging. See [newCoroutineContext].

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ internal class CoroutineScheduler(
7070
* The stack is better than a queue (even with contention on top) because it unparks threads
7171
* in most-recently used order, improving both performance and locality.
7272
* Moreover, it decreases threads thrashing, if the pool has n threads when only n / 2 is required,
73-
* the latter half will never be unparked and will terminate itself after [BLOCKING_WORKER_KEEP_ALIVE_NS].
73+
* the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
7474
*/
7575
@Suppress("ClassName")
7676
private object parkedWorkersStack
@@ -136,9 +136,6 @@ internal class CoroutineScheduler(
136136
.coerceAtLeast(10)
137137
.coerceAtMost(MAX_PARK_TIME_NS)
138138

139-
@JvmStatic
140-
private val BLOCKING_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(5)
141-
142139
// Local queue 'add' results
143140
private const val ADDED = -1
144141
// Added to the local queue, but pool requires additional worker to keep up
@@ -304,8 +301,7 @@ internal class CoroutineScheduler(
304301
* 'ALLOWED', then proceed, because park will have no effect
305302
*/
306303
if (!worker.terminationState.compareAndSet(terminationState, FORBIDDEN)
307-
&& worker.terminationState.value == TERMINATED
308-
) {
304+
&& worker.terminationState.value == TERMINATED) {
309305
continue
310306
}
311307

@@ -643,9 +639,9 @@ internal class CoroutineScheduler(
643639

644640
terminationState.value = ALLOWED
645641
val time = System.nanoTime()
646-
LockSupport.parkNanos(BLOCKING_WORKER_KEEP_ALIVE_NS)
642+
LockSupport.parkNanos(IDLE_WORKER_KEEP_ALIVE_NS)
647643
// Protection against spurious wakeups of parkNanos
648-
if (System.nanoTime() - time >= BLOCKING_WORKER_KEEP_ALIVE_NS) {
644+
if (System.nanoTime() - time >= IDLE_WORKER_KEEP_ALIVE_NS) {
649645
terminateWorker()
650646
}
651647
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import java.util.concurrent.*
77
import kotlin.coroutines.experimental.*
88

99
// TODO make internal after integration wih Ktor
10-
class ExperimentalCoroutineDispatcher(corePoolSize: Int = Runtime.getRuntime().availableProcessors(), maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
10+
class ExperimentalCoroutineDispatcher(corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
1111

1212
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1313

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,38 @@
11
package kotlinx.coroutines.experimental.scheduling
22

3+
import java.util.concurrent.*
4+
35

46
internal typealias Task = TimedTask
57
internal typealias GlobalQueue = TaskQueue
8+
// TODO most of these fields will be moved to 'object ExperimentalDispatcher'
69

710
// 100us as default
11+
@JvmField
812
internal val WORK_STEALING_TIME_RESOLUTION_NS = readFromSystemProperties(
913
"kotlinx.coroutines.scheduler.resolution.ns", 100000L)
1014

15+
@JvmField
1116
internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = readFromSystemProperties(
1217
"kotlinx.coroutines.scheduler.offload.threshold", 96L)
1318

19+
@JvmField
1420
internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
1521
"kotlinx.coroutines.scheduler.blocking.parallelism", 16)
1622

23+
@JvmField
24+
internal val CORE_POOL_SIZE = readFromSystemProperties(
25+
"kotlinx.coroutines.scheduler.core.pool.size", Runtime.getRuntime().availableProcessors().coerceAtLeast(2))
26+
27+
@JvmField
1728
internal val MAX_POOL_SIZE = readFromSystemProperties(
1829
"kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128)
1930

31+
@JvmField
32+
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(readFromSystemProperties(
33+
"kotlinx.coroutines.scheduler.keep.alive.sec", 5L))
34+
35+
@JvmField
2036
internal var schedulerTimeSource: TimeSource = NanoTimeSource
2137

2238
internal enum class TaskMode {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerShrinkTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class CoroutineSchedulerShrinkTest : SchedulerTestBase() {
5757

5858
delay(10, TimeUnit.SECONDS)
5959
// Pool should shrink to core size
60-
checkPoolThreadsExist(CORES_COUNT)
60+
checkPoolThreadsExist(corePoolSize..corePoolSize + 3)
6161
}
6262

6363
private suspend fun checkBlockingTasks(blockingTasks: List<Deferred<*>>) {
@@ -67,6 +67,7 @@ class CoroutineSchedulerShrinkTest : SchedulerTestBase() {
6767
}
6868

6969
@Test(timeout = 15_000)
70+
@Ignore // flaky and non deterministic
7071
fun testShrinkWithExternalTasks() = runBlocking {
7172
val nonBlockingBarrier = CyclicBarrier(CORES_COUNT + 1)
7273
val blockingTasks = launchBlocking()

0 commit comments

Comments
 (0)