Skip to content

Commit 9525f3c

Browse files
committed
Fixed send/openSubscription race in ArrayBroadcastChannel
This race lead to stalled (hanged) send/receive invocation.
1 parent faf53c7 commit 9525f3c

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,10 @@ class ArrayBroadcastChannel<E>(
7070
override val isBufferAlwaysFull: Boolean get() = false
7171
override val isBufferFull: Boolean get() = size >= capacity
7272

73-
override fun openSubscription(): SubscriptionReceiveChannel<E> {
74-
bufferLock.withLock {
75-
val sub = Subscriber(this, tail)
76-
subs.add(sub)
77-
return sub
73+
override fun openSubscription(): SubscriptionReceiveChannel<E> =
74+
Subscriber(this).also {
75+
updateHead(addSub = it)
7876
}
79-
}
8077

8178
override fun close(cause: Throwable?): Boolean {
8279
if (!super.close(cause)) return false
@@ -131,14 +128,22 @@ class ArrayBroadcastChannel<E>(
131128
if (sub.checkOffer()) updated = true
132129
}
133130
if (updated || !hasSubs)
134-
updateHead(null)
131+
updateHead()
135132
}
136133

137-
private tailrec fun updateHead(removeSub: Subscriber<E>?) {
134+
// updates head if needed and optionally adds / removes subscriber under the same lock
135+
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
136+
assert(addSub == null || removeSub == null) // only one of them can be specified
138137
// update head in a tail rec loop
139138
var send: Send? = null
140139
var token: Any? = null
141140
bufferLock.withLock {
141+
if (addSub != null) {
142+
addSub.subHead = tail // start from last element
143+
val wasEmpty = subs.isEmpty()
144+
subs.add(addSub)
145+
if (!wasEmpty) return // no need to update when adding second and etc sub
146+
}
142147
if (removeSub != null) {
143148
subs.remove(removeSub)
144149
if (head != removeSub.subHead) return // no need to update
@@ -178,7 +183,7 @@ class ArrayBroadcastChannel<E>(
178183
// since we've just sent an element, we might need to resume some receivers
179184
checkSubOffers()
180185
// tailrec call to recheck
181-
updateHead(null)
186+
updateHead()
182187
}
183188

184189
private fun computeMinHead(): Long {
@@ -192,11 +197,13 @@ class ArrayBroadcastChannel<E>(
192197
private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
193198

194199
private class Subscriber<E>(
195-
private val broadcastChannel: ArrayBroadcastChannel<E>,
196-
@Volatile @JvmField var subHead: Long // guarded by subLock
200+
private val broadcastChannel: ArrayBroadcastChannel<E>
197201
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
198202
private val subLock = ReentrantLock()
199203

204+
@Volatile @JvmField
205+
var subHead: Long = 0 // guarded by subLock
206+
200207
override val isBufferAlwaysEmpty: Boolean get() = false
201208
override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
202209
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
@@ -272,7 +279,7 @@ class ArrayBroadcastChannel<E>(
272279
updated = true
273280
// and finally update broadcast's channel head if needed
274281
if (updated)
275-
broadcastChannel.updateHead(null)
282+
broadcastChannel.updateHead()
276283
return result
277284
}
278285

@@ -306,7 +313,7 @@ class ArrayBroadcastChannel<E>(
306313
updated = true
307314
// and finally update broadcast's channel head if needed
308315
if (updated)
309-
broadcastChannel.updateHead(null)
316+
broadcastChannel.updateHead()
310317
return result
311318
}
312319

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@ class BroadcastChannelSubStressTest(
6868
}
6969
}
7070
}
71+
var prevSent = -1L
7172
repeat(nSeconds) { sec ->
7273
delay(1000)
73-
println("${sec + 1}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
74+
val curSent = sentTotal.get()
75+
println("${sec + 1}: Sent $curSent, received ${receivedTotal.get()}")
76+
check(curSent > prevSent) { "Send stalled at $curSent events" }
77+
prevSent = curSent
7478
}
7579
withTimeout(5, TimeUnit.SECONDS) {
7680
sender.cancelAndJoin()

0 commit comments

Comments
 (0)