Skip to content

Commit a26b790

Browse files
author
Sergey Mashkov
committed
IO: propagate channel cancellation to controller coroutine
1 parent eee3aaf commit a26b790

File tree

5 files changed

+47
-4
lines changed

5 files changed

+47
-4
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package kotlinx.coroutines.experimental.io
55
import kotlinx.atomicfu.*
66
import kotlinx.coroutines.experimental.*
77
import kotlinx.coroutines.experimental.channels.*
8+
import kotlinx.coroutines.experimental.internal.*
89
import kotlinx.coroutines.experimental.io.internal.*
910
import kotlinx.coroutines.experimental.io.packet.*
1011
import kotlinx.io.core.*
@@ -50,6 +51,17 @@ internal class ByteBufferChannel(
5051
private var readPosition = 0
5152
private var writePosition = 0
5253

54+
@Volatile
55+
private var attachedJob: Job? = null
56+
57+
internal fun attachJob(job: Job) {
58+
attachedJob?.cancel()
59+
attachedJob = job
60+
job.invokeOnCompletion {
61+
attachedJob = null
62+
}
63+
}
64+
5365
override var readByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
5466
override var writeByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
5567
set(newOrder) {
@@ -92,10 +104,12 @@ internal class ByteBufferChannel(
92104
joining?.let { ensureClosedJoined(it) }
93105
}
94106

107+
if (cause != null) attachedJob?.cancel(cause)
108+
95109
return true
96110
}
97111

98-
suspend override fun cancel(cause: Throwable?): Boolean {
112+
override fun cancel(cause: Throwable?): Boolean {
99113
return close(cause ?: CancellationException("Channel has been cancelled"))
100114
}
101115

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public interface ByteReadChannel {
145145
*/
146146
suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
147147

148-
suspend fun cancel(cause: Throwable?): Boolean
148+
fun cancel(cause: Throwable? = null): Boolean
149149
}
150150

151151
suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean) {

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ fun reader(coroutineContext: CoroutineContext,
3434
fun reader(coroutineContext: CoroutineContext,
3535
autoFlush: Boolean = false,
3636
parent: Job? = null,
37-
block: suspend ReaderScope.() -> Unit): ReaderJob = reader(coroutineContext, ByteChannel(autoFlush), parent, block)
37+
block: suspend ReaderScope.() -> Unit): ReaderJob {
38+
val channel = ByteChannel(autoFlush) as ByteBufferChannel
39+
val job = reader(coroutineContext, channel, parent, block)
40+
channel.attachJob(job)
41+
return job
42+
}
3843

3944
private class ReaderCoroutine(context: CoroutineContext, channel: ByteChannel)
4045
: ByteChannelCoroutine(context, channel), ReaderJob, ReaderScope

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ fun writer(coroutineContext: CoroutineContext,
3434
fun writer(coroutineContext: CoroutineContext,
3535
autoFlush: Boolean = false,
3636
parent: Job? = null,
37-
block: suspend WriterScope.() -> Unit): WriterJob = writer(coroutineContext, ByteChannel(autoFlush), parent, block)
37+
block: suspend WriterScope.() -> Unit): WriterJob {
38+
val channel = ByteChannel(autoFlush) as ByteBufferChannel
39+
val job = writer(coroutineContext, channel, parent, block)
40+
channel.attachJob(job)
41+
return job
42+
}
3843

3944
private class WriterCoroutine(ctx: CoroutineContext, channel: ByteChannel)
4045
: ByteChannelCoroutine(ctx, channel), WriterScope, WriterJob

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,25 @@ class ByteBufferChannelTest {
10881088
}
10891089
}
10901090

1091+
@Test
1092+
fun testCancelWriter() = runBlocking {
1093+
val sub = writer(DefaultDispatcher) {
1094+
delay(1000000L)
1095+
}
1096+
1097+
sub.channel.cancel()
1098+
sub.join()
1099+
}
1100+
1101+
@Test
1102+
fun testCancelReader() = runBlocking {
1103+
val sub = reader(DefaultDispatcher) {
1104+
delay(10000000L)
1105+
}
1106+
1107+
sub.channel.close(CancellationException())
1108+
sub.join()
1109+
}
10911110

10921111
private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
10931112
val builder = BytePacketBuilder(0, pktPool)

0 commit comments

Comments
 (0)