Skip to content

Commit 921b0cf

Browse files
committed
ArrayBroadcastChannel.send should forget values immediately w/o subs
1 parent 932e860 commit 921b0cf

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlin.concurrent.withLock
2424

2525
/**
2626
* Broadcast channel with array buffer of a fixed [capacity].
27-
* Sender suspends only when buffer is fully due to one of the receives not being late and
27+
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
2828
* receiver suspends only when buffer is empty.
2929
*
3030
* Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
@@ -128,11 +128,13 @@ class ArrayBroadcastChannel<E>(
128128

129129
private fun checkSubOffers() {
130130
var updated = false
131+
var hasSubs = false
131132
@Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
132133
for (sub in subs) {
134+
hasSubs = true
133135
if (sub.checkOffer()) updated = true
134136
}
135-
if (updated)
137+
if (updated || !hasSubs)
136138
updateHead()
137139
}
138140

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental.channels
1919
import kotlinx.coroutines.experimental.*
2020
import org.hamcrest.core.IsEqual
2121
import org.hamcrest.core.IsNull
22-
import org.junit.Assert.*
22+
import org.junit.Assert.assertThat
2323
import org.junit.Test
2424

2525
class ArrayBroadcastChannelTest : TestBase() {
@@ -112,4 +112,24 @@ class ArrayBroadcastChannelTest : TestBase() {
112112
assertThat(sub.isClosedForReceive, IsEqual(true))
113113
finish(7)
114114
}
115+
116+
@Test
117+
fun testForgetUnsubscribed() = runBlocking {
118+
expect(1)
119+
val broadcast = ArrayBroadcastChannel<Int>(1)
120+
broadcast.send(1)
121+
broadcast.send(2)
122+
broadcast.send(3)
123+
expect(2) // should not suspend anywhere above
124+
val sub = broadcast.open()
125+
launch(context, CoroutineStart.UNDISPATCHED) {
126+
expect(3)
127+
assertThat(sub.receive(), IsEqual(4)) // suspends
128+
expect(5)
129+
}
130+
expect(4)
131+
broadcast.send(4) // sends
132+
yield()
133+
finish(6)
134+
}
115135
}

0 commit comments

Comments
 (0)