Skip to content

Commit ccf8dde

Browse files
author
Sergey Mashkov
committed
IO: fix verifying pool usage
buffer view doesn't hold original pool reference to make pool chaining working
1 parent 7154d59 commit ccf8dde

File tree

12 files changed

+66
-47
lines changed

12 files changed

+66
-47
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,7 @@ internal class ByteBufferChannel(
955955
}
956956
}
957957
} catch (t: Throwable) {
958-
node.release()
958+
node.release(packet.pool)
959959
packet.release()
960960
throw t
961961
}
@@ -965,7 +965,7 @@ internal class ByteBufferChannel(
965965
}
966966
}
967967

968-
node.release()
968+
node.release(packet.pool)
969969
}
970970
}
971971

@@ -984,13 +984,13 @@ internal class ByteBufferChannel(
984984
}
985985

986986
if (node.readRemaining == 0) {
987-
node.release()
987+
node.release(packet.pool)
988988
node = packet.steal()
989989
}
990990
}
991991
}
992992
} catch (t: Throwable) {
993-
node?.release()
993+
node?.release(packet.pool)
994994
packet.release()
995995
throw t
996996
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/buffers/BufferView.kt

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ import kotlinx.coroutines.experimental.io.internal.*
66
import kotlinx.coroutines.experimental.io.packet.*
77

88
internal class BufferView private constructor(private var content: ByteBuffer,
9-
private val origin: BufferView?,
10-
private val pool: ObjectPool<BufferView>?) {
9+
private val origin: BufferView?) {
1110

12-
constructor(pool: ObjectPool<ByteBuffer>) : this(pool.borrow(), null, null)
13-
constructor(buffer: ByteBuffer, pool: ObjectPool<BufferView>) : this(buffer, null, pool)
14-
constructor(buffer: ByteBuffer) : this(buffer, null, null)
11+
constructor(pool: ObjectPool<ByteBuffer>) : this(pool.borrow(), null)
12+
constructor(buffer: ByteBuffer) : this(buffer, null)
1513

1614
private var duplicated: ByteBuffer? = null
1715

@@ -88,7 +86,7 @@ internal class BufferView private constructor(private var content: ByteBuffer,
8886
require(offset < dst.size)
8987
require(offset + length <= dst.size)
9088

91-
val dup = readDuplicated()
89+
val dup = readDuplicated(length)
9290
dup.get(dst, offset, length)
9391
readPosition = rp + length
9492
}
@@ -202,7 +200,7 @@ internal class BufferView private constructor(private var content: ByteBuffer,
202200
val to = duplicated()
203201
to.limit(rp)
204202
to.position(rp - size)
205-
val from = other.readDuplicated()
203+
val from = other.readDuplicated(Int.MAX_VALUE)
206204

207205
to.put(from)
208206
readPosition = rp - size
@@ -250,9 +248,10 @@ internal class BufferView private constructor(private var content: ByteBuffer,
250248
fun isExclusivelyOwned(): Boolean = refCount.value == 1L
251249

252250
fun makeView(): BufferView {
253-
(origin ?: this).acquire()
251+
val newOrigin = origin ?: this
252+
newOrigin.acquire()
254253

255-
val view = BufferView(content, origin, null)
254+
val view = BufferView(content, newOrigin)
256255
view.attachment = attachment
257256
view.readPosition = readPosition
258257
view.writePosition = writePosition
@@ -261,15 +260,15 @@ internal class BufferView private constructor(private var content: ByteBuffer,
261260
return view
262261
}
263262

264-
fun release() {
263+
fun release(pool: ObjectPool<BufferView>) {
265264
if (releaseRefCount()) {
266265
resetForWrite()
267266

268267
if (origin != null) {
269268
unlink()
270-
origin.release()
269+
origin.release(pool)
271270
} else {
272-
pool?.recycle(this) ?: unlink()
271+
pool.recycle(this)
273272
}
274273
}
275274
}
@@ -298,7 +297,7 @@ internal class BufferView private constructor(private var content: ByteBuffer,
298297
writePosition += delta
299298
}
300299

301-
private fun readDuplicated(limit: Int = Int.MAX_VALUE): ByteBuffer {
300+
private fun readDuplicated(limit: Int): ByteBuffer {
302301
val position = readPosition
303302
val upLimit = (position.toLong() + limit).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
304303

@@ -308,7 +307,7 @@ internal class BufferView private constructor(private var content: ByteBuffer,
308307
return buffer
309308
}
310309

311-
private fun writeDuplicated(limit: Int = Int.MAX_VALUE): ByteBuffer {
310+
private fun writeDuplicated(limit: Int): ByteBuffer {
312311
val position = writePosition
313312
val upLimit = (position.toLong() + limit).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
314313

@@ -349,7 +348,7 @@ internal class BufferView private constructor(private var content: ByteBuffer,
349348
}
350349

351350
internal object Pool : ObjectPoolImpl<BufferView>(PACKET_BUFFER_POOL_SIZE) {
352-
override fun produceInstance() = BufferView(ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE), this)
351+
override fun produceInstance() = BufferView(ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE))
353352
override fun clearInstance(instance: BufferView): BufferView {
354353
return instance.apply {
355354
next = null

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/buffers/Utils.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package kotlinx.coroutines.experimental.io.buffers
22

3-
internal tailrec fun BufferView?.releaseAll() {
3+
import kotlinx.coroutines.experimental.io.internal.*
4+
5+
internal tailrec fun BufferView?.releaseAll(pool: ObjectPool<BufferView>) {
46
if (this == null) return
5-
release()
6-
next.releaseAll()
7+
release(pool)
8+
next.releaseAll(pool)
79
}
810

911
internal fun BufferView.copyAll(): BufferView {
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
11
package kotlinx.coroutines.experimental.io.packet
22

3-
import kotlinx.coroutines.experimental.io.buffers.*
4-
5-
val ByteReadPacketEmpty: ByteReadPacket = ByteReadPacketViewBased(BufferView.Empty)
3+
val ByteReadPacketEmpty: ByteReadPacket = ByteReadPacketViewBased.Empty

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketViewBased.kt

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ import kotlinx.coroutines.experimental.io.internal.*
66
import java.io.*
77
import java.nio.charset.*
88

9-
internal class ByteReadPacketViewBased(private var head: BufferView) : ByteReadPacket {
9+
internal class ByteReadPacketViewBased(private var head: BufferView,
10+
internal val pool: ObjectPool<BufferView>) : ByteReadPacket {
1011

1112
override val remaining: Int
1213
get() = head.remainingAll().toInt() // TODO Long or Int?
1314

1415
override val isEmpty: Boolean
1516
get() = head.isEmpty()
1617

17-
override fun copy(): ByteReadPacket = ByteReadPacketViewBased(head.copyAll())
18+
override fun copy(): ByteReadPacket = ByteReadPacketViewBased(head.copyAll(), pool)
1819

1920
override fun release() {
2021
if (head !== BufferView.Empty) {
21-
head.releaseAll()
22+
head.releaseAll(pool)
2223
head = BufferView.Empty
2324
}
2425
}
@@ -86,6 +87,7 @@ internal class ByteReadPacketViewBased(private var head: BufferView) : ByteReadP
8687
true
8788
}
8889
'\n' -> {
90+
afterRead()
8991
return true
9092
}
9193
else -> {
@@ -94,15 +96,18 @@ internal class ByteReadPacketViewBased(private var head: BufferView) : ByteReadP
9496
return@decodeUTF8 false
9597
}
9698

97-
if (decoded == limit) throw BufferLimitExceededException("Too many characters in line: limit $limit exceeded")
99+
if (decoded == limit) {
100+
afterRead()
101+
throw BufferLimitExceededException("Too many characters in line: limit $limit exceeded")
102+
}
98103
decoded++
99104
out.append(ch)
100105
true
101106
}
102107
}
103108
}
104109

105-
if (size == 0) {
110+
if (size == 0 || end) {
106111
afterRead()
107112
size = 1
108113
}
@@ -306,7 +311,7 @@ internal class ByteReadPacketViewBased(private var head: BufferView) : ByteReadP
306311
head.writeBufferAppend(next, minSize)
307312
if (next.readRemaining == 0) {
308313
head.next = next.next
309-
next.release()
314+
next.release(pool)
310315
}
311316

312317
if (head.readRemaining >= minSize) return head
@@ -325,11 +330,13 @@ internal class ByteReadPacketViewBased(private var head: BufferView) : ByteReadP
325330
private fun releaseHead(head: BufferView) {
326331
val next = head.next
327332
this.head = next ?: BufferView.Empty
328-
head.release()
333+
head.release(pool)
329334
}
330335

331336
companion object {
332-
val Empty = ByteReadPacketViewBased(BufferView.Empty)
337+
val Empty = ByteReadPacketViewBased(BufferView.Empty, object: NoPoolImpl<BufferView>() {
338+
override fun borrow() = BufferView.Empty
339+
})
333340
val ReservedSize = 8
334341
private val SkipBuffer = CharArray(8192)
335342
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
144144
tail.writeBufferAppend(foreignStolen)
145145
tail.next = foreignStolen.next
146146
this.tail = foreignStolen.tail().takeUnless { it === foreignStolen } ?: tail
147-
foreignStolen.release()
147+
foreignStolen.release(p.pool)
148148
size = head.remainingAll().toInt()
149149
} else if (appendSize == -1 || prependSize < appendSize) {
150150
// do prepend
@@ -162,7 +162,7 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
162162

163163
pre.next = foreignStolen
164164
}
165-
tail.release()
165+
tail.release(pool)
166166

167167
this.tail = foreignStolen.tail()
168168
size = head.remainingAll().toInt()
@@ -326,7 +326,7 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
326326
this.size = 0
327327

328328
if (head === BufferView.Empty) return ByteReadPacketViewBased.Empty
329-
return ByteReadPacketViewBased(head)
329+
return ByteReadPacketViewBased(head, pool)
330330
}
331331

332332
override fun release() {
@@ -336,7 +336,7 @@ internal class ByteWritePacketImpl(private var headerSizeHint: Int, private val
336336
if (head !== empty) {
337337
this.head = empty
338338
this.tail = empty
339-
head.releaseAll()
339+
head.releaseAll(pool)
340340
size = 0
341341
}
342342
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Channels.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fun WritableByteChannel.writePacket(p: ByteReadPacket) {
2323
}
2424
}
2525
} finally {
26-
b?.release()
26+
b?.release(p.pool)
2727
p.release()
2828
}
2929
} else {
@@ -59,6 +59,7 @@ private fun ReadableByteChannel.readPacketImpl(min: Long, max: Long): ByteReadPa
5959

6060
if (max == 0L) return ByteReadPacketEmpty
6161

62+
val pool = BufferView.Pool
6263
val empty = BufferView.Empty
6364
var head: BufferView = empty
6465
var tail: BufferView = empty
@@ -69,7 +70,7 @@ private fun ReadableByteChannel.readPacketImpl(min: Long, max: Long): ByteReadPa
6970
while (read < min || (read == min && min == 0L)) {
7071
val remInt = (max - read).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
7172

72-
val part = tail.takeIf { it.writeRemaining.let { it > 200 || it >= remInt } } ?: BufferView.Pool.borrow().also {
73+
val part = tail.takeIf { it.writeRemaining.let { it > 200 || it >= remInt } } ?: pool.borrow().also {
7374
if (head === empty) {
7475
head = it; tail = it
7576
}
@@ -93,10 +94,10 @@ private fun ReadableByteChannel.readPacketImpl(min: Long, max: Long): ByteReadPa
9394
}
9495
}
9596
} catch (t: Throwable) {
96-
head.releaseAll()
97+
head.releaseAll(pool)
9798
throw t
9899
}
99100

100-
return ByteReadPacketViewBased(head)
101+
return ByteReadPacketViewBased(head, pool)
101102
}
102103

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ inline fun buildPacket(headerSizeHint: Int = 0, block: ByteWritePacket.() -> Uni
3232
fun WritePacket(headerSizeHint: Int = 0): ByteWritePacket = ByteWritePacketImpl(headerSizeHint, BufferView.Pool)
3333

3434
fun ByteReadPacket.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
35+
if (isEmpty) return null
3536
val sb = StringBuilder(estimate)
3637
return if (readUTF8LineTo(sb, limit)) sb.toString() else null
3738
}

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ class ByteBufferChannelTest {
3131
private val failures = ErrorCollector()
3232

3333
@get:Rule
34-
private val pool = VerifyingObjectPool(BufferObjectNoPool)
34+
internal val pool = VerifyingObjectPool(BufferObjectNoPool)
3535

3636
@get:Rule
37-
private val pktPool = VerifyingObjectPool(BufferView.Pool)
37+
internal val pktPool = VerifyingObjectPool(BufferView.Pool)
3838

3939
private val Size = BUFFER_SIZE - RESERVED_SIZE
4040
private val ch = ByteBufferChannel(autoFlush = false, pool = pool)
@@ -567,7 +567,7 @@ class ByteBufferChannelTest {
567567
writeStringUtf8("Hello")
568568
} as ByteReadPacketViewBased
569569

570-
val packet = ByteReadPacketViewBased(packet0.steal()!!)
570+
val packet = ByteReadPacketViewBased(packet0.steal()!!, pktPool)
571571

572572
ch.writeInt(packet.remaining)
573573
ch.writePacket(packet)

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import kotlin.test.*
99

1010
class BytePacketBuildTest {
1111
@get:Rule
12-
private val pool = VerifyingObjectPool(BufferView.Pool)
12+
internal val pool = VerifyingObjectPool(BufferView.Pool)
1313

1414
@Test
1515
fun smokeSingleBufferTest() {
@@ -160,6 +160,8 @@ class BytePacketBuildTest {
160160
p.readInt()
161161
fail()
162162
} catch (expected: EOFException) {
163+
} finally {
164+
p.release()
163165
}
164166
}
165167

0 commit comments

Comments
 (0)