Skip to content

Commit 9e9929d

Browse files
author
Sergey Mashkov
committed
IO: fix joining of already closed but not empty channel
1 parent f75ec15 commit 9e9929d

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ internal class ByteBufferChannel(
230230
val alreadyClosed = closed
231231
if (alreadyClosed != null) {
232232
if (alreadyClosed.cause != null) delegate.close(alreadyClosed.cause)
233-
else if (delegateClose) delegate.close()
233+
else if (delegateClose && state === ReadWriteBufferState.Terminated) delegate.close()
234234
else delegate.flush()
235235
} else {
236236
flush()
@@ -1092,13 +1092,20 @@ internal class ByteBufferChannel(
10921092
if (delegateClose) close(src.closed!!.cause)
10931093
return
10941094
}
1095-
closed?.let { closed -> throw closed.sendException }
1095+
closed?.let { closed ->
1096+
if (src.closed == null) throw closed.sendException
1097+
return
1098+
}
1099+
1100+
val joined = src.setupDelegateTo(this, delegateClose)
1101+
if (src.tryCompleteJoining(joined)) {
1102+
return src.awaitClose()
1103+
}
10961104

1097-
return joinFromSuspend(src, delegateClose)
1105+
return joinFromSuspend(src, delegateClose, joined)
10981106
}
10991107

1100-
private suspend fun joinFromSuspend(src: ByteBufferChannel, delegateClose: Boolean) {
1101-
val joined = src.setupDelegateTo(this, delegateClose)
1108+
private suspend fun joinFromSuspend(src: ByteBufferChannel, delegateClose: Boolean, joined: JoiningState) {
11021109
copyDirect(src, Long.MAX_VALUE, joined)
11031110

11041111
if (delegateClose && src.isClosedForRead) {

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,19 @@ class ByteBufferChannelTest {
845845
ch.close()
846846
}
847847

848+
@Test
849+
fun testJoinToClosed() = runBlocking<Unit> {
850+
val other = ByteBufferChannel(autoFlush = false, pool = pool)
851+
852+
ch.writeInt(0x11223344)
853+
ch.close()
854+
855+
ch.joinTo(other, true)
856+
yield()
857+
858+
assertEquals(0x11223344, other.readInt())
859+
assertTrue { other.isClosedForRead }
860+
}
848861

849862
@Test
850863
fun testReadThenRead() = runBlocking<Unit> {

0 commit comments

Comments
 (0)