Skip to content

Commit 5428565

Browse files
author
Sergey Mashkov
committed
IO: add writeDirect to WritePacket (useful for better NIO interop)
1 parent 754a19b commit 5428565

File tree

5 files changed

+38
-9
lines changed

5 files changed

+38
-9
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/buffers/BufferView.kt

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines.experimental.io.buffers
33
import kotlinx.atomicfu.*
44
import kotlinx.coroutines.experimental.io.*
55
import kotlinx.coroutines.experimental.io.internal.*
6+
import kotlinx.coroutines.experimental.io.packet.*
67

78
internal class BufferView private constructor(private var content: ByteBuffer,
89
private val origin: BufferView?,
@@ -32,8 +33,8 @@ internal class BufferView private constructor(private var content: ByteBuffer,
3233
field = value
3334
}
3435

35-
val readRemaining: Int get() = writePosition - readPosition
36-
val writeRemaining: Int get() = limit - writePosition
36+
inline val readRemaining: Int get() = writePosition - readPosition
37+
inline val writeRemaining: Int get() = limit - writePosition
3738

3839
val startGap: Int get() = readPosition
3940
val endGap: Int get() = limit - writePosition
@@ -268,11 +269,23 @@ internal class BufferView private constructor(private var content: ByteBuffer,
268269
unlink()
269270
origin.release()
270271
} else {
271-
pool?.recycle(this)
272+
pool?.recycle(this) ?: unlink()
272273
}
273274
}
274275
}
275276

277+
internal fun writeDirect(size: Int, block: (ByteBuffer) -> Unit) {
278+
val rem = writeRemaining
279+
require (size <= rem) { "size $size is greater than buffer's remaining capacity $rem" }
280+
val buffer = writeDuplicated(rem)
281+
val positionBefore = buffer.position()
282+
block(buffer)
283+
val delta = buffer.position() - positionBefore
284+
if (delta < 0 || delta > rem) throw IllegalStateException("Wrong buffer position change: $delta (position should be moved forward only by at most size bytes (size = $size)")
285+
286+
writePosition += delta
287+
}
288+
276289
private fun readDuplicated(limit: Int = Int.MAX_VALUE): ByteBuffer {
277290
val position = readPosition
278291
val upLimit = (position.toLong() + limit).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
@@ -323,8 +336,8 @@ internal class BufferView private constructor(private var content: ByteBuffer,
323336
}
324337
}
325338

326-
internal object Pool : ObjectPoolImpl<BufferView>(BufferViewCapacity) {
327-
override fun produceInstance() = BufferView(ByteBuffer.allocateDirect(BufferViewContentSize), this)
339+
internal object Pool : ObjectPoolImpl<BufferView>(PACKET_BUFFER_POOL_SIZE) {
340+
override fun produceInstance() = BufferView(ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE), this)
328341
override fun clearInstance(instance: BufferView): BufferView {
329342
return instance.apply {
330343
next = null
@@ -344,9 +357,6 @@ internal class BufferView private constructor(private var content: ByteBuffer,
344357
}
345358

346359
companion object {
347-
private val BufferViewContentSize = 4096
348-
private val BufferViewCapacity = 1024
349-
350360
private val EmptyBuffer = ByteBuffer.allocate(0)!!
351361

352362
val Empty = BufferView(EmptyBuffer)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ interface ByteWritePacket : Appendable {
3434
@Deprecated("Use copy() instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("writePacket(p.copy())"))
3535
fun writePacketUnconsumed(p: ByteReadPacket) = writePacket(p.copy())
3636

37+
fun writeDirect(size: Int = 1, block: (ByteBuffer) -> Unit)
38+
3739
override fun append(csq: CharSequence): ByteWritePacket {
3840
append(csq, 0, csq.length)
3941
return this

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
8181
size += 4
8282
}
8383

84+
override fun writeDirect(size: Int, block: (ByteBuffer) -> Unit) {
85+
write(size) {
86+
it.writeDirect(size, block)
87+
}
88+
}
89+
8490
override fun append(c: Char): ByteWritePacket {
8591
write(3) {
8692
size += it.putUtf8Char(c.toInt() and 0xffff)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import kotlinx.coroutines.experimental.io.buffers.*
44
import kotlinx.coroutines.experimental.io.internal.*
55

66
internal val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
7-
private val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
7+
internal val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
88
internal val PACKET_MAX_COPY_SIZE = getIOIntProperty("PacketMaxCopySize", 500)
99

1010
internal inline fun buildPacket(pool: ObjectPool<BufferView> = BufferView.Pool, headerSizeHint: Int, block: ByteWritePacket.() -> Unit): ByteReadPacket {

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketReaderWriterTest.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,5 +433,16 @@ class BytePacketReaderWriterTest {
433433
assertEquals(100000, inner.remaining)
434434
}
435435

436+
@Test
437+
fun testWriteDirect() {
438+
val packet = buildPacket {
439+
writeDirect { bb ->
440+
bb.putLong(0x1234567812345678L)
441+
}
442+
}
443+
444+
assertEquals(0x1234567812345678L, packet.readLong())
445+
}
446+
436447
private inline fun buildPacket(block: ByteWritePacket.() -> Unit) = buildPacket(pool, 0, block)
437448
}

0 commit comments

Comments
 (0)