Skip to content

Commit 5ea2c05

Browse files
committed
Add ChannelLFStressTest
It tests lock-freedom on send and receive operations on rendezvous and conflated channels. The test is comprehensive enough to fail on buffered channels (which are currently not lock-free) and will help us ensuring that lock-freedom property is not lost while channels are being improved and refactored.
1 parent 2907df9 commit 5ea2c05

File tree

2 files changed

+115
-1
lines changed

2 files changed

+115
-1
lines changed

gradle.properties

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
#
2+
# Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
#
4+
15
# Kotlin
26
version=1.3.0-SNAPSHOT
37
group=org.jetbrains.kotlinx
48
kotlin_version=1.3.50
59

610
# Dependencies
711
junit_version=4.12
8-
atomicfu_version=0.12.10
12+
atomicfu_version=0.12.11
913
html_version=0.6.8
1014
lincheck_version=2.0
1115
dokka_version=0.9.16-rdev-2-mpp-hacks
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.channels
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import java.util.concurrent.atomic.AtomicLong
10+
import java.util.concurrent.atomic.AtomicLongArray
11+
import kotlin.math.*
12+
import kotlin.test.*
13+
14+
/**
15+
* Tests lock-freedom of send and receive operations on rendezvous and conflated channels.
16+
* There is a single channel with two sender and two receiver threads.
17+
* When one sender or receiver gets suspended at most one other operation is allowed to cease having progress
18+
* (`allowSuspendedThreads = 1`).
19+
*
20+
* **Note**: In the current implementation buffered channels are not lock-free, so this test would fail
21+
* if channel is created with a buffer.
22+
*/
23+
class ChannelLFStressTest : TestBase() {
24+
private val nSeconds = 5 * stressTestMultiplier
25+
private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1)
26+
private lateinit var channel: Channel<Long>
27+
28+
private val sendIndex = AtomicLong()
29+
private val receiveCount = AtomicLong()
30+
private val duplicateCount = AtomicLong()
31+
32+
private val nCheckedSize = 10_000_000
33+
private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong()
34+
private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values
35+
36+
@Test
37+
fun testRendezvousLockFreedom() {
38+
channel = Channel()
39+
performLockFreedomTest()
40+
// ensure that all sent were received
41+
checkAllReceived()
42+
}
43+
44+
@Test
45+
fun testConflatedLockFreedom() {
46+
// This test does not really verify that all sent elements were received
47+
// and checks only LF property
48+
channel = Channel(Channel.CONFLATED)
49+
performLockFreedomTest()
50+
}
51+
52+
private fun performLockFreedomTest() {
53+
env.onCompletion { channel.close() }
54+
repeat(2) { env.testThread { sender() } }
55+
repeat(2) { env.testThread { receiver() } }
56+
env.performTest(nSeconds) {
57+
println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount")
58+
}
59+
// ensure no duplicates
60+
assertEquals(0L, duplicateCount.get())
61+
}
62+
63+
private fun checkAllReceived() {
64+
for (i in 0 until min(sendIndex.get(), nChecked)) {
65+
assertTrue(isReceived(i))
66+
}
67+
}
68+
69+
private suspend fun sender() {
70+
val value = sendIndex.getAndIncrement()
71+
try {
72+
channel.send(value)
73+
} catch (e: ClosedSendChannelException) {
74+
check(env.isCompleted) // expected when test was completed
75+
markReceived(value) // fake received (actually failed to send)
76+
}
77+
}
78+
79+
private suspend fun receiver() {
80+
val value = try {
81+
channel.receive()
82+
} catch (e: ClosedReceiveChannelException) {
83+
check(env.isCompleted) // expected when test was completed
84+
return
85+
}
86+
receiveCount.incrementAndGet()
87+
markReceived(value)
88+
}
89+
90+
private fun markReceived(value: Long) {
91+
if (value >= nChecked) return // too big
92+
val index = (value / Long.SIZE_BITS).toInt()
93+
val mask = 1L shl (value % Long.SIZE_BITS).toInt()
94+
while (true) {
95+
val bits = receivedBits.get(index)
96+
if (bits and mask != 0L) {
97+
duplicateCount.incrementAndGet()
98+
break
99+
}
100+
if (receivedBits.compareAndSet(index, bits, bits or mask)) break
101+
}
102+
}
103+
104+
private fun isReceived(value: Long): Boolean {
105+
val index = (value / Long.SIZE_BITS).toInt()
106+
val mask = 1L shl (value % Long.SIZE_BITS).toInt()
107+
val bits = receivedBits.get(index)
108+
return bits and mask != 0L
109+
}
110+
}

0 commit comments

Comments
 (0)