Skip to content

Commit 57906eb

Browse files
author
Sergey Mashkov
committed
IO: add Packet.writePacket optimizations, add object pool verifier
1 parent 2dafc49 commit 57906eb

File tree

11 files changed

+326
-44
lines changed

11 files changed

+326
-44
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,12 +367,12 @@ internal class ByteBufferChannel(
367367
return readAvailable(dst)
368368
}
369369

370-
suspend override fun readPacket(size: Int): ByteReadPacket {
370+
suspend override fun readPacket(size: Int, headerSizeHint: Int): ByteReadPacket {
371371
closed?.cause?.let { throw it }
372372

373373
if (size == 0) return ByteReadPacketEmpty
374374

375-
val builder = ByteWritePacketImpl(BufferPool)
375+
val builder = ByteWritePacketImpl(headerSizeHint, BufferPool)
376376
val buffer = BufferPool.borrow()
377377

378378
try {
@@ -874,6 +874,7 @@ internal class ByteBufferChannel(
874874
writeAsMuchAsPossible(buffer)
875875
if (buffer.hasRemaining()) break
876876
packet.pool.recycle(buffer)
877+
buffer = null
877878
}
878879
null
879880
} catch (t: Throwable) { t }

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ public interface ByteReadChannel {
5454

5555
/**
5656
* Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
57-
* and not enough bytes available.
57+
* and not enough bytes available. Accepts [headerSizeHint] to be provided, see [WritePacket].
5858
*/
59-
suspend fun readPacket(size: Int): ByteReadPacket
59+
suspend fun readPacket(size: Int, headerSizeHint: Int = 0): ByteReadPacket
6060

6161
/**
6262
* Reads a long number (suspending if not enough bytes available) or fails if channel has been closed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ suspend fun ByteWriteChannel.writeChar(ch: Char) {
168168
writeShort(ch.toInt())
169169
}
170170

171-
inline suspend fun ByteWriteChannel.writePacket(builder: ByteWritePacket.() -> Unit) {
172-
writePacket(buildPacket(builder))
171+
inline suspend fun ByteWriteChannel.writePacket(headerSizeHint: Int = 0, builder: ByteWritePacket.() -> Unit) {
172+
writePacket(buildPacket(headerSizeHint, builder))
173173
}
174174

175175
suspend fun ByteWriteChannel.writePacketSuspend(builder: suspend ByteWritePacket.() -> Unit) {

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.util.*
1010

1111
internal class ByteReadPacketImpl(internal val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
1212
override val remaining: Int
13-
get() = packets.sumBy { it.remaining() }
13+
get() = if (packets.isNotEmpty()) packets.sumBy { it.remaining() } else 0
1414

1515
internal fun steal(): ByteBuffer = packets.pollFirst() ?: throw IllegalStateException("EOF")
1616

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ interface ByteWritePacket : Appendable {
2424

2525
fun release()
2626
fun build(): ByteReadPacket
27+
fun reset()
2728

2829
fun outputStream(): OutputStream
2930
fun writerUTF8(): Writer
3031

3132
fun writePacket(p: ByteReadPacket)
33+
@Deprecated("")
3234
fun writePacketUnconsumed(p: ByteReadPacket)
3335

3436
override fun append(csq: CharSequence): ByteWritePacket {

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt

Lines changed: 140 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@ import java.io.*
55
import java.nio.ByteBuffer
66
import java.nio.CharBuffer
77
import java.util.*
8+
import kotlin.NoSuchElementException
9+
10+
internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val pool: ObjectPool<ByteBuffer>) : ByteWritePacket {
11+
init {
12+
require(headerSizeHint >= 0) { "shouldn't be negative: headerSizeHint = $headerSizeHint" }
13+
}
814

9-
internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : ByteWritePacket {
1015
override var size: Int = 0
1116
private set
1217
private var buffers: Any? = null
@@ -86,22 +91,72 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
8691
override fun writePacket(p: ByteReadPacket) {
8792
when (p) {
8893
is ByteReadPacketEmpty -> {}
89-
is ByteReadPacketSingle -> {
90-
if (p.remaining > 0) {
91-
size += p.remaining
92-
last(p.steal().also { it.compact() })
94+
is ByteReadPacketSingle -> writePacketSingle(p)
95+
is ByteReadPacketImpl -> writePacketMultiple(p)
96+
else -> writeFully(p.readBytes())
97+
}
98+
}
99+
100+
private fun reverseCopyToForeignBuffer(count: Int, last: ByteBuffer, buffer: ByteBuffer) {
101+
val startOffset = buffer.position() - count
102+
val l = buffer.limit()
103+
buffer.position(startOffset)
104+
buffer.limit(startOffset + count)
105+
106+
last.flip()
107+
last.position(last.limit() - count)
108+
buffer.put(last)
109+
110+
recycleLast()
111+
headerSizeHint = startOffset
112+
buffer.limit(buffer.capacity())
113+
buffer.position(l)
114+
last(buffer)
115+
}
116+
117+
private fun writePacketSingle(p: ByteReadPacketSingle) {
118+
val initialRemaining = p.remaining
119+
if (initialRemaining > 0) {
120+
size += initialRemaining
121+
writePacketBuffer(p.steal())
122+
}
123+
}
124+
125+
private fun writePacketMultiple(p: ByteReadPacketImpl) {
126+
val initialRemaining = p.remaining
127+
if (initialRemaining > 0) {
128+
size += initialRemaining
129+
130+
do {
131+
writePacketBuffer(p.steal())
132+
} while (p.remaining > 0)
133+
}
134+
}
135+
136+
private fun writePacketBuffer(buffer: ByteBuffer) {
137+
val last = last()
138+
139+
if (buffer.position() > 0 && last != null) {
140+
if (last === buffers || buffersCount() == 1) {
141+
val count = last.position() - headerSizeHint
142+
if (count < PACKET_MAX_COPY_SIZE && count <= buffer.position()) {
143+
reverseCopyToForeignBuffer(count, last, buffer)
144+
return
93145
}
94-
}
95-
is ByteReadPacketImpl -> {
96-
size += p.remaining
97-
while (p.remaining > 0) {
98-
last(p.steal().also { it.compact() })
146+
} else {
147+
val count = last.position()
148+
if (count < PACKET_MAX_COPY_SIZE && count == buffer.position()) {
149+
reverseCopyToForeignBuffer(count, last, buffer)
150+
return
99151
}
100152
}
101-
else -> {
102-
writeFully(p.readBytes())
103-
}
153+
} else if (last != null && last.remaining() <= buffer.remaining() && buffer.remaining() < PACKET_MAX_COPY_SIZE) {
154+
last.put(buffer)
155+
recycle(buffer)
156+
return
104157
}
158+
159+
last(buffer.also { it.compact() })
105160
}
106161

107162
override fun writePacketUnconsumed(p: ByteReadPacket) {
@@ -264,22 +319,43 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
264319
}
265320

266321
override fun build(): ByteReadPacket {
267-
val bs = buffers ?: return ByteReadPacketEmpty
322+
val bs = buffers
268323
buffers = null
324+
size = 0
269325

270-
return if (bs is ArrayDeque<*>) {
271-
@Suppress("UNCHECKED_CAST")
272-
when {
273-
bs.isEmpty() -> ByteReadPacketEmpty
274-
bs.size == 1 -> ByteReadPacketSingle((bs.first as ByteBuffer).also { it.flip() }, pool)
275-
else -> ByteReadPacketImpl((bs as ArrayDeque<ByteBuffer>).also {
276-
for (b in bs) {
277-
b.flip()
278-
}
279-
}, pool)
326+
return when (bs) {
327+
null -> ByteReadPacketEmpty
328+
is ArrayDeque<*> -> {
329+
@Suppress("UNCHECKED_CAST") buildMultiBufferPacket(bs as ArrayDeque<ByteBuffer>)
330+
}
331+
else -> ByteReadPacketSingle((bs as ByteBuffer).also { switchBufferToRead(true, it) }, pool)
332+
}
333+
}
334+
335+
private fun buildMultiBufferPacket(buffers: ArrayDeque<ByteBuffer>): ByteReadPacket {
336+
return when (buffers.size) {
337+
0 -> ByteReadPacketEmpty
338+
1 -> ByteReadPacketSingle(buffers.first.also { switchBufferToRead(true, it) }, pool)
339+
else -> {
340+
val it = buffers.iterator()
341+
switchBufferToRead(true, it.next())
342+
do {
343+
switchBufferToRead(false, it.next())
344+
} while (it.hasNext())
345+
346+
ByteReadPacketImpl(buffers, pool)
347+
}
348+
}
349+
}
350+
351+
private fun switchBufferToRead(first: Boolean, bb: ByteBuffer) {
352+
bb.flip()
353+
354+
if (first) {
355+
val skip = headerSizeHint
356+
if (skip > 0) {
357+
bb.position(bb.position() + skip)
280358
}
281-
} else {
282-
ByteReadPacketSingle((bs as ByteBuffer).also { it.flip() }, pool)
283359
}
284360
}
285361

@@ -297,6 +373,10 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
297373
}
298374
}
299375

376+
override fun reset() {
377+
release()
378+
}
379+
300380
private inline fun write(size: Int, block: (ByteBuffer) -> Unit) {
301381
val buffer = last()?.takeIf { it.remaining() >= size }
302382

@@ -313,6 +393,9 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
313393
val new = pool.borrow()
314394
new.clear()
315395
last(new)
396+
if (buffers === new) {
397+
new.position(headerSizeHint)
398+
}
316399
return new
317400
}
318401

@@ -328,15 +411,44 @@ internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : B
328411
private fun last(new: ByteBuffer) {
329412
@Suppress("UNCHECKED_CAST")
330413
if (buffers is ArrayDeque<*>) (buffers as ArrayDeque<ByteBuffer>).addLast(new)
331-
else if (buffers == null) buffers = new
332-
else {
414+
else if (buffers == null) {
415+
buffers = new
416+
} else {
333417
val dq = ArrayDeque<ByteBuffer>()
334418
dq.addFirst(buffers as ByteBuffer)
335419
dq.addLast(new)
336420
buffers = dq
337421
}
338422
}
339423

424+
private fun recycleLast() {
425+
val b = buffers
426+
when (b) {
427+
is ArrayDeque<*> -> {
428+
recycle(b.pollLast() as ByteBuffer)
429+
if (b.isEmpty()) {
430+
buffers = null
431+
}
432+
}
433+
is ByteBuffer -> {
434+
buffers = null
435+
recycle(b)
436+
}
437+
else -> throw NoSuchElementException("Unable to recycle last buffer: buffers chain is empty")
438+
}
439+
}
440+
441+
private fun buffersCount(): Int {
442+
val bb = buffers
443+
444+
return when (bb) {
445+
null -> 0
446+
is ByteBuffer -> 1
447+
is ArrayDeque<*> -> bb.size
448+
else -> 0
449+
}
450+
}
451+
340452
private fun recycle(buffer: ByteBuffer) {
341453
pool.recycle(buffer)
342454
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,37 @@ import java.nio.ByteBuffer
55

66
internal val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
77
private val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
8+
internal val PACKET_MAX_COPY_SIZE = getIOIntProperty("PacketMaxCopySize", 500)
89

9-
private val PacketBufferPool: ObjectPool<ByteBuffer> =
10+
internal val PacketBufferPool: ObjectPool<ByteBuffer> =
1011
object : ObjectPoolImpl<ByteBuffer>(PACKET_BUFFER_POOL_SIZE) {
1112
override fun produceInstance(): ByteBuffer = ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE)
1213
override fun clearInstance(instance: ByteBuffer) = instance.apply { clear() }
1314
}
1415

15-
inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket =
16-
WritePacket().apply(block).build()
16+
internal inline fun buildPacket(pool: ObjectPool<ByteBuffer> = PacketBufferPool, headerSizeHint: Int, block: ByteWritePacket.() -> Unit): ByteReadPacket {
17+
val w = ByteWritePacketImpl(headerSizeHint, pool)
18+
try {
19+
block(w)
20+
return w.build()
21+
} catch (t: Throwable) {
22+
w.release()
23+
throw t
24+
}
25+
}
26+
27+
inline fun buildPacket(headerSizeHint: Int = 0, block: ByteWritePacket.() -> Unit): ByteReadPacket {
28+
val w = WritePacket(headerSizeHint)
29+
try {
30+
block(w)
31+
return w.build()
32+
} catch (t: Throwable) {
33+
w.release()
34+
throw t
35+
}
36+
}
1737

18-
fun WritePacket(): ByteWritePacket = ByteWritePacketImpl(PacketBufferPool)
38+
fun WritePacket(headerSizeHint: Int = 0): ByteWritePacket = ByteWritePacketImpl(headerSizeHint, PacketBufferPool)
1939

2040
fun ByteReadPacket.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
2141
val sb = StringBuilder(estimate)

0 commit comments

Comments
 (0)