Skip to content

Commit 383ebb8

Browse files
authored
fix: properly close byte streams when using CRT engine
1 parent 018cd3e commit 383ebb8

File tree

3 files changed

+58
-7
lines changed

3 files changed

+58
-7
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "0c737e4d-adb6-45f4-95d3-7123f3455171",
3+
"type": "bugfix",
4+
"description": "Fix occasional stream leak due to race condition in CRT engine"
5+
}

runtime/crt-util/common/src/aws/smithy/kotlin/runtime/crt/ReadChannelBodyStream.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import kotlinx.atomicfu.atomic
1515
import kotlinx.coroutines.*
1616
import kotlinx.coroutines.channels.Channel
1717
import kotlin.coroutines.CoroutineContext
18+
import kotlin.time.Duration.Companion.milliseconds
19+
20+
private val POLLING_DELAY = 100.milliseconds
1821

1922
/**
2023
* write as much of [outgoing] to [dest] as possible
@@ -41,6 +44,21 @@ public class ReadChannelBodyStream(
4144
producerJob.invokeOnCompletion { cause ->
4245
bodyChan.cancel(cause)
4346
}
47+
48+
// Poll the channel's `isClosedForRead` and complete when it's true. This works around a timing issue when the
49+
// write side of the channel finishes sending bytes but doesn't call `close` in time for the CRT's
50+
// `sendRequestBody` loop to pick it up. If CRT reads all the bytes it expects, it ceases calling
51+
// `sendRequestBody`, which risks leaving the producer job open indefinitely. This polling loop catches any
52+
// missed channel closures and ends the producer job to avoid that issue.
53+
launch(coroutineContext + CoroutineName("body-channel-watchdog")) {
54+
while (producerJob.isActive) {
55+
if (bodyChan.isClosedForRead) {
56+
producerJob.complete()
57+
return@launch
58+
}
59+
delay(POLLING_DELAY)
60+
}
61+
}
4462
}
4563

4664
// lie - CRT tries to control this via normal seek operations (e.g. when they calculate a hash for signing

runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/UploadTest.kt

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,18 @@ package aws.smithy.kotlin.runtime.http.test
77

88
import aws.smithy.kotlin.runtime.content.ByteStream
99
import aws.smithy.kotlin.runtime.hashing.sha256
10-
import aws.smithy.kotlin.runtime.http.HttpBody
11-
import aws.smithy.kotlin.runtime.http.HttpMethod
12-
import aws.smithy.kotlin.runtime.http.HttpStatusCode
10+
import aws.smithy.kotlin.runtime.http.*
1311
import aws.smithy.kotlin.runtime.http.content.ByteArrayContent
1412
import aws.smithy.kotlin.runtime.http.request.HttpRequest
1513
import aws.smithy.kotlin.runtime.http.request.headers
1614
import aws.smithy.kotlin.runtime.http.response.complete
1715
import aws.smithy.kotlin.runtime.http.test.util.AbstractEngineTest
1816
import aws.smithy.kotlin.runtime.http.test.util.test
1917
import aws.smithy.kotlin.runtime.http.test.util.testSetup
20-
import aws.smithy.kotlin.runtime.http.toHttpBody
2118
import aws.smithy.kotlin.runtime.io.SdkByteChannel
2219
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
2320
import aws.smithy.kotlin.runtime.util.encodeToHex
24-
import kotlinx.coroutines.coroutineScope
25-
import kotlinx.coroutines.delay
26-
import kotlinx.coroutines.launch
21+
import kotlinx.coroutines.*
2722
import kotlin.test.Test
2823
import kotlin.test.assertEquals
2924

@@ -84,6 +79,39 @@ class UploadTest : AbstractEngineTest() {
8479
}
8580
}
8681

82+
@Test
83+
fun testUploadWithClosingDelay() = testEngines {
84+
test { env, client ->
85+
val data = ByteArray(16) { it.toByte() }
86+
val sha = data.sha256().encodeToHex()
87+
val ch = SdkByteChannel(autoFlush = true)
88+
val content = object : HttpBody.Streaming() {
89+
override val contentLength: Long = data.size.toLong()
90+
override fun readFrom(): SdkByteReadChannel = ch
91+
}
92+
93+
val req = HttpRequest {
94+
method = HttpMethod.POST
95+
testSetup(env)
96+
url.path = "/upload/content"
97+
body = content
98+
}
99+
100+
coroutineScope {
101+
launch {
102+
ch.writeFully(data)
103+
delay(1000)
104+
// CRT will have stopped polling by now
105+
ch.close()
106+
}
107+
val call = client.call(req)
108+
call.complete()
109+
assertEquals(HttpStatusCode.OK, call.response.status)
110+
assertEquals(sha, call.response.headers["content-sha256"])
111+
}
112+
}
113+
}
114+
87115
@Test
88116
fun testUploadWithWrappedStream() = testEngines {
89117
// test custom ByteStream behavior

0 commit comments

Comments
 (0)