Skip to content

Commit 5feaf69

Browse files
author
Sergey Mashkov
committed
IO: add more byte packet tests and fix few bugs
1 parent 9c446c0 commit 5feaf69

File tree

9 files changed

+491
-38
lines changed

9 files changed

+491
-38
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package kotlinx.coroutines.experimental.io.packet
22

3-
import java.io.InputStream
3+
import java.io.*
44
import java.nio.ByteBuffer
55
import kotlin.experimental.and
66

@@ -28,11 +28,14 @@ interface ByteReadPacket {
2828
fun readFloat(): Float
2929

3030
fun skip(n: Int): Int
31-
fun skipExact(n: Int)
31+
fun skipExact(n: Int) {
32+
if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
33+
}
3234

3335
fun release()
3436

3537
fun readUTF8LineTo(out: Appendable, limit: Int = Int.MAX_VALUE): Boolean
3638

3739
fun inputStream(): InputStream
40+
fun readerUTF8(): Reader
3841
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package kotlinx.coroutines.experimental.io.packet
22

3-
import java.io.EOFException
4-
import java.io.InputStream
5-
import java.nio.ByteBuffer
3+
import java.io.*
4+
import java.nio.*
65

76
object ByteReadPacketEmpty : ByteReadPacket {
87
override val remaining: Int
@@ -44,22 +43,17 @@ object ByteReadPacketEmpty : ByteReadPacket {
4443
throw EOFException("Couldn't read float from empty packet")
4544
}
4645

47-
override fun skip(n: Int): Int {
48-
return 0
49-
}
50-
51-
override fun skipExact(n: Int) {
52-
if (n != 0) throw EOFException("Couldn't skip $n bytes in empty packet")
53-
}
46+
override fun skip(n: Int) = 0
5447

5548
override fun release() {
5649
}
5750

5851
override fun readUTF8LineTo(out: Appendable, limit: Int) = false
5952

60-
override fun inputStream() = EmptyInputStream
53+
override fun inputStream(): InputStream = EmptyInputStream
54+
override fun readerUTF8(): Reader = EmptyReader
6155

62-
private val EmptyInputStream = object : InputStream() {
56+
private object EmptyInputStream : InputStream() {
6357
override fun available() = 0
6458

6559
override fun read(): Int = -1
@@ -78,4 +72,13 @@ object ByteReadPacketEmpty : ByteReadPacket {
7872
override fun close() {
7973
}
8074
}
75+
76+
private object EmptyReader : Reader() {
77+
override fun close() {
78+
}
79+
80+
override fun read(cbuf: CharArray?, off: Int, len: Int) = -1
81+
override fun read() = -1
82+
override fun read(target: CharBuffer?) = -1
83+
}
8184
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ package kotlinx.coroutines.experimental.io.packet
44

55
import kotlinx.coroutines.experimental.io.internal.ObjectPool
66
import kotlinx.coroutines.experimental.io.internal.decodeUTF8
7-
import java.io.EOFException
8-
import java.io.InputStream
7+
import java.io.*
98
import java.nio.BufferOverflowException
109
import java.nio.ByteBuffer
11-
import java.nio.charset.MalformedInputException
10+
import java.nio.charset.*
1211
import java.util.*
1312

1413
internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
@@ -141,7 +140,7 @@ internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, i
141140
!end && size == 0
142141
}
143142

144-
if (!rc && size != 0) throw MalformedInputException(0)
143+
if (rc && size > 0) throw MalformedInputException(0)
145144

146145
return rc
147146
}
@@ -160,10 +159,6 @@ internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, i
160159
return skipped
161160
}
162161

163-
override fun skipExact(n: Int) {
164-
if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
165-
}
166-
167162
override fun inputStream(): InputStream {
168163
return object : InputStream() {
169164
override fun read(): Int {
@@ -182,6 +177,38 @@ internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, i
182177
}
183178
}
184179

180+
override fun readerUTF8(): Reader {
181+
return object : Reader() {
182+
override fun close() {
183+
release()
184+
}
185+
186+
override fun read(cbuf: CharArray, off: Int, len: Int): Int {
187+
var decoded = 0
188+
var size = 1
189+
190+
val rc = reading(size) { bb ->
191+
size = bb.decodeUTF8 { ch ->
192+
if (decoded == len) false
193+
else {
194+
cbuf[off + decoded] = ch
195+
decoded++
196+
true
197+
}
198+
}
199+
200+
size == 0
201+
}
202+
203+
return when {
204+
rc && size > 0 -> throw CharacterCodingException()
205+
rc -> decoded
206+
else -> -1
207+
}
208+
}
209+
}
210+
}
211+
185212
override fun release() {
186213
while (packets.isNotEmpty()) {
187214
recycle(packets.remove())
@@ -227,6 +254,8 @@ internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, i
227254
val extraBytes = size - buffer.remaining()
228255
val next = packets.peekFirst()
229256

257+
if (extraBytes > next.remaining()) return false
258+
230259
buffer.compact()
231260
repeat(extraBytes) {
232261
buffer.put(next.get())

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package kotlinx.coroutines.experimental.io.packet
22

33
import kotlinx.coroutines.experimental.io.internal.ObjectPool
44
import kotlinx.coroutines.experimental.io.internal.decodeUTF8
5-
import java.io.EOFException
6-
import java.io.InputStream
5+
import java.io.*
76
import java.nio.BufferOverflowException
87
import java.nio.ByteBuffer
98
import java.nio.charset.MalformedInputException
@@ -151,10 +150,6 @@ internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, internal va
151150
return skipped
152151
}
153152

154-
override fun skipExact(n: Int) {
155-
if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
156-
}
157-
158153
override fun inputStream(): InputStream {
159154
return object : InputStream() {
160155
override fun read(): Int {
@@ -170,6 +165,39 @@ internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, internal va
170165
}
171166

172167
override fun available() = remaining
168+
169+
override fun close() {
170+
release()
171+
}
172+
}
173+
}
174+
175+
override fun readerUTF8(): Reader {
176+
return object : Reader() {
177+
override fun close() {
178+
release()
179+
}
180+
181+
override fun read(cbuf: CharArray, off: Int, len: Int): Int {
182+
var decoded = 0
183+
184+
val rc = reading { bb ->
185+
if (len == 0) return@reading
186+
187+
val rc = bb.decodeUTF8 { ch ->
188+
if (decoded == len) false
189+
else {
190+
cbuf[off + decoded] =ch
191+
decoded++
192+
true
193+
}
194+
}
195+
196+
if (rc > 0) throw MalformedInputException(0)
197+
}
198+
199+
return if (!rc) -1 else decoded
200+
}
173201
}
174202
}
175203

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package kotlinx.coroutines.experimental.io.packet
22

3-
import java.io.OutputStream
3+
import java.io.*
44
import java.nio.ByteBuffer
55
import java.nio.CharBuffer
66

@@ -26,6 +26,7 @@ interface ByteWritePacket : Appendable {
2626
fun build(): ByteReadPacket
2727

2828
fun outputStream(): OutputStream
29+
fun writerUTF8(): Writer
2930

3031
override fun append(csq: CharSequence): ByteWritePacket {
3132
append(csq, 0, csq.length)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package kotlinx.coroutines.experimental.io.packet
22

33
import kotlinx.coroutines.experimental.io.internal.ObjectPool
4-
import java.io.OutputStream
4+
import java.io.*
55
import java.nio.ByteBuffer
66
import java.nio.CharBuffer
77
import java.util.*
@@ -105,7 +105,52 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
105105

106106
// expects at least one byte remaining in [bb]
107107
private tailrec fun appendUTF8(csq: CharSequence, start: Int, end: Int, bb: ByteBuffer) {
108-
for (i in start .. end) {
108+
val limitedEnd = minOf(end, start + bb.remaining())
109+
110+
for (i in start until limitedEnd) {
111+
val chi = csq[i].toInt() and 0xffff
112+
val requiredSize = when {
113+
chi <= 0x7f -> 1
114+
chi > 0x7ff -> 3
115+
else -> 2
116+
}
117+
118+
if (bb.remaining() < requiredSize) {
119+
return appendUTF8(csq, i, end, appendNewBuffer())
120+
}
121+
122+
size += bb.putUtf8Char(chi)
123+
}
124+
125+
if (limitedEnd < end) {
126+
return appendUTF8(csq, limitedEnd, end, appendNewBuffer())
127+
}
128+
}
129+
130+
private tailrec fun appendASCII(csq: CharArray, start: Int, end: Int) {
131+
val bb = ensure()
132+
val limitedEnd = minOf(end, start + bb.remaining())
133+
134+
for (i in start until limitedEnd) {
135+
val chi = csq[i].toInt() and 0xffff
136+
if (chi >= 0x80) {
137+
appendUTF8(csq, i, end, bb)
138+
return
139+
}
140+
141+
bb.put(chi.toByte())
142+
size++
143+
}
144+
145+
if (limitedEnd < end) {
146+
return appendASCII(csq, limitedEnd, end)
147+
}
148+
}
149+
150+
// expects at least one byte remaining in [bb]
151+
private tailrec fun appendUTF8(csq: CharArray, start: Int, end: Int, bb: ByteBuffer) {
152+
val limitedEnd = minOf(end, start + bb.remaining())
153+
for (i in start until limitedEnd) {
109154
val chi = csq[i].toInt() and 0xffff
110155
val requiredSize = when {
111156
chi <= 0x7f -> 1
@@ -119,6 +164,10 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
119164

120165
size += bb.putUtf8Char(chi)
121166
}
167+
168+
if (limitedEnd < end) {
169+
return appendUTF8(csq, limitedEnd, end, appendNewBuffer())
170+
}
122171
}
123172

124173
override fun writeStringUtf8(s: String) {
@@ -164,6 +213,20 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
164213
}
165214
}
166215

216+
override fun writerUTF8(): Writer {
217+
return object : Writer() {
218+
override fun write(cbuf: CharArray, off: Int, len: Int) {
219+
appendASCII(cbuf, off, len)
220+
}
221+
222+
override fun flush() {
223+
}
224+
225+
override fun close() {
226+
}
227+
}
228+
}
229+
167230
override fun build(): ByteReadPacket {
168231
val bs = buffers ?: return ByteReadPacketEmpty
169232
buffers = null
@@ -202,7 +265,7 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
202265
val buffer = last()?.takeIf { it.remaining() >= size }
203266

204267
if (buffer == null) {
205-
block(ensure())
268+
block(appendNewBuffer())
206269
} else {
207270
block(buffer)
208271
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import kotlinx.coroutines.experimental.io.internal.ObjectPoolImpl
55
import kotlinx.coroutines.experimental.io.internal.getIOIntProperty
66
import java.nio.ByteBuffer
77

8-
private val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
8+
internal val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
99
private val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
1010

1111
private val PacketBufferPool: ObjectPool<ByteBuffer> =

0 commit comments

Comments
 (0)