Skip to content

Commit 5804c37

Browse files
author
Sergey Mashkov
committed
IO: eliminate stealing if packets use different pools
1 parent 57906eb commit 5804c37

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import java.nio.*
88
import java.nio.charset.*
99
import java.util.*
1010

11-
internal class ByteReadPacketImpl(internal val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
11+
internal class ByteReadPacketImpl(internal val buffers: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
1212
override val remaining: Int
13-
get() = if (packets.isNotEmpty()) packets.sumBy { it.remaining() } else 0
13+
get() = if (buffers.isNotEmpty()) buffers.sumBy { it.remaining() } else 0
1414

15-
internal fun steal(): ByteBuffer = packets.pollFirst() ?: throw IllegalStateException("EOF")
15+
internal fun steal(): ByteBuffer = buffers.pollFirst() ?: throw IllegalStateException("EOF")
1616

1717
override fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int {
1818
var copied = 0
@@ -228,27 +228,27 @@ internal class ByteReadPacketImpl(internal val packets: ArrayDeque<ByteBuffer>,
228228
}
229229

230230
override fun release() {
231-
while (packets.isNotEmpty()) {
232-
recycle(packets.remove())
231+
while (buffers.isNotEmpty()) {
232+
recycle(buffers.remove())
233233
}
234234
}
235235

236236
override fun copy(): ByteReadPacket {
237-
if (packets.isEmpty()) return ByteReadPacketEmpty
238-
val copyDeque = ArrayDeque<ByteBuffer>(packets.size)
237+
if (buffers.isEmpty()) return ByteReadPacketEmpty
238+
val copyDeque = ArrayDeque<ByteBuffer>(buffers.size)
239239

240-
for (p in packets) {
240+
for (p in buffers) {
241241
copyDeque.add(pool.borrow().also { it.put(p.duplicate()); it.flip() })
242242
}
243243

244244
return ByteReadPacketImpl(copyDeque, pool)
245245
}
246246

247247
private inline fun reading(size: Int, block: (ByteBuffer) -> Boolean): Boolean {
248-
if (packets.isEmpty()) return false
248+
if (buffers.isEmpty()) return false
249249

250250
var visited = false
251-
var buffer = packets.peekFirst()
251+
var buffer = buffers.peekFirst()
252252
var stop = false
253253

254254
while (!stop) {
@@ -262,26 +262,26 @@ internal class ByteReadPacketImpl(internal val packets: ArrayDeque<ByteBuffer>,
262262
}
263263

264264
if (!buffer.hasRemaining()) {
265-
packets.removeFirst()
265+
buffers.removeFirst()
266266
recycle(buffer)
267267

268-
if (packets.isEmpty()) break
269-
buffer = packets.peekFirst()
268+
if (buffers.isEmpty()) break
269+
buffer = buffers.peekFirst()
270270
}
271271
}
272272

273273
return visited
274274
}
275275

276276
private fun tryStealBytesFromNextBuffer(size: Int, buffer: ByteBuffer): Boolean {
277-
if (packets.size == 1) {
277+
if (buffers.size == 1) {
278278
return false
279279
}
280280

281-
packets.removeFirst()
281+
buffers.removeFirst()
282282

283283
val extraBytes = size - buffer.remaining()
284-
val next = packets.peekFirst()
284+
val next = buffers.peekFirst()
285285

286286
if (extraBytes > next.remaining()) return false
287287

@@ -291,7 +291,7 @@ internal class ByteReadPacketImpl(internal val packets: ArrayDeque<ByteBuffer>,
291291
}
292292
buffer.flip()
293293

294-
packets.addFirst(buffer)
294+
buffers.addFirst(buffer)
295295
return true
296296
}
297297

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,27 +116,30 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
116116

117117
private fun writePacketSingle(p: ByteReadPacketSingle) {
118118
val initialRemaining = p.remaining
119+
val samePool = p.pool === this.pool
119120
if (initialRemaining > 0) {
120121
size += initialRemaining
121-
writePacketBuffer(p.steal())
122+
writePacketBuffer(p.steal(), samePool, p.pool)
122123
}
123124
}
124125

125126
private fun writePacketMultiple(p: ByteReadPacketImpl) {
126127
val initialRemaining = p.remaining
127128
if (initialRemaining > 0) {
128129
size += initialRemaining
130+
val samePool = p.pool === this.pool
131+
val packetPool = p.pool
129132

130133
do {
131-
writePacketBuffer(p.steal())
134+
writePacketBuffer(p.steal(), samePool, packetPool)
132135
} while (p.remaining > 0)
133136
}
134137
}
135138

136-
private fun writePacketBuffer(buffer: ByteBuffer) {
139+
private fun writePacketBuffer(buffer: ByteBuffer, samePool: Boolean, packetPool: ObjectPool<ByteBuffer>) {
137140
val last = last()
138141

139-
if (buffer.position() > 0 && last != null) {
142+
if (samePool && buffer.position() > 0 && last != null) {
140143
if (last === buffers || buffersCount() == 1) {
141144
val count = last.position() - headerSizeHint
142145
if (count < PACKET_MAX_COPY_SIZE && count <= buffer.position()) {
@@ -152,11 +155,16 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
152155
}
153156
} else if (last != null && last.remaining() <= buffer.remaining() && buffer.remaining() < PACKET_MAX_COPY_SIZE) {
154157
last.put(buffer)
155-
recycle(buffer)
158+
packetPool.recycle(buffer)
156159
return
157160
}
158161

159-
last(buffer.also { it.compact() })
162+
if (samePool) {
163+
last(buffer.also { it.compact() })
164+
} else {
165+
writeFully(buffer)
166+
packetPool.recycle(buffer)
167+
}
160168
}
161169

162170
override fun writePacketUnconsumed(p: ByteReadPacket) {
@@ -166,7 +174,7 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
166174
p.buffer?.duplicate()?.let { writeFully(it) }
167175
}
168176
is ByteReadPacketImpl -> {
169-
for (buffer in p.packets) {
177+
for (buffer in p.buffers) {
170178
writeFully(buffer.duplicate())
171179
}
172180
}

0 commit comments

Comments
 (0)