Skip to content

Commit ae66c95

Browse files
committed
Runt ticker channel tests under virtual time source for stability
1 parent 7afb62e commit ae66c95

File tree

4 files changed

+278
-249
lines changed

4 files changed

+278
-249
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental
6+
7+
import java.io.*
8+
import java.util.concurrent.*
9+
import java.util.concurrent.locks.*
10+
11+
private const val SHUTDOWN_TIMEOUT = 1000L
12+
13+
internal inline fun withVirtualTimeSource(log: PrintStream = System.`out`, block: () -> Unit) {
14+
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
15+
val testTimeSource = VirtualTimeSource(log)
16+
timeSource = testTimeSource
17+
DefaultExecutor.ensureStarted() // should start with new time source
18+
try {
19+
block()
20+
} finally {
21+
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
22+
testTimeSource.shutdown()
23+
timeSource = DefaultTimeSource // restore time source
24+
}
25+
}
26+
27+
private val NOT_PARKED = -1L
28+
29+
private class ThreadStatus {
30+
@Volatile @JvmField
31+
var parkedTill = NOT_PARKED
32+
@Volatile @JvmField
33+
var permit = false
34+
override fun toString(): String = "parkedTill = ${TimeUnit.NANOSECONDS.toMillis(parkedTill)} ms, permit = $permit"
35+
}
36+
37+
private val MAX_WAIT_NANOS = 10_000_000_000L // 10s
38+
private val REAL_TIME_STEP_NANOS = 200_000_000L // 200 ms
39+
private val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to better track real-time
40+
41+
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
42+
internal class VirtualTimeSource(
43+
private val log: PrintStream
44+
) : TimeSource {
45+
private val mainThread: Thread = Thread.currentThread()
46+
private var checkpointNanos: Long = System.nanoTime()
47+
48+
@Volatile
49+
private var isShutdown = false
50+
51+
@Volatile
52+
private var time: Long = 0
53+
54+
private var trackedTasks = 0
55+
56+
private val threads = ConcurrentHashMap<Thread, ThreadStatus>()
57+
58+
override fun currentTimeMillis(): Long = TimeUnit.NANOSECONDS.toMillis(time)
59+
override fun nanoTime(): Long = time
60+
61+
@Synchronized
62+
override fun trackTask(block: Runnable): Runnable {
63+
trackedTasks++
64+
return Runnable {
65+
try { block.run() }
66+
finally { unTrackTask() }
67+
}
68+
}
69+
70+
@Synchronized
71+
override fun unTrackTask() {
72+
assert(trackedTasks > 0)
73+
trackedTasks--
74+
}
75+
76+
@Synchronized
77+
override fun registerTimeLoopThread() {
78+
assert(threads.putIfAbsent(Thread.currentThread(), ThreadStatus()) == null)
79+
}
80+
81+
@Synchronized
82+
override fun unregisterTimeLoopThread() {
83+
assert(threads.remove(Thread.currentThread()) != null)
84+
wakeupAll()
85+
}
86+
87+
override fun parkNanos(blocker: Any, nanos: Long) {
88+
if (nanos <= 0) return
89+
val status = threads[Thread.currentThread()]!!
90+
assert(status.parkedTill == NOT_PARKED)
91+
status.parkedTill = time + nanos.coerceAtMost(MAX_WAIT_NANOS)
92+
while (true) {
93+
checkAdvanceTime()
94+
if (isShutdown || time >= status.parkedTill || status.permit) {
95+
status.parkedTill = NOT_PARKED
96+
status.permit = false
97+
break
98+
}
99+
LockSupport.parkNanos(blocker, REAL_PARK_NANOS)
100+
}
101+
}
102+
103+
override fun unpark(thread: Thread) {
104+
val status = threads[thread] ?: return
105+
status.permit = true
106+
LockSupport.unpark(thread)
107+
}
108+
109+
@Synchronized
110+
private fun checkAdvanceTime() {
111+
if (isShutdown) return
112+
val realNanos = System.nanoTime()
113+
if (realNanos > checkpointNanos + REAL_TIME_STEP_NANOS) {
114+
checkpointNanos = realNanos
115+
val minParkedTill = minParkedTill()
116+
time = (time + REAL_TIME_STEP_NANOS).coerceAtMost(if (minParkedTill < 0) Long.MAX_VALUE else minParkedTill)
117+
logTime("R")
118+
wakeupAll()
119+
return
120+
}
121+
if (threads[mainThread] == null) return
122+
if (trackedTasks != 0) return
123+
val minParkedTill = minParkedTill()
124+
if (minParkedTill <= time) return
125+
time = minParkedTill
126+
logTime("V")
127+
wakeupAll()
128+
}
129+
130+
private fun logTime(s: String) {
131+
log.println("[$s: Time = ${TimeUnit.NANOSECONDS.toMillis(time)} ms]")
132+
}
133+
134+
private fun minParkedTill(): Long =
135+
threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED
136+
137+
@Synchronized
138+
fun shutdown() {
139+
isShutdown = true
140+
wakeupAll()
141+
while (!threads.isEmpty()) (this as Object).wait()
142+
}
143+
144+
private fun wakeupAll() {
145+
threads.keys.forEach { LockSupport.unpark(it) }
146+
(this as Object).notifyAll()
147+
}
148+
}

