Skip to content

Commit bc296bb

Browse files
committed
Throttle fast senders in ChannelSendReceiveStressTest to prevent OOM with LinkedListChannel stress test
1 parent cbd8e40 commit bc296bb

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,13 @@ class ChannelSendReceiveStressTest(
4646
val timeLimit = 30_000L * stressTestMultiplier // 30 sec
4747
val nEvents = 1_000_000 * stressTestMultiplier
4848

49+
val maxBuffer = 10_000 // artifical limit for LinkedListChannel
50+
4951
val channel = kind.create()
5052
val sendersCompleted = AtomicInteger()
5153
val receiversCompleted = AtomicInteger()
5254
val dupes = AtomicInteger()
55+
val sentTotal = AtomicInteger()
5356
val received = AtomicIntegerArray(nEvents)
5457
val receivedTotal = AtomicInteger()
5558
val receivedBy = IntArray(nReceivers)
@@ -90,6 +93,7 @@ class ChannelSendReceiveStressTest(
9093
println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
9194
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
9295
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
96+
println(" Sent ${sentTotal.get()} events")
9397
println(" Received ${receivedTotal.get()} events")
9498
println(" Received dupes ${dupes.get()}")
9599
repeat(nReceivers) { receiveIndex ->
@@ -98,20 +102,31 @@ class ChannelSendReceiveStressTest(
98102
assertEquals(nSenders, sendersCompleted.get())
99103
assertEquals(nReceivers, receiversCompleted.get())
100104
assertEquals(0, dupes.get())
105+
assertEquals(nEvents, sentTotal.get())
101106
assertEquals(nEvents, receivedTotal.get())
102107
repeat(nReceivers) { receiveIndex ->
103108
assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
104109
}
105110
}
106111

112+
private suspend fun doSent() {
113+
sentTotal.incrementAndGet()
114+
while (sentTotal.get() > receivedTotal.get() + maxBuffer)
115+
yield() // throttle fast senders to prevent OOM with LinkedListChannel
116+
}
117+
107118
private suspend fun doSend(senderIndex: Int) {
108-
for (i in senderIndex until nEvents step nSenders)
119+
for (i in senderIndex until nEvents step nSenders) {
109120
channel.send(i)
121+
doSent()
122+
}
110123
}
111124

112125
private suspend fun doSendSelect(senderIndex: Int) {
113-
for (i in senderIndex until nEvents step nSenders)
126+
for (i in senderIndex until nEvents step nSenders) {
114127
select<Unit> { channel.onSend(i) { Unit } }
128+
doSent()
129+
}
115130
}
116131

117132
private fun doReceived(receiverIndex: Int, event: Int) {

0 commit comments

Comments
 (0)