@@ -215,9 +215,7 @@ internal class ByteBufferChannel(
215
215
}
216
216
217
217
private fun setupStateForRead (): ByteBuffer ? {
218
- if (readOp != null ) throw IllegalStateException (" Read operation is already in progress" )
219
-
220
- val (_, newState) = updateState { state ->
218
+ val newState = updateStateAndGet { state ->
221
219
when (state) {
222
220
ReadWriteBufferState .Terminated -> closed?.cause?.let { throw it } ? : return null
223
221
ReadWriteBufferState .IdleEmpty -> closed?.cause?.let { throw it } ? : return null
@@ -236,7 +234,7 @@ internal class ByteBufferChannel(
236
234
private fun restoreStateAfterRead () {
237
235
var toRelease: ReadWriteBufferState .IdleNonEmpty ? = null
238
236
239
- val (_, newState) = updateState { state ->
237
+ val newState = updateStateAndGet { state ->
240
238
toRelease?.let {
241
239
it.capacity.resetForWrite()
242
240
resumeWriteOp()
@@ -2040,17 +2038,14 @@ internal class ByteBufferChannel(
2040
2038
}
2041
2039
2042
2040
private fun writeSuspendPredicate (size : Int ): Boolean {
2043
- if (closed != null ) return false
2044
2041
val joined = joining
2045
2042
val state = state
2046
2043
val closed = closed
2047
2044
2048
- // println("writeSuspend? $joined, $state, ${state.capacity.availableForWrite}, $closed")
2049
-
2050
- return if (joined == null ) {
2051
- state.capacity.availableForWrite < size && state != = ReadWriteBufferState .IdleEmpty && closed == null
2052
- } else {
2053
- state != = ReadWriteBufferState .Terminated
2045
+ return when {
2046
+ closed != null -> false
2047
+ joined == null -> state.capacity.availableForWrite < size && state != = ReadWriteBufferState .IdleEmpty
2048
+ else -> state != = ReadWriteBufferState .Terminated
2054
2049
}
2055
2050
}
2056
2051
@@ -2109,6 +2104,15 @@ internal class ByteBufferChannel(
2109
2104
pool.recycle(buffer)
2110
2105
}
2111
2106
2107
+ private inline fun updateStateAndGet (block : (ReadWriteBufferState ) -> ReadWriteBufferState ? ): ReadWriteBufferState {
2108
+ val updater = State
2109
+ while (true ) {
2110
+ val old = state
2111
+ val newState = block(old) ? : continue
2112
+ if (old == = newState || updater.compareAndSet(this , old, newState)) return newState
2113
+ }
2114
+ }
2115
+
2112
2116
// todo: replace with atomicfu
2113
2117
private inline fun updateState (block : (ReadWriteBufferState ) -> ReadWriteBufferState ? ):
2114
2118
Pair <ReadWriteBufferState , ReadWriteBufferState > = update({ state }, State , block)
0 commit comments