core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -35,70 +35,77 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
3535
}
3636

3737
@Test
38-
fun testDelay() = runTest {
39-
val delayChannel = channelFactory(delay = 100)
40-
delayChannel.checkNotEmpty()
41-
delayChannel.checkEmpty()
42-
43-
delay(50)
44-
delayChannel.checkEmpty()
45-
delay(51)
46-
delayChannel.checkNotEmpty()
47-
48-
delayChannel.cancel()
49-
delay(51)
50-
delayChannel.checkEmpty()
51-
delayChannel.cancel()
38+
fun testDelay() = withVirtualTimeSource {
39+
runTest {
40+
val delayChannel = channelFactory(delay = 10000)
41+
delayChannel.checkNotEmpty()
42+
delayChannel.checkEmpty()
43+
44+
delay(5000)
45+
delayChannel.checkEmpty()
46+
delay(5100)
47+
delayChannel.checkNotEmpty()
48+
49+
delayChannel.cancel()
50+
delay(5100)
51+
delayChannel.checkEmpty()
52+
delayChannel.cancel()
53+
}
5254
}
5355

5456
@Test
55-
fun testInitialDelay() = runTest {
56-
val delayChannel = channelFactory(initialDelay = 75, delay = 100)
57-
delayChannel.checkEmpty()
58-
delay(50)
59-
delayChannel.checkEmpty()
60-
delay(30)
61-
delayChannel.checkNotEmpty()
62-
63-
// Regular delay
64-
delay(75)
65-
delayChannel.checkEmpty()
66-
delay(26)
67-
delayChannel.checkNotEmpty()
68-
delayChannel.cancel()
57+
fun testInitialDelay() = withVirtualTimeSource {
58+
runTest {
59+
val delayChannel = channelFactory(initialDelay = 750, delay = 1000)
60+
delayChannel.checkEmpty()
61+
delay(500)
62+
delayChannel.checkEmpty()
63+
delay(300)
64+
delayChannel.checkNotEmpty()
65+
66+
// Regular delay
67+
delay(750)
68+
delayChannel.checkEmpty()
69+
delay(260)
70+
delayChannel.checkNotEmpty()
71+
delayChannel.cancel()
72+
}
6973
}
7074

71-
7275
@Test
73-
fun testReceive() = runTest {
74-
val delayChannel = channelFactory(delay = 100)
75-
delayChannel.checkNotEmpty()
76-
var value = withTimeoutOrNull(75) {
77-
delayChannel.receive()
78-
1
79-
}
76+
fun testReceive() = withVirtualTimeSource {
77+
runTest {
78+
val delayChannel = channelFactory(delay = 1000)
79+
delayChannel.checkNotEmpty()
80+
var value = withTimeoutOrNull(750) {
81+
delayChannel.receive()
82+
1
83+
}
8084

81-
assertNull(value)
82-
value = withTimeoutOrNull(26) {
83-
delayChannel.receive()
84-
1
85-
}
85+
assertNull(value)
86+
value = withTimeoutOrNull(260) {
87+
delayChannel.receive()
88+
1
89+
}
8690

87-
assertNotNull(value)
88-
delayChannel.cancel()
91+
assertNotNull(value)
92+
delayChannel.cancel()
93+
}
8994
}
9095

9196
@Test
92-
fun testComplexOperator() = runTest {
93-
val producer = produce {
94-
for (i in 1..7) {
95-
send(i)
96-
delay(100)
97+
fun testComplexOperator() = withVirtualTimeSource {
98+
runTest {
99+
val producer = produce {
100+
for (i in 1..7) {
101+
send(i)
102+
delay(1000)
103+
}
97104
}
98-
}
99105

100-
val averages = producer.averageInTimeWindow(300).toList()
101-
assertEquals(listOf(2.0, 5.0, 7.0), averages)
106+
val averages = producer.averageInTimeWindow(3000).toList()
107+
assertEquals(listOf(2.0, 5.0, 7.0), averages)
108+
}
102109
}
103110

104111
private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {

core/kotlinx-coroutines-core/test/channels/TickerChannelTest.kt

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,51 +9,57 @@ import org.junit.*
99

1010
class TickerChannelTest : TestBase() {
1111
@Test
12-
fun testFixedDelayChannelBackpressure() = runTest {
13-
val delayChannel = ticker(delay = 100, initialDelay = 0, mode = TickerMode.FIXED_DELAY)
14-
delayChannel.checkNotEmpty()
15-
delayChannel.checkEmpty()
16-
17-
delay(150)
18-
delayChannel.checkNotEmpty()
19-
delay(50)
20-
delayChannel.checkEmpty()
21-
delay(52)
22-
delayChannel.checkNotEmpty()
23-
delayChannel.cancel()
12+
fun testFixedDelayChannelBackpressure() = withVirtualTimeSource {
13+
runTest {
14+
val delayChannel = ticker(delay = 1000, initialDelay = 0, mode = TickerMode.FIXED_DELAY)
15+
delayChannel.checkNotEmpty()
16+
delayChannel.checkEmpty()
17+
18+
delay(1500)
19+
delayChannel.checkNotEmpty()
20+
delay(500)
21+
delayChannel.checkEmpty()
22+
delay(520)
23+
delayChannel.checkNotEmpty()
24+
delayChannel.cancel()
25+
}
2426
}
2527

2628
@Test
27-
fun testDelayChannelBackpressure() = runTest {
28-
val delayChannel = ticker(delay = 100, initialDelay = 0)
29-
delayChannel.checkNotEmpty()
30-
delayChannel.checkEmpty()
31-
32-
delay(150)
33-
delayChannel.checkNotEmpty()
34-
delay(52)
35-
delayChannel.checkNotEmpty()
36-
delay(50)
37-
delayChannel.checkEmpty()
38-
delay(52)
39-
delayChannel.checkNotEmpty()
40-
delayChannel.cancel()
29+
fun testDelayChannelBackpressure() = withVirtualTimeSource {
30+
runTest {
31+
val delayChannel = ticker(delay = 1000, initialDelay = 0)
32+
delayChannel.checkNotEmpty()
33+
delayChannel.checkEmpty()
34+
35+
delay(1500)
36+
delayChannel.checkNotEmpty()
37+
delay(520)
38+
delayChannel.checkNotEmpty()
39+
delay(500)
40+
delayChannel.checkEmpty()
41+
delay(520)
42+
delayChannel.checkNotEmpty()
43+
delayChannel.cancel()
44+
}
4145
}
4246

4347
@Test
44-
fun testDelayChannelBackpressure2() = runTest {
45-
val delayChannel = ticker(delay = 100, initialDelay = 0)
46-
delayChannel.checkNotEmpty()
47-
delayChannel.checkEmpty()
48-
49-
delay(250)
50-
delayChannel.checkNotEmpty()
51-
delay(51)
52-
delayChannel.checkNotEmpty()
53-
delay(51)
54-
delayChannel.checkEmpty()
55-
delay(51)
56-
delayChannel.checkNotEmpty()
57-
delayChannel.cancel()
48+
fun testDelayChannelBackpressure2() = withVirtualTimeSource {
49+
runTest {
50+
val delayChannel = ticker(delay = 1000, initialDelay = 0)
51+
delayChannel.checkNotEmpty()
52+
delayChannel.checkEmpty()
53+
54+
delay(2500)
55+
delayChannel.checkNotEmpty()
56+
delay(510)
57+
delayChannel.checkNotEmpty()
58+
delay(510)
59+
delayChannel.checkEmpty()
60+
delay(510)
61+
delayChannel.checkNotEmpty()
62+
delayChannel.cancel()
63+
}
5864
}
5965
}

0 commit comments

Comments
 (0)