Skip to content

Commit 98b7a6e

Browse files
committed
Fixed Channel.close hand on double close with pending send
1 parent 9ff3264 commit 98b7a6e

File tree

3 files changed

+84
-1
lines changed

3 files changed

+84
-1
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,10 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
248248
val receive = takeFirstReceiveOrPeekClosed()
249249
if (receive == null) {
250250
// queue empty or has only senders -- try add last "Closed" item to the queue
251-
if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed<*> })) {
251+
if (queue.addLastIfPrev(closed, { prev ->
252+
if (prev is Closed<*>) return false // already closed
253+
prev !is ReceiveOrClosed<*> // only add close if no waiting receive
254+
})) {
252255
onClosed(closed)
253256
afterClose(cause)
254257
return true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.*
20+
import org.junit.Test
21+
22+
class DoubleChannelCloseStressTest : TestBase() {
23+
val nTimes = 1000 * stressTestMultiplier
24+
25+
@Test
26+
fun testDoubleCloseStress() {
27+
repeat(nTimes) {
28+
val actor = actor<Int>(CommonPool + CoroutineName("actor"), start = CoroutineStart.LAZY) {
29+
// empty -- just closes channel
30+
}
31+
launch(CommonPool + CoroutineName("sender")) {
32+
actor.send(1)
33+
}
34+
Thread.sleep(1)
35+
actor.close()
36+
}
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import junit.framework.Assert.assertFalse
20+
import junit.framework.Assert.assertTrue
21+
import kotlinx.coroutines.experimental.TestBase
22+
import kotlinx.coroutines.experimental.runBlocking
23+
import org.hamcrest.MatcherAssert.assertThat
24+
import org.hamcrest.core.IsEqual
25+
import org.hamcrest.core.IsNull
26+
import org.junit.Test
27+
28+
class LinkedListChannelTest : TestBase() {
29+
@Test
30+
fun testBasic() = runBlocking {
31+
val c = LinkedListChannel<Int>()
32+
c.send(1)
33+
assertTrue(c.offer(2))
34+
c.send(3)
35+
assertTrue(c.close())
36+
assertFalse(c.close())
37+
assertThat(c.receive(), IsEqual(1))
38+
assertThat(c.poll(), IsEqual(2))
39+
assertThat(c.receiveOrNull(), IsEqual(3))
40+
assertThat(c.receiveOrNull(), IsNull())
41+
}
42+
}

0 commit comments

Comments
 (0)