Skip to content

Commit 324611b

Browse files
author
Sergey Mashkov
committed
IO: faster delegation in joined channels
1 parent 373d590 commit 324611b

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,9 @@ internal class ByteBufferChannel(
10001000
}
10011001

10021002
suspend override fun writeAvailable(src: ByteBuffer): Int {
1003+
val delegated = resolveDelegation(this)
1004+
if (delegated !== this) return delegated.writeAvailable(src)
1005+
10031006
val copied = writeAsMuchAsPossible(src)
10041007
if (copied > 0) return copied
10051008

@@ -1032,6 +1035,9 @@ internal class ByteBufferChannel(
10321035
}
10331036

10341037
suspend override fun writeFully(src: ByteBuffer) {
1038+
val delegated = resolveDelegation(this)
1039+
if (delegated !== this) return delegated.writeFully(src)
1040+
10351041
writeAsMuchAsPossible(src)
10361042
if (!src.hasRemaining()) return
10371043

@@ -1101,10 +1107,13 @@ internal class ByteBufferChannel(
11011107
if (limit == 0L) return 0L
11021108
if (src.isClosedForRead) {
11031109
if (joined != null) {
1104-
check(tryCompleteJoining(joined))
1110+
check(src.tryCompleteJoining(joined))
11051111
}
11061112
return 0L
11071113
}
1114+
if (joined != null && src.tryCompleteJoining(joined)) {
1115+
return 0L
1116+
}
11081117

11091118
val autoFlush = autoFlush
11101119
val byteOrder = writeByteOrder
@@ -1161,9 +1170,6 @@ internal class ByteBufferChannel(
11611170
}
11621171
}
11631172

1164-
// println("flush")
1165-
flush()
1166-
11671173
if (joined != null) {
11681174
if (src.tryCompleteJoining(joined)) break
11691175
else if (src.state.capacity.flush()) { // force flush src to read-up all the bytes
@@ -1176,7 +1182,9 @@ internal class ByteBufferChannel(
11761182
if (copied >= limit) break
11771183

11781184
// println("readSuspend?")
1179-
if (!src.readSuspend(1)) {
1185+
flush()
1186+
1187+
if (src.availableForRead == 0 && !src.readSuspend(1)) {
11801188
// println("readSuspend failed")
11811189
if (joined == null || src.tryCompleteJoining(joined)) break
11821190
}
@@ -1292,6 +1300,9 @@ internal class ByteBufferChannel(
12921300
}
12931301

12941302
suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
1303+
val delegated = resolveDelegation(this)
1304+
if (delegated !== this) return delegated.writeFully(src, offset, length)
1305+
12951306
var rem = length
12961307
var off = offset
12971308

@@ -1315,6 +1326,9 @@ internal class ByteBufferChannel(
13151326
}
13161327

13171328
suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
1329+
val delegated = resolveDelegation(this)
1330+
if (delegated !== this) return delegated.writeAvailable(src, offset, length)
1331+
13181332
val size = writeAsMuchAsPossible(src, offset, length)
13191333
if (size > 0) return size
13201334
return writeSuspend(src, offset, length)
@@ -1392,6 +1406,9 @@ internal class ByteBufferChannel(
13921406
}
13931407

13941408
suspend override fun writePacket(packet: ByteReadPacket) {
1409+
val delegated = resolveDelegation(this)
1410+
if (delegated !== this) return delegated.writePacket(packet)
1411+
13951412
try {
13961413
while (!packet.isEmpty) {
13971414
if (tryWritePacketPart(packet) == 0) break

0 commit comments

Comments
 (0)