Skip to content

Commit 7510117

Browse files
author
Sergey Mashkov
committed
IO: add BufferView internal, add ByteChannel read and write with lambda
1 parent 5804c37 commit 7510117

File tree

18 files changed

+1260
-931
lines changed

18 files changed

+1260
-931
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 141 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ package kotlinx.coroutines.experimental.io
44

55
import kotlinx.coroutines.experimental.CancellableContinuation
66
import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
7+
import kotlinx.coroutines.experimental.io.buffers.*
78
import kotlinx.coroutines.experimental.io.internal.*
89
import kotlinx.coroutines.experimental.io.packet.*
910
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
11+
import java.io.*
1012
import java.nio.BufferOverflowException
1113
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
1214

@@ -370,26 +372,60 @@ internal class ByteBufferChannel(
370372
suspend override fun readPacket(size: Int, headerSizeHint: Int): ByteReadPacket {
371373
closed?.cause?.let { throw it }
372374

373-
if (size == 0) return ByteReadPacketEmpty
375+
if (size == 0) return ByteReadPacketViewBased.Empty
374376

375-
val builder = ByteWritePacketImpl(headerSizeHint, BufferPool)
377+
val builder = ByteWritePacketImpl(headerSizeHint, BufferView.Pool)
376378
val buffer = BufferPool.borrow()
379+
var remaining = size
380+
381+
try {
382+
while (remaining > 0) {
383+
buffer.clear()
384+
if (buffer.remaining() > remaining) {
385+
buffer.limit(remaining)
386+
}
387+
388+
val rc = readAsMuchAsPossible(buffer)
389+
if (rc == 0) break
390+
391+
buffer.flip()
392+
builder.writeFully(buffer)
393+
394+
remaining -= rc
395+
}
396+
} catch (t: Throwable) {
397+
BufferPool.recycle(buffer)
398+
builder.release()
399+
throw t
400+
}
401+
402+
return if (remaining == 0) {
403+
BufferPool.recycle(buffer)
404+
builder.build()
405+
} else {
406+
readPacketSuspend(remaining, builder, buffer)
407+
}
408+
}
409+
410+
private suspend fun readPacketSuspend(size: Int, builder: ByteWritePacketImpl, buffer: ByteBuffer): ByteReadPacket {
411+
var remaining = size
377412

378413
try {
379-
var remaining = size
380414
while (remaining > 0) {
381415
buffer.clear()
382416
if (buffer.remaining() > remaining) {
383417
buffer.limit(remaining)
384418
}
385419

386420
val rc = readFully(buffer)
421+
387422
buffer.flip()
388423
builder.writeFully(buffer)
389424

390425
remaining -= rc
391426
}
392427

428+
393429
return builder.build()
394430
} catch (t: Throwable) {
395431
builder.release()
@@ -837,82 +873,125 @@ internal class ByteBufferChannel(
837873
closed?.sendException?.let { packet.release(); throw it }
838874

839875
when (packet) {
840-
is ByteReadPacketEmpty -> return
841-
is ByteReadPacketSingle -> writeSingleBufferPacket(packet)
842-
is ByteReadPacketImpl -> writeMultipleBufferPacket(packet)
876+
ByteReadPacketEmpty -> return
877+
is ByteReadPacketViewBased -> writeViewBasedPacket(packet)
843878
else -> writeExternalPacket(packet)
844879
}
845880
}
846881

847-
private suspend fun writeSingleBufferPacket(packet: ByteReadPacketSingle) {
848-
val buffer = packet.steal()
849-
val t = try {
850-
writeAsMuchAsPossible(buffer)
851-
null
852-
} catch (t: Throwable) {
853-
t
854-
}
882+
override suspend fun write(min: Int, block: (ByteBuffer) -> Unit) {
883+
require(min > 0) { "min should be positive"}
855884

856-
if (t != null) {
857-
packet.pool.recycle(buffer)
858-
throw t
885+
var written = false
886+
887+
writing {
888+
if (it.availableForWrite >= min) {
889+
val position = this.position()
890+
val l = this.limit()
891+
block(this)
892+
if (l != this.limit()) throw IllegalStateException("buffer limit modified")
893+
val delta = position() - position
894+
if (delta < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
895+
896+
if (!it.tryWriteExact(delta)) throw IllegalStateException()
897+
bytesWritten(it, delta)
898+
written = true
899+
}
859900
}
860901

861-
if (buffer.hasRemaining()) {
862-
return writeSingleBufferPacketSuspend(buffer, packet)
902+
if (!written) {
903+
return writeBlockSuspend(min, block)
863904
}
905+
}
864906

865-
packet.pool.recycle(buffer)
907+
private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
908+
writeSuspend(min)
909+
write(min, block)
866910
}
867911

868-
private suspend fun writeMultipleBufferPacket(packet: ByteReadPacketImpl) {
869-
var buffer: ByteBuffer? = null
912+
override suspend fun read(min: Int, block: (ByteBuffer) -> Unit) {
913+
require(min > 0) { "min should be positive" }
870914

871-
val t = try {
872-
while (packet.remaining > 0) {
873-
buffer = packet.steal()
874-
writeAsMuchAsPossible(buffer)
875-
if (buffer.hasRemaining()) break
876-
packet.pool.recycle(buffer)
877-
buffer = null
878-
}
879-
null
880-
} catch (t: Throwable) { t }
915+
val read = reading {
916+
if (it.availableForRead >= min) {
917+
val position = this.position()
918+
val l = this.limit()
919+
block(this)
920+
if (l != this.limit()) throw IllegalStateException("buffer limit modified")
921+
val delta = position() - position
922+
if (delta < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
881923

882-
if (t != null) {
883-
buffer?.let { packet.pool.recycle(it) }
884-
packet.release()
885-
throw t
924+
if (!it.tryReadExact(delta)) throw IllegalStateException()
925+
bytesRead(it, delta)
926+
true
927+
}
928+
else false
886929
}
887930

888-
if (buffer != null) {
889-
return writeMultipleBufferPacketSuspend(buffer, packet)
931+
if (!read) {
932+
readBlockSuspend(min, block)
890933
}
934+
}
891935

892-
packet.release()
936+
private suspend fun readBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
937+
if (!readSuspend(min)) throw EOFException("Got EOF but at least $min bytes were expected")
938+
read(min, block)
893939
}
894940

895-
private suspend fun writeSingleBufferPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacketSingle) {
896-
try {
897-
writeFully(buffer)
898-
} finally {
899-
packet.pool.recycle(buffer)
941+
private suspend fun writeViewBasedPacket(packet: ByteReadPacketViewBased) {
942+
while (!packet.isEmpty) {
943+
val node = packet.steal() ?: break
944+
945+
var written: Int
946+
947+
while (node.readRemaining > 0) {
948+
try {
949+
written = 0
950+
writing {
951+
val size = minOf(remaining(), node.readRemaining)
952+
if (!it.tryWriteExact(size)) throw IllegalStateException()
953+
written = node.read(this, size)
954+
bytesWritten(it, size)
955+
}
956+
} catch (t: Throwable) {
957+
node.release()
958+
packet.release()
959+
throw t
960+
}
961+
962+
if (written == 0 && node.readRemaining > 0) {
963+
return writeViewBasedPacketSuspend(packet, node)
964+
}
965+
}
966+
967+
node.release()
900968
}
901969
}
902970

903-
private suspend fun writeMultipleBufferPacketSuspend(rem: ByteBuffer, packet: ByteReadPacketImpl) {
904-
var buffer = rem
971+
private suspend fun writeViewBasedPacketSuspend(packet: ByteReadPacketViewBased, node0: BufferView) {
972+
var node: BufferView? = node0
905973

906974
try {
907-
do {
908-
writeFully(buffer)
909-
if (packet.remaining == 0) break
910-
packet.pool.recycle(buffer)
911-
buffer = packet.steal()
912-
} while (true)
913-
} finally {
914-
packet.pool.recycle(buffer)
975+
while ((node != null && node.readRemaining > 0) || !packet.isEmpty) {
976+
while (node != null && node.readRemaining > 0) {
977+
writeSuspend(1)
978+
writing {
979+
val size = minOf(remaining(), node!!.readRemaining)
980+
if (!it.tryWriteExact(size)) throw IllegalStateException()
981+
node!!.read(this, size)
982+
bytesWritten(it, size)
983+
}
984+
985+
if (node.readRemaining == 0) {
986+
node.release()
987+
node = packet.steal()
988+
}
989+
}
990+
}
991+
} catch (t: Throwable) {
992+
node?.release()
915993
packet.release()
994+
throw t
916995
}
917996
}
918997

@@ -927,6 +1006,8 @@ internal class ByteBufferChannel(
9271006
if (buffer.hasRemaining()) {
9281007
buffer.compact()
9291008
break
1009+
} else {
1010+
buffer.compact()
9301011
}
9311012
}
9321013

@@ -935,15 +1016,19 @@ internal class ByteBufferChannel(
9351016
t
9361017
}
9371018

1019+
if (t != null) {
1020+
BufferPool.recycle(buffer)
1021+
packet.release()
1022+
throw t
1023+
}
1024+
9381025
buffer.flip()
9391026
if (buffer.hasRemaining()) {
9401027
return writeExternalPacketSuspend(buffer, packet)
9411028
}
9421029

9431030
BufferPool.recycle(buffer)
9441031
packet.release()
945-
946-
if (t != null) throw t
9471032
}
9481033

9491034
private suspend fun writeExternalPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacket) {

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ public interface ByteReadChannel {
117117
* and no characters were read.
118118
*/
119119
suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean
120+
121+
/**
122+
* Invokes [block] when it will be possible to read at least [min] bytes
123+
* providing byte buffer to it so lambda can read from the buffer
124+
* up to [ByteBuffer.remaining] bytes. If there are no [min] bytes available then the invocation could
125+
* suspend until the requirement will be met.
126+
*
127+
* Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
128+
* eg: it could be 4 bytes available for read but the provided byte buffer could have only 2 remaining bytes:
129+
* in this case you have to invoke read again (with decreased [min] accordingly).
130+
*
131+
* @param min amount of bytes available for read, should be positive
132+
* @param block to be invoked when at least [min] bytes available for read
133+
*/
134+
suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
120135
}
121136

122137
/**

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ public interface ByteWriteChannel {
5959
suspend fun writeFully(src: ByteArray) = writeFully(src, 0, src.size)
6060
suspend fun writeFully(src: ByteBuffer)
6161

62+
/**
63+
* Invokes [block] when it will be possible to write at least [min] bytes
64+
* providing byte buffer to it so lambda can write to the buffer
65+
* up to [ByteBuffer.remaining] bytes. If there are no [min] bytes spaces available then the invocation could
66+
* suspend until the requirement will be met.
67+
*
68+
* Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
69+
* eg: it could be 4 bytes available for write but the provided byte buffer could have only 2 remaining bytes:
70+
* in this case you have to invoke write again (with decreased [min] accordingly).
71+
*
72+
* @param min amount of bytes available for write, should be positive
73+
* @param block to be invoked when at least [min] bytes free capacity available
74+
*/
75+
suspend fun write(min: Int = 1, block: (ByteBuffer) -> Unit)
76+
6277
/**
6378
* Writes a [packet] fully or fails if channel get closed before the whole packet has been written
6479
*/

0 commit comments

Comments
 (0)