Skip to content

Commit 09b621e

Browse files
author
Sergey Mashkov
committed
IO: optimize byte buffer memory allocations
1 parent 0939f23 commit 09b621e

File tree

5 files changed

+191
-55
lines changed

5 files changed

+191
-55
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 175 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import kotlinx.io.pool.*
1313
import java.io.EOFException
1414
import java.nio.*
1515
import java.util.concurrent.atomic.*
16+
import kotlin.coroutines.experimental.*
17+
import kotlin.coroutines.experimental.intrinsics.*
1618

1719
internal const val DEFAULT_CLOSE_MESSAGE = "Byte channel was closed"
1820

@@ -42,10 +44,10 @@ internal class ByteBufferChannel(
4244
private var joining: JoiningState? = null
4345

4446
@Volatile
45-
private var readOp: CancellableContinuation<Boolean>? = null
47+
private var readOp: Continuation<Boolean>? = null
4648

4749
@Volatile
48-
private var writeOp: CancellableContinuation<Unit>? = null
50+
private var writeOp: Continuation<Unit>? = null
4951

5052
private var readPosition = 0
5153
private var writePosition = 0
@@ -56,8 +58,9 @@ internal class ByteBufferChannel(
5658
internal fun attachJob(job: Job) {
5759
attachedJob?.cancel()
5860
attachedJob = job
59-
job.invokeOnCompletion {
61+
job.invokeOnCompletion(onCancelling = true) { cause ->
6062
attachedJob = null
63+
if (cause != null) cancel(cause)
6164
}
6265
}
6366

@@ -104,6 +107,8 @@ internal class ByteBufferChannel(
104107
}
105108

106109
if (cause != null) attachedJob?.cancel(cause)
110+
readSuspendContinuationCache.close()
111+
writeSuspendContinuationCache.close()
107112

108113
return true
109114
}
@@ -150,7 +155,9 @@ internal class ByteBufferChannel(
150155
}
151156

152157
private fun setupStateForWrite(): ByteBuffer? {
153-
if (writeOp != null) throw IllegalStateException("Write operation is already in progress")
158+
if (writeOp != null) {
159+
throw IllegalStateException("Write operation is already in progress")
160+
}
154161

155162
var _allocated: ReadWriteBufferState.Initial? = null
156163
val (old, newState) = updateState { state ->
@@ -1116,7 +1123,7 @@ internal class ByteBufferChannel(
11161123

11171124
private suspend fun writeFullySuspend(src: ByteBuffer) {
11181125
while (src.hasRemaining()) {
1119-
writeSuspend(1)
1126+
tryWriteSuspend(1)
11201127

11211128
joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
11221129

@@ -1126,7 +1133,7 @@ internal class ByteBufferChannel(
11261133

11271134
private suspend fun writeFullySuspend(src: BufferView) {
11281135
while (src.canRead()) {
1129-
writeSuspend(1)
1136+
tryWriteSuspend(1)
11301137

11311138
joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
11321139

@@ -1197,7 +1204,7 @@ internal class ByteBufferChannel(
11971204
while (copied < limit) {
11981205
var avWBefore = state.availableForWrite
11991206
if (avWBefore == 0) {
1200-
writeSuspend(1)
1207+
tryWriteSuspend(1)
12011208
if (joining != null) break
12021209
avWBefore = state.availableForWrite
12031210
}
@@ -1256,7 +1263,7 @@ internal class ByteBufferChannel(
12561263
// println("readSuspend?")
12571264
flush()
12581265

1259-
if (src.availableForRead == 0 && !src.readSuspend(1)) {
1266+
if (src.availableForRead == 0 && !src.readSuspendImpl(1)) {
12601267
// println("readSuspend failed")
12611268
if (joined == null || src.tryCompleteJoining(joined)) break
12621269
}
@@ -1406,7 +1413,7 @@ internal class ByteBufferChannel(
14061413

14071414
private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
14081415
while (true) {
1409-
writeSuspend(1)
1416+
tryWriteSuspend(1)
14101417

14111418
joining?.let { resolveDelegation(this, it)?.let { return it.writeSuspend(src, offset, length) } }
14121419

@@ -1704,6 +1711,49 @@ internal class ByteBufferChannel(
17041711
return result!!
17051712
}
17061713

1714+
override suspend fun writeSuspendSession(visitor: suspend WriterSuspendSession.() -> Unit) {
1715+
writing { byteBuffer, ringBufferCapacity ->
1716+
var locked = 0
1717+
1718+
val session = object : WriterSuspendSession {
1719+
override fun request(min: Int): ByteBuffer? {
1720+
locked += ringBufferCapacity.tryWriteAtLeast(0)
1721+
if (locked < min) return null
1722+
byteBuffer.prepareBuffer(writeByteOrder, writePosition, locked)
1723+
if (byteBuffer.remaining() < min) return null
1724+
if (joining != null) return null
1725+
1726+
return byteBuffer
1727+
}
1728+
1729+
override fun written(n: Int) {
1730+
require(n >= 0)
1731+
if (n > locked) throw IllegalStateException()
1732+
locked -= n
1733+
byteBuffer.bytesWritten(ringBufferCapacity, n)
1734+
}
1735+
1736+
override suspend fun tryAwait(n: Int) {
1737+
if (locked >= n) return
1738+
if (locked > 0) {
1739+
ringBufferCapacity.completeRead(locked)
1740+
locked = 0
1741+
}
1742+
1743+
return tryWriteSuspend(n)
1744+
}
1745+
}
1746+
1747+
try {
1748+
visitor(session)
1749+
} finally {
1750+
if (locked > 0) {
1751+
ringBufferCapacity.completeRead(locked)
1752+
}
1753+
}
1754+
}
1755+
}
1756+
17071757
override fun consumed(n: Int) {
17081758
require(n >= 0)
17091759

@@ -1713,19 +1763,23 @@ internal class ByteBufferChannel(
17131763
}
17141764
}
17151765

1716-
final suspend override fun awaitAtLeast(n: Int) {
1766+
final override suspend fun awaitAtLeast(n: Int): Boolean {
17171767
if (state.capacity.availableForRead >= n) {
1718-
if (state.idle) setupStateForRead()
1719-
return
1768+
if (state.idle || state is ReadWriteBufferState.Writing) setupStateForRead()
1769+
return true
17201770
}
17211771

1722-
return awaitAtLeastSuspend(n)
1772+
if (state.idle || state is ReadWriteBufferState.Writing) return awaitAtLeastSuspend(n)
1773+
else if (n == 1) return readSuspendImpl(1)
1774+
else return readSuspend(n)
17231775
}
17241776

1725-
private suspend fun awaitAtLeastSuspend(n: Int) {
1726-
if (readSuspend(n) && state.idle) {
1777+
private suspend fun awaitAtLeastSuspend(n: Int): Boolean {
1778+
val rc = readSuspend(n)
1779+
if (rc && state.idle) {
17271780
setupStateForRead()
17281781
}
1782+
return rc
17291783
}
17301784

17311785
override fun request(skip: Int, atLeast: Int): ByteBuffer? {
@@ -1746,33 +1800,38 @@ internal class ByteBufferChannel(
17461800
}
17471801

17481802
private inline fun consumeEachBufferRangeFast(last: Boolean, visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
1749-
if (state === ReadWriteBufferState.Terminated && !last) return false
1750-
17511803
val rc = reading {
17521804
do {
1753-
val available = state.capacity.availableForRead
1754-
1755-
val rem = if (available > 0 || last) {
1756-
if (!visitor(this, last)) {
1757-
afterBufferVisited(this, it)
1758-
return true
1759-
}
1760-
1761-
val consumed = afterBufferVisited(this, it)
1762-
available - consumed
1763-
} else 0
1764-
} while (rem > 0 && !last)
1805+
if (hasRemaining() || last) {
1806+
val rc = visitor(this, last)
1807+
afterBufferVisited(this, it)
1808+
if (!rc || (last && !hasRemaining())) return true
1809+
} else break
1810+
} while (true)
17651811

17661812
last
17671813
}
17681814

17691815
if (!rc && closed != null) {
17701816
visitor(EmptyByteBuffer, true)
1817+
return true
17711818
}
17721819

17731820
return rc
17741821
}
17751822

1823+
// private suspend fun consumeEachBufferRangeSuspendLoop(visitor: RendezvousChannel<ConsumeEachBufferVisitor>) {
1824+
// var last = false
1825+
//
1826+
// do {
1827+
// if (consumeEachBufferRangeFast(last, visitor)) return
1828+
// if (last) return
1829+
// if (!readSuspend(1)) {
1830+
// last = true
1831+
// }
1832+
// } while (true)
1833+
// }
1834+
17761835
private suspend fun consumeEachBufferRangeSuspend(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) {
17771836
var last = false
17781837

@@ -1998,7 +2057,23 @@ internal class ByteBufferChannel(
19982057
ClosedWriteChannelException(DEFAULT_CLOSE_MESSAGE))
19992058
}
20002059

2001-
private tailrec suspend fun readSuspend(size: Int): Boolean {
2060+
private suspend fun readSuspend(size: Int): Boolean {
2061+
val capacity = state.capacity
2062+
if (capacity.availableForRead >= size) return true
2063+
2064+
closed?.let { c ->
2065+
if (c.cause != null) throw c.cause
2066+
val afterCapacity = state.capacity
2067+
val result = afterCapacity.flush() && afterCapacity.availableForRead >= size
2068+
if (readOp != null) throw IllegalStateException("Read operation is already in progress")
2069+
return result
2070+
}
2071+
2072+
if (size == 1) return readSuspendImpl(1)
2073+
return readSuspendLoop(size)
2074+
}
2075+
2076+
private tailrec suspend fun readSuspendLoop(size: Int): Boolean {
20022077
val capacity = state.capacity
20032078
if (capacity.availableForRead >= size) return true
20042079

@@ -2012,28 +2087,38 @@ internal class ByteBufferChannel(
20122087

20132088
if (!readSuspendImpl(size)) return false
20142089

2015-
return readSuspend(size)
2090+
return readSuspendLoop(size)
20162091
}
20172092

2018-
private suspend fun readSuspendImpl(size: Int): Boolean {
2019-
if (state.capacity.availableForRead >= size) return true
2093+
private val readSuspendContinuationCache = MutableDelegateContinuation<Boolean>()
20202094

2021-
return suspendCancellableCoroutine(holdCancellability = true) { c ->
2022-
do {
2023-
if (state.capacity.availableForRead >= size) {
2024-
c.resume(true)
2025-
break
2026-
}
2095+
private fun suspensionForSize(size: Int, c: Continuation<Boolean>): Any {
2096+
do {
2097+
if (this.state.capacity.availableForRead >= size) {
2098+
c.resume(true)
2099+
break
2100+
}
20272101

2028-
closed?.let {
2029-
if (it.cause != null) {
2030-
c.resumeWithException(it.cause)
2031-
} else {
2032-
c.resume(state.capacity.flush() && state.capacity.availableForRead >= size)
2033-
}
2034-
return@suspendCancellableCoroutine
2102+
closed?.let {
2103+
if (it.cause != null) {
2104+
c.resumeWithException(it.cause)
2105+
} else {
2106+
c.resume(state.capacity.flush() && state.capacity.availableForRead >= size)
20352107
}
2036-
} while (!setContinuation({ readOp }, ReadOp, c, { closed == null && state.capacity.availableForRead < size }))
2108+
return COROUTINE_SUSPENDED
2109+
}
2110+
} while (!setContinuation({ readOp }, ReadOp, c, { closed == null && state.capacity.availableForRead < size }))
2111+
2112+
return COROUTINE_SUSPENDED
2113+
}
2114+
2115+
private suspend fun readSuspendImpl(size: Int): Boolean {
2116+
if (state.capacity.availableForRead >= size) return true
2117+
2118+
return suspendCoroutineOrReturn { raw ->
2119+
val c = readSuspendContinuationCache
2120+
suspensionForSize(size, c)
2121+
c.swap(raw)
20372122
}
20382123
}
20392124

@@ -2049,8 +2134,44 @@ internal class ByteBufferChannel(
20492134
}
20502135
}
20512136

2137+
private val writeSuspendContinuationCache = MutableDelegateContinuation<Unit>()
2138+
@Volatile
2139+
private var writeSuspensionSize: Int = 0
2140+
private val writeSuspension = { c: Continuation<Unit> ->
2141+
val size = writeSuspensionSize
2142+
2143+
do {
2144+
closed?.sendException?.let { throw it }
2145+
if (!writeSuspendPredicate(size)) {
2146+
c.resume(Unit)
2147+
break
2148+
}
2149+
} while (!setContinuation({ writeOp }, WriteOp, c, { writeSuspendPredicate(size) }))
2150+
2151+
flushImpl(1, minWriteSize = size)
2152+
2153+
COROUTINE_SUSPENDED
2154+
}
2155+
2156+
private suspend fun tryWriteSuspend(size: Int) {
2157+
if (!writeSuspendPredicate(size)) {
2158+
closed?.sendException?.let { throw it }
2159+
return
2160+
}
2161+
2162+
writeSuspensionSize = size
2163+
if (attachedJob != null) {
2164+
return suspendCoroutineOrReturn(writeSuspension)
2165+
}
2166+
2167+
return suspendCoroutineOrReturn { raw ->
2168+
val c = writeSuspendContinuationCache
2169+
writeSuspension(c)
2170+
c.swap(raw)
2171+
}
2172+
}
2173+
20522174
private suspend fun writeSuspend(size: Int) {
2053-
// println("Write suspend (enter)")
20542175
while (writeSuspendPredicate(size)) {
20552176
suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
20562177
do {
@@ -2062,15 +2183,13 @@ internal class ByteBufferChannel(
20622183
} while (!setContinuation({ writeOp }, WriteOp, c, { writeSuspendPredicate(size) }))
20632184

20642185
flushImpl(1, minWriteSize = size)
2065-
// println("Write suspend (loop), op = ${writeOp}, state = $state, joined = $joining")
20662186
}
20672187
}
20682188

20692189
closed?.sendException?.let { throw it }
2070-
// println("Write suspend (leave)")
20712190
}
20722191

2073-
private inline fun <T, C : CancellableContinuation<T>> setContinuation(getter: () -> C?, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, C?>, continuation: C, predicate: () -> Boolean): Boolean {
2192+
private inline fun <T, C : Continuation<T>> setContinuation(getter: () -> C?, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, C?>, continuation: C, predicate: () -> Boolean): Boolean {
20742193
while (true) {
20752194
val current = getter()
20762195
if (current != null) throw IllegalStateException("Operation is already in progress")
@@ -2081,7 +2200,9 @@ internal class ByteBufferChannel(
20812200

20822201
if (updater.compareAndSet(this, null, continuation)) {
20832202
if (predicate() || !updater.compareAndSet(this, continuation, null)) {
2084-
continuation.initCancellability()
2203+
if (attachedJob == null && continuation is CancellableContinuation<*>) {
2204+
continuation.initCancellability()
2205+
}
20852206
return true
20862207
}
20872208

@@ -2144,7 +2265,8 @@ internal class ByteBufferChannel(
21442265

21452266
override fun request(skip: Int, atLeast: Int) = null
21462267

2147-
suspend override fun awaitAtLeast(n: Int) {
2268+
suspend override fun awaitAtLeast(n: Int): Boolean {
2269+
return false
21482270
}
21492271
}
21502272

0 commit comments

Comments
 (0)