Skip to content

Commit 45c8565

Browse files
committed
Fixed integer overflow in BroadcastChannelXXXStressTest (use long)
1 parent f692d50 commit 45c8565

File tree

3 files changed

+25
-25
lines changed

3 files changed

+25
-25
lines changed

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.junit.Test
2323
import org.junit.runner.RunWith
2424
import org.junit.runners.Parameterized
2525
import java.util.concurrent.TimeUnit
26-
import java.util.concurrent.atomic.AtomicInteger
26+
import java.util.concurrent.atomic.AtomicLong
2727

2828
/**
2929
* Tests delivery of events to multiple broadcast channel subscribers.
@@ -42,13 +42,13 @@ class BroadcastChannelMultiReceiveStressTest(
4242
private val nReceivers = if (isStressTest) 10 else 5
4343
private val nSeconds = 5 * stressTestMultiplier
4444

45-
private val broadcast = kind.create()
45+
private val broadcast = kind.create<Long>()
4646
private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")
4747

48-
private val sentTotal = AtomicInteger()
49-
private val receivedTotal = AtomicInteger()
50-
private val stopOnReceive = AtomicInteger(-1)
51-
private val lastReceived = Array(nReceivers) { AtomicInteger(-1) }
48+
private val sentTotal = AtomicLong()
49+
private val receivedTotal = AtomicLong()
50+
private val stopOnReceive = AtomicLong(-1)
51+
private val lastReceived = Array(nReceivers) { AtomicLong(-1) }
5252

5353
@After
5454
fun tearDown() {
@@ -60,7 +60,7 @@ class BroadcastChannelMultiReceiveStressTest(
6060
val ctx = pool + coroutineContext[Job]!!
6161
val sender =
6262
launch(context = ctx + CoroutineName("Sender")) {
63-
var i = 0
63+
var i = 0L
6464
while (isActive) {
6565
broadcast.send(++i)
6666
sentTotal.set(i) // set sentTotal only if `send` was not cancelled
@@ -114,17 +114,17 @@ class BroadcastChannelMultiReceiveStressTest(
114114
println(" Received ${receivedTotal.get()} events")
115115
}
116116

117-
private fun doReceived(receiverIndex: Int, i: Int): Boolean {
117+
private fun doReceived(receiverIndex: Int, i: Long): Boolean {
118118
val last = lastReceived[receiverIndex].get()
119119
check(i > last) { "Last was $last, got $i" }
120-
if (last != -1 && !kind.isConflated)
120+
if (last != -1L && !kind.isConflated)
121121
check(i == last + 1) { "Last was $last, got $i" }
122122
receivedTotal.incrementAndGet()
123123
lastReceived[receiverIndex].set(i)
124124
return i == stopOnReceive.get()
125125
}
126126

127-
private suspend fun doReceive(channel: ReceiveChannel<Int>, receiverIndex: Int) {
127+
private suspend fun doReceive(channel: ReceiveChannel<Long>, receiverIndex: Int) {
128128
while (true) {
129129
try {
130130
val stop = doReceived(receiverIndex, channel.receive())
@@ -134,33 +134,33 @@ class BroadcastChannelMultiReceiveStressTest(
134134
}
135135
}
136136

137-
private suspend fun doReceiveOrNull(channel: ReceiveChannel<Int>, receiverIndex: Int) {
137+
private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
138138
while (true) {
139139
val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break)
140140
if (stop) break
141141
}
142142
}
143143

144-
private suspend fun doIterator(channel: ReceiveChannel<Int>, receiverIndex: Int) {
144+
private suspend fun doIterator(channel: ReceiveChannel<Long>, receiverIndex: Int) {
145145
for (event in channel) {
146146
val stop = doReceived(receiverIndex, event)
147147
if (stop) break
148148
}
149149
}
150150

151-
private suspend fun doReceiveSelect(channel: ReceiveChannel<Int>, receiverIndex: Int) {
151+
private suspend fun doReceiveSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
152152
while (true) {
153153
try {
154-
val event = select<Int> { channel.onReceive { it } }
154+
val event = select<Long> { channel.onReceive { it } }
155155
val stop = doReceived(receiverIndex, event)
156156
if (stop) break
157157
} catch (ex: ClosedReceiveChannelException) { break }
158158
}
159159
}
160160

161-
private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Int>, receiverIndex: Int) {
161+
private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
162162
while (true) {
163-
val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break
163+
val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break
164164
val stop = doReceived(receiverIndex, event)
165165
if (stop) break
166166
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.junit.Test
2121
import org.junit.runner.RunWith
2222
import org.junit.runners.Parameterized
2323
import java.util.concurrent.TimeUnit
24-
import java.util.concurrent.atomic.AtomicInteger
24+
import java.util.concurrent.atomic.AtomicLong
2525

2626
/**
2727
* Creates a broadcast channel and repeatedly opens new subscription, receives event, closes it,
@@ -40,10 +40,10 @@ class BroadcastChannelSubStressTest(
4040
}
4141

4242
private val nSeconds = 5 * stressTestMultiplier
43-
private val broadcast = kind.create()
43+
private val broadcast = kind.create<Long>()
4444

45-
private val sentTotal = AtomicInteger()
46-
private val receivedTotal = AtomicInteger()
45+
private val sentTotal = AtomicLong()
46+
private val receivedTotal = AtomicLong()
4747

4848
@Test
4949
fun testStress() = runBlocking {
@@ -56,7 +56,7 @@ class BroadcastChannelSubStressTest(
5656
}
5757
val receiver =
5858
launch(context = ctx + CoroutineName("Receiver")) {
59-
var last = -1
59+
var last = -1L
6060
while (isActive) {
6161
broadcast.openSubscription().use { sub ->
6262
val i = sub.receive()

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,20 @@ package kotlinx.coroutines.experimental.channels
1818

1919
enum class TestBroadcastChannelKind {
2020
ARRAY_1 {
21-
override fun create(): BroadcastChannel<Int> = ArrayBroadcastChannel<Int>(1)
21+
override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(1)
2222
override fun toString(): String = "ArrayBroadcastChannel(1)"
2323
},
2424
ARRAY_10 {
25-
override fun create(): BroadcastChannel<Int> = ArrayBroadcastChannel<Int>(10)
25+
override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(10)
2626
override fun toString(): String = "ArrayBroadcastChannel(10)"
2727
},
2828
CONFLATED {
29-
override fun create(): BroadcastChannel<Int> = ConflatedBroadcastChannel<Int>()
29+
override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel<T>()
3030
override fun toString(): String = "ConflatedBroadcastChannel"
3131
override val isConflated: Boolean get() = true
3232
}
3333
;
3434

35-
abstract fun create(): BroadcastChannel<Int>
35+
abstract fun <T> create(): BroadcastChannel<T>
3636
open val isConflated: Boolean get() = false
3737
}

0 commit comments

Comments
 (0)