Skip to content

Commit 4bf1167

Browse files
authored
fix: improve detection of available read bytes to avoid hang (#535)
1 parent 2c79886 commit 4bf1167

File tree

2 files changed

+36
-20
lines changed

2 files changed

+36
-20
lines changed

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import kotlinx.atomicfu.update
1414
import kotlinx.coroutines.*
1515
import kotlinx.coroutines.channels.Channel
1616
import kotlinx.coroutines.channels.ClosedReceiveChannelException
17+
import kotlinx.coroutines.sync.Mutex
1718
import kotlin.coroutines.resume
1819
import kotlin.coroutines.resumeWithException
1920

@@ -35,6 +36,7 @@ internal abstract class AbstractBufferedReadChannel(
3536
private val currSegment: AtomicRef<Segment?> = atomic(null)
3637

3738
private val readOp: AtomicRef<CancellableContinuation<Boolean>?> = atomic(null)
39+
private val readOpMutex = Mutex(locked = false)
3840

3941
private val _closed: AtomicRef<ClosedSentinel?> = atomic(null)
4042
protected val closed: ClosedSentinel?
@@ -52,33 +54,34 @@ internal abstract class AbstractBufferedReadChannel(
5254
get() = _availableForRead.value
5355

5456
/**
55-
* Suspend reading until at least [requested] bytes are available to read or the channel is closed.
56-
* If the requested amount can be fulfilled immediately this function will return without suspension.
57+
* Suspend reading until at least one byte is available to read or the channel is closed. If there is already at
58+
* least one byte available, this function will return without suspension.
5759
*/
58-
protected suspend fun readSuspend(requested: Int): Boolean {
60+
protected suspend fun readSuspend(): Boolean {
5961
// can fulfill immediately without suspension
60-
if (availableForRead >= requested) return true
62+
if (availableForRead > 0) return true
63+
64+
readOpMutex.lock()
6165

6266
closed?.let { closed ->
6367
// if already closed - rethrow
6468
closed.cause?.let { rethrowClosed(it) }
6569

6670
// no more data is coming
67-
return availableForRead >= requested
71+
readOpMutex.unlock()
72+
return availableForRead > 0
6873
}
6974

70-
return suspendCancellableCoroutine { cont ->
71-
setReadContinuation(cont)
75+
if (availableForRead > 0) {
76+
readOpMutex.unlock()
77+
return true
7278
}
73-
}
7479

75-
private fun setReadContinuation(cont: CancellableContinuation<Boolean>) {
76-
val success = readOp.compareAndSet(null, cont)
77-
check(success) { "Read operation already in progress" }
78-
}
79-
80-
private fun resumeRead() {
81-
readOp.getAndSet(null)?.resume(true)
80+
return suspendCancellableCoroutine { cont ->
81+
val success = readOp.compareAndSet(null, cont)
82+
readOpMutex.unlock()
83+
check(success) { "Read operation already in progress" }
84+
}
8285
}
8386

8487
/**
@@ -183,7 +186,7 @@ internal abstract class AbstractBufferedReadChannel(
183186
var remaining = length
184187

185188
do {
186-
if (!readSuspend(1)) {
189+
if (!readSuspend()) {
187190
throw ClosedReceiveChannelException("Unexpeced EOF: expected $remaining more bytes")
188191
}
189192

@@ -204,7 +207,7 @@ internal abstract class AbstractBufferedReadChannel(
204207
}
205208

206209
private suspend fun readAvailableSuspend(dest: ByteArray, offset: Int, length: Int): Int {
207-
if (!readSuspend(1)) {
210+
if (!readSuspend()) {
208211
return -1
209212
}
210213
return readAvailable(dest, offset, length)
@@ -225,11 +228,24 @@ internal abstract class AbstractBufferedReadChannel(
225228
// advertise bytes available
226229
_availableForRead.getAndAdd(bytesIn.size)
227230

228-
resumeRead()
231+
readOpMutex.withSpinLock {
232+
readOp.getAndSet(null)?.resume(true)
233+
}
234+
}
235+
236+
private inline fun <T> Mutex.withSpinLock(block: () -> T): T {
237+
while (!tryLock()) {
238+
// spin
239+
}
240+
return try {
241+
block()
242+
} finally {
243+
unlock()
244+
}
229245
}
230246

231247
override suspend fun awaitContent() {
232-
readSuspend(1)
248+
readSuspend()
233249
}
234250

235251
override fun cancel(cause: Throwable?): Boolean {

aws-runtime/http-client-engine-crt/jvm/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelJVM.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ internal class BufferedReadChannelImpl(
3131
}
3232

3333
private suspend fun readAvailableSuspend(dest: ByteBuffer): Int {
34-
if (!readSuspend(1)) {
34+
if (!readSuspend()) {
3535
return -1
3636
}
3737
return readAvailable(dest)

0 commit comments

Comments
 (0)