Skip to content

Commit 5c7454b

Browse files
elizarovqwwdfsad
authored andcommitted
Scheduler tests infrastructure
Support "kotlinx.coroutines.scheduler" property to change DefaultDispatcher Updated scheduler shrink test with custom idleWorkerKeepAliveNs config, so that they execute faster Fix submissions from another instance of CoroutineScheduler Run JDK16 tests and tests under new scheduler tests only during nightly stress test
1 parent 216e4d3 commit 5c7454b

File tree

49 files changed

+279
-215
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+279
-215
lines changed

core/kotlinx-coroutines-core/build.gradle

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,21 @@ task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) {
4444
exclude '**/*CancellableContinuationExceptionHandlingTest.*'
4545
}
4646

47-
task moreTest(dependsOn: [lockFreedomTest, jdk16Test])
47+
task schedulerTest(type: Test, dependsOn: testClasses) {
48+
systemProperty 'kotlinx.coroutines.scheduler', ''
49+
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
50+
}
51+
52+
// Always run those tests
53+
task moreTest(dependsOn: [lockFreedomTest])
4854

4955
build.dependsOn moreTest
5056

57+
// Run these tests only during nightly stress test
58+
task extraTest(dependsOn: [jdk16Test, schedulerTest])
59+
60+
extraTest.onlyIf { project.properties['stressTest'] != null }
61+
5162
task testsJar(type: Jar, dependsOn: testClasses) {
5263
classifier = 'tests'
5364
from sourceSets.test.output

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.internal.*
2020
import java.util.concurrent.atomic.*
2121
import kotlin.coroutines.experimental.*
22+
import kotlinx.coroutines.experimental.scheduling.*
2223

2324
/**
2425
* Name of the property that control coroutine debugging. See [newCoroutineContext].
@@ -56,14 +57,25 @@ internal fun resetCoroutineId() {
5657
COROUTINE_ID.set(0)
5758
}
5859

60+
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
61+
62+
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
63+
when (value) {
64+
null -> false
65+
"", "on" -> true
66+
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
67+
}
68+
}
69+
5970
/**
6071
* This is the default [CoroutineDispatcher] that is used by all standard builders like
6172
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
6273
*
6374
* It is currently equal to [CommonPool], but the value is subject to change in the future.
6475
*/
6576
@Suppress("PropertyName")
66-
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool
77+
public actual val DefaultDispatcher: CoroutineDispatcher =
78+
if (useCoroutinesScheduler) ExperimentalCoroutineDispatcher() else CommonPool
6779

6880
/**
6981
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.coroutines.experimental.scheduling
22

33
import kotlinx.atomicfu.*
44
import kotlinx.coroutines.experimental.*
5-
import kotlinx.coroutines.experimental.channels.*
65
import kotlinx.coroutines.experimental.internal.*
76
import java.io.Closeable
87
import java.util.*
@@ -55,15 +54,11 @@ import java.util.concurrent.locks.*
5554
*/
5655
@Suppress("NOTHING_TO_INLINE")
5756
internal class CoroutineScheduler(
58-
private val schedulerName: String,
5957
private val corePoolSize: Int,
60-
private val maxPoolSize: Int
58+
private val maxPoolSize: Int,
59+
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
60+
private val schedulerName: String = DEFAULT_SCHEDULER_NAME
6161
) : Closeable {
62-
constructor(
63-
corePoolSize: Int,
64-
maxPoolSize: Int
65-
) : this("CoroutineScheduler", corePoolSize, maxPoolSize)
66-
6762
init {
6863
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
6964
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
@@ -74,6 +69,9 @@ internal class CoroutineScheduler(
7469
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
7570
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
7671
}
72+
require(idleWorkerKeepAliveNs > 0) {
73+
"Idle worker keep alive time $idleWorkerKeepAliveNs must be postiive"
74+
}
7775
}
7876

7977
private val globalQueue: GlobalQueue = GlobalQueue()
@@ -454,6 +452,8 @@ internal class CoroutineScheduler(
454452
*/
455453
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
456454
val worker = Thread.currentThread() as? Worker ?: return NOT_ADDED
455+
if (worker.scheduler !== this) return NOT_ADDED // different scheduler's worker (!!!)
456+
457457
var result = ADDED
458458

459459
if (task.mode == TaskMode.NON_BLOCKING) {
@@ -570,6 +570,8 @@ internal class CoroutineScheduler(
570570
indexInArray = index
571571
}
572572

573+
val scheduler get() = this@CoroutineScheduler
574+
573575
val localQueue: WorkQueue = WorkQueue()
574576

575577
/**
@@ -780,9 +782,9 @@ internal class CoroutineScheduler(
780782
if (!blockingQuiescence()) return
781783
terminationState.value = ALLOWED
782784
// set termination deadline the first time we are here (it is reset in idleReset)
783-
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + IDLE_WORKER_KEEP_ALIVE_NS
785+
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
784786
// actually park
785-
doPark(IDLE_WORKER_KEEP_ALIVE_NS)
787+
doPark(idleWorkerKeepAliveNs)
786788
// try terminate when we are idle past termination deadline
787789
// note, that comparison is written like this to protect against potential nanoTime wraparound
788790
if (System.nanoTime() - terminationDeadline >= 0) {
@@ -813,15 +815,15 @@ internal class CoroutineScheduler(
813815
/*
814816
* At this point this thread is no longer considered as usable for scheduling.
815817
* We need multi-step choreography to reindex workers.
816-
*
818+
*
817819
* 1) Read current worker's index and reset it to zero.
818820
*/
819821
val oldIndex = indexInArray
820822
indexInArray = 0
821823
/*
822824
* Now this worker cannot become the top of parkedWorkersStack, but it can
823825
* still be at the stack top via oldIndex.
824-
*
826+
*
825827
* 2) Update top of stack if it was pointing to oldIndex and make sure no
826828
* pending push/pop operation that might have already retrieved oldIndex could complete.
827829
*/

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,23 @@ import kotlin.coroutines.experimental.*
99
/**
1010
* @suppress **This is unstable API and it is subject to change.**
1111
*/
12-
// TODO make internal after integration wih Ktor
12+
// TODO make internal (and rename) after complete integration
1313
class ExperimentalCoroutineDispatcher(
14-
corePoolSize: Int = CORE_POOL_SIZE,
15-
maxPoolSize: Int = MAX_POOL_SIZE
14+
private val corePoolSize: Int,
15+
private val maxPoolSize: Int,
16+
private val idleWorkerKeepAliveNs: Long
1617
) : CoroutineDispatcher(), Delay, Closeable {
17-
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
18+
constructor(
19+
corePoolSize: Int = CORE_POOL_SIZE,
20+
maxPoolSize: Int = MAX_POOL_SIZE
21+
) : this(
22+
corePoolSize,
23+
maxPoolSize,
24+
IDLE_WORKER_KEEP_ALIVE_NS
25+
)
26+
27+
// This is variable for test purposes, so that we can reinitialize from clean state
28+
private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
1829

1930
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
2031
coroutineScheduler.dispatch(block)
@@ -45,6 +56,17 @@ class ExperimentalCoroutineDispatcher(
4556

4657
internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
4758
coroutineScheduler.dispatch(block, context, fair)
59+
60+
// fot tests only
61+
internal fun usePrivateScheduler() {
62+
coroutineScheduler.shutdown(1000L)
63+
coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
64+
}
65+
66+
// for tests only
67+
internal fun shutdown(timeout: Long) {
68+
coroutineScheduler.shutdown(timeout)
69+
}
4870
}
4971

5072
private class LimitingBlockingDispatcher(

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import java.util.concurrent.*
77

88
// TODO most of these fields will be moved to 'object ExperimentalDispatcher'
99

10+
internal const val DEFAULT_SCHEDULER_NAME = "CoroutineScheduler"
11+
1012
// 100us as default
1113
@JvmField
1214
internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
@@ -20,9 +22,12 @@ internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = systemProp(
2022
internal val BLOCKING_DEFAULT_PARALLELISM = systemProp(
2123
"kotlinx.coroutines.scheduler.blocking.parallelism", 16)
2224

25+
// NOTE: we coerce default to at least two threads to give us chances that multi-threading problems
26+
// get reproduced even on a single-core machine, but support explicit setting of 1 thread scheduler if needed.
2327
@JvmField
2428
internal val CORE_POOL_SIZE = systemProp("kotlinx.coroutines.scheduler.core.pool.size",
25-
AVAILABLE_PROCESSORS.coerceAtLeast(2), minValue = 2)
29+
AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
30+
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE)
2631

2732
@JvmField
2833
internal val MAX_POOL_SIZE = systemProp("kotlinx.coroutines.scheduler.max.pool.size",

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.scheduling.*
1920
import org.junit.After
2021
import org.junit.Before
2122
import java.util.concurrent.atomic.AtomicBoolean
@@ -105,17 +106,27 @@ public actual open class TestBase actual constructor() {
105106

106107
@Before
107108
fun before() {
108-
CommonPool.usePrivatePool()
109+
initPoolsBeforeTest()
109110
threadsBefore = currentThreads()
110111
}
111112

112113
@After
113114
fun onCompletion() {
114115
error.get()?.let { throw it }
115116
check(actionIndex.get() == 0 || finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
117+
shutdownPoolsAfterTest()
118+
checkTestThreads(threadsBefore)
119+
}
120+
121+
fun initPoolsBeforeTest() {
122+
CommonPool.usePrivatePool()
123+
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).usePrivateScheduler()
124+
}
125+
126+
fun shutdownPoolsAfterTest() {
116127
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
128+
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).shutdown(SHUTDOWN_TIMEOUT)
117129
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
118-
checkTestThreads(threadsBefore)
119130
}
120131

121132
@Suppress("ACTUAL_WITHOUT_EXPECT", "ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS")

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBaseTest.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,17 @@ import org.junit.*
2121
class TestBaseTest : TestBase() {
2222
@Test
2323
fun testThreadsShutdown() {
24-
val SHUTDOWN_TIMEOUT = 1_000L
2524
repeat(1000 * stressTestMultiplier) { _ ->
26-
CommonPool.usePrivatePool()
25+
initPoolsBeforeTest()
2726
val threadsBefore = currentThreads()
2827
runBlocking {
29-
val sub = launch(DefaultDispatcher) {
28+
val sub = launch {
3029
delay(10000000L)
3130
}
3231
sub.cancel()
3332
sub.join()
3433
}
35-
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
36-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
34+
shutdownPoolsAfterTest()
3735
checkTestThreads(threadsBefore)
3836
}
3937

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ package kotlinx.coroutines.experimental
1818

1919
import kotlin.test.*
2020

21-
class WithContextCommonPoolTest : TestBase() {
21+
class WithDefaultContextTest : TestBase() {
2222
@Test
23-
fun testCommonPoolNoSuspend() = runTest {
23+
fun testNoSuspend() = runTest {
2424
expect(1)
25-
val result = withContext(CommonPool) {
25+
val result = withContext(DefaultDispatcher) {
2626
expect(2)
2727
"OK"
2828
}
@@ -31,9 +31,9 @@ class WithContextCommonPoolTest : TestBase() {
3131
}
3232

3333
@Test
34-
fun testCommonPoolWithSuspend() = runTest {
34+
fun testWithSuspend() = runTest {
3535
expect(1)
36-
val result = withContext(CommonPool) {
36+
val result = withContext(DefaultDispatcher) {
3737
expect(2)
3838
delay(100)
3939
expect(3)

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class BroadcastChannelSubStressTest(
4949
@Test
5050
fun testStress() = runBlocking {
5151
println("--- BroadcastChannelSubStressTest $kind")
52-
val ctx = coroutineContext + CommonPool
52+
val ctx = coroutineContext + DefaultDispatcher
5353
val sender =
5454
launch(context = ctx + CoroutineName("Sender")) {
5555
while (isActive) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
107107
}
108108

109109
fun launchSender() {
110-
sender = launch(CommonPool, start = CoroutineStart.ATOMIC) {
110+
sender = launch(DefaultDispatcher, start = CoroutineStart.ATOMIC) {
111111
val rnd = Random()
112112
cancellable(senderDone) {
113113
var counter = 0
@@ -132,7 +132,7 @@ class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
132132
}
133133

134134
fun launchReceiver() {
135-
receiver = launch(CommonPool, start = CoroutineStart.ATOMIC) {
135+
receiver = launch(DefaultDispatcher, start = CoroutineStart.ATOMIC) {
136136
val rnd = Random()
137137
cancellable(receiverDone) {
138138
while (true) {

0 commit comments

Comments
 (0)