@@ -14,6 +14,7 @@ import kotlinx.atomicfu.update
1414import kotlinx.coroutines.*
1515import kotlinx.coroutines.channels.Channel
1616import kotlinx.coroutines.channels.ClosedReceiveChannelException
17+ import kotlinx.coroutines.sync.Mutex
1718import kotlin.coroutines.resume
1819import kotlin.coroutines.resumeWithException
1920
@@ -35,6 +36,7 @@ internal abstract class AbstractBufferedReadChannel(
3536 private val currSegment: AtomicRef <Segment ?> = atomic(null )
3637
3738 private val readOp: AtomicRef <CancellableContinuation <Boolean >? > = atomic(null )
39+ private val readOpMutex = Mutex (locked = false )
3840
3941 private val _closed : AtomicRef <ClosedSentinel ?> = atomic(null )
4042 protected val closed: ClosedSentinel ?
@@ -52,33 +54,34 @@ internal abstract class AbstractBufferedReadChannel(
5254 get() = _availableForRead .value
5355
5456 /* *
55- * Suspend reading until at least [requested] bytes are available to read or the channel is closed.
56- * If the requested amount can be fulfilled immediately this function will return without suspension.
57+ * Suspend reading until at least one byte is available to read or the channel is closed. If there is already at
58+ * least one byte available, this function will return without suspension.
5759 */
58- protected suspend fun readSuspend (requested : Int ): Boolean {
60+ protected suspend fun readSuspend (): Boolean {
5961 // can fulfill immediately without suspension
60- if (availableForRead >= requested) return true
62+ if (availableForRead > 0 ) return true
63+
64+ readOpMutex.lock()
6165
6266 closed?.let { closed ->
6367 // if already closed - rethrow
6468 closed.cause?.let { rethrowClosed(it) }
6569
6670 // no more data is coming
67- return availableForRead >= requested
71+ readOpMutex.unlock()
72+ return availableForRead > 0
6873 }
6974
70- return suspendCancellableCoroutine { cont ->
71- setReadContinuation(cont)
75+ if (availableForRead > 0 ) {
76+ readOpMutex.unlock()
77+ return true
7278 }
73- }
7479
75- private fun setReadContinuation (cont : CancellableContinuation <Boolean >) {
76- val success = readOp.compareAndSet(null , cont)
77- check(success) { " Read operation already in progress" }
78- }
79-
80- private fun resumeRead () {
81- readOp.getAndSet(null )?.resume(true )
80+ return suspendCancellableCoroutine { cont ->
81+ val success = readOp.compareAndSet(null , cont)
82+ readOpMutex.unlock()
83+ check(success) { " Read operation already in progress" }
84+ }
8285 }
8386
8487 /* *
@@ -183,7 +186,7 @@ internal abstract class AbstractBufferedReadChannel(
183186 var remaining = length
184187
185188 do {
186- if (! readSuspend(1 )) {
189+ if (! readSuspend()) {
187190 throw ClosedReceiveChannelException (" Unexpeced EOF: expected $remaining more bytes" )
188191 }
189192
@@ -204,7 +207,7 @@ internal abstract class AbstractBufferedReadChannel(
204207 }
205208
206209 private suspend fun readAvailableSuspend (dest : ByteArray , offset : Int , length : Int ): Int {
207- if (! readSuspend(1 )) {
210+ if (! readSuspend()) {
208211 return - 1
209212 }
210213 return readAvailable(dest, offset, length)
@@ -225,11 +228,24 @@ internal abstract class AbstractBufferedReadChannel(
225228 // advertise bytes available
226229 _availableForRead .getAndAdd(bytesIn.size)
227230
228- resumeRead()
231+ readOpMutex.withSpinLock {
232+ readOp.getAndSet(null )?.resume(true )
233+ }
234+ }
235+
236+ private inline fun <T > Mutex.withSpinLock (block : () -> T ): T {
237+ while (! tryLock()) {
238+ // spin
239+ }
240+ return try {
241+ block()
242+ } finally {
243+ unlock()
244+ }
229245 }
230246
231247 override suspend fun awaitContent () {
232- readSuspend(1 )
248+ readSuspend()
233249 }
234250
235251 override fun cancel (cause : Throwable ? ): Boolean {
0 commit comments