Skip to content

Commit 8255143

Browse files
authored
fix: ensure CRT doesn't starve the dispatcher (#294)
1 parent b4c12e6 commit 8255143

File tree

3 files changed

+47
-36
lines changed

3 files changed

+47
-36
lines changed

aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,12 @@ public class ReadChannelBodyStream(
3535
override val coroutineContext: CoroutineContext = callContext + producerJob
3636

3737
private val currBuffer = atomic<SdkBuffer?>(null)
38-
private val bufferChan = Channel<SdkBuffer>(1)
38+
private val bufferChan = Channel<SdkBuffer>(Channel.UNLIMITED)
3939

4040
init {
4141
producerJob.invokeOnCompletion { cause ->
4242
bodyChan.cancel(cause)
4343
}
44-
45-
// launch a coroutine to fill the buffer channel
46-
proxyRequestBody()
4744
}
4845

4946
// lie - CRT tries to control this via normal seek operations (e.g. when they calculate a hash for signing
@@ -52,16 +49,51 @@ public class ReadChannelBodyStream(
5249
// and handle these concerns.
5350
override fun resetPosition(): Boolean = true
5451

55-
@OptIn(ExperimentalCoroutinesApi::class)
5652
override fun sendRequestBody(buffer: MutableBuffer): Boolean {
53+
return doSendRequestBody(buffer).also { if (it) producerJob.complete() }
54+
}
55+
56+
@OptIn(ExperimentalCoroutinesApi::class)
57+
private fun doSendRequestBody(buffer: MutableBuffer): Boolean {
58+
// ensure the request context hasn't been cancelled
59+
callContext.ensureActive()
5760
var outgoing = currBuffer.getAndSet(null) ?: bufferChan.tryReceive().getOrNull()
5861

62+
if (bodyChan.availableForRead > 0 && outgoing == null) {
63+
// NOTE: It is critical that the coroutine launched doesn't actually suspend because it will never
64+
// get a chance to resume. The CRT will consume the dispatcher thread until the data has been read
65+
// completely. We could launch one of the coroutines into a different dispatcher but this won't work
66+
// on platforms (e.g. JS) that don't have multiple threads. Essentially the CRT will starve
67+
// the dispatcher and not allow other coroutines to make progress.
68+
// see: https://github.com/awslabs/aws-sdk-kotlin/issues/282
69+
//
70+
// TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version
71+
// see: https://youtrack.jetbrains.com/issue/KTOR-2772
72+
//
73+
// To get around this, if there is data to read we launch a coroutine UNDISPATCHED so that it runs
74+
// immediately in the current thread. The coroutine will fill the buffer but won't suspend because
75+
// we know data is available.
76+
launch(start = CoroutineStart.UNDISPATCHED) {
77+
val sdkBuffer = SdkBuffer(bodyChan.availableForRead)
78+
bodyChan.readAvailable(sdkBuffer)
79+
bufferChan.send(sdkBuffer)
80+
}.invokeOnCompletion { cause ->
81+
if (cause != null) {
82+
producerJob.completeExceptionally(cause)
83+
bufferChan.close(cause)
84+
}
85+
}
86+
}
87+
88+
if (bodyChan.availableForRead == 0 && bodyChan.isClosedForRead) {
89+
bufferChan.close()
90+
}
91+
5992
if (outgoing == null) {
6093
if (bufferChan.isClosedForReceive) {
6194
return true
6295
}
63-
// ensure the request context hasn't been cancelled
64-
callContext.ensureActive()
96+
6597
outgoing = bufferChan.tryReceive().getOrNull() ?: return false
6698
}
6799

@@ -73,29 +105,4 @@ public class ReadChannelBodyStream(
73105

74106
return bufferChan.isClosedForReceive && currBuffer.value == null
75107
}
76-
77-
private fun proxyRequestBody() {
78-
// TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version
79-
// see: https://youtrack.jetbrains.com/issue/KTOR-2772
80-
val job = launch(start = CoroutineStart.UNDISPATCHED) {
81-
while (!bodyChan.isClosedForRead) {
82-
bodyChan.awaitContent()
83-
if (bodyChan.isClosedForRead) return@launch
84-
85-
// TODO - we could pool these
86-
val buffer = SdkBuffer(bodyChan.availableForRead)
87-
bodyChan.readAvailable(buffer)
88-
bufferChan.send(buffer)
89-
}
90-
}
91-
92-
job.invokeOnCompletion { cause ->
93-
bufferChan.close(cause)
94-
if (cause != null) {
95-
producerJob.completeExceptionally(cause)
96-
} else {
97-
producerJob.complete()
98-
}
99-
}
100-
}
101108
}

aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ class ReadChannelBodyStreamTest {
3434
}
3535

3636
@Test
37-
fun testCancellation() = runSuspendTest {
37+
fun testCancellation(): Unit = runSuspendTest {
3838
val chan = SdkByteChannel()
3939
val job = Job()
4040
val stream = ReadChannelBodyStream(chan, coroutineContext + job)
41-
yield()
4241

4342
job.cancel()
44-
yield()
4543

4644
val (sendBuffer, _) = mutableBuffer(16)
47-
assertTrue(stream.sendRequestBody(sendBuffer))
45+
assertFailsWith<CancellationException> {
46+
stream.sendRequestBody(sendBuffer)
47+
}
4848
}
4949

5050
@Test

services/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ subprojects {
104104
classpath = compileDependencyFiles + runtimeDependencyFiles
105105
testClassesDirs = output.classesDirs
106106
useJUnitPlatform()
107+
testLogging {
108+
events("passed", "skipped", "failed")
109+
showStandardStreams = true
110+
}
107111
}
108112
}
109113
}

0 commit comments

Comments
 (0)