Skip to content

Commit e1ea10e

Browse files
authored
fix(rt): add special-casing for CrtHttpEngine aws-chunked requests (#760)
1 parent 23a4f71 commit e1ea10e

File tree

7 files changed

+216
-5
lines changed

7 files changed

+216
-5
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "f0ec11be-3875-498a-8e33-74c49416f3b8",
3+
"type": "bugfix",
4+
"description": "Fix `aws-chunked` requests in the CRT HTTP engine",
5+
"issues": [
6+
"https://github.com/awslabs/smithy-kotlin/issues/759"
7+
]
8+
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@ kotlinLoggingVersion=2.1.21
4444
slf4jVersion=1.7.36
4545

4646
# crt
47-
crtKotlinVersion=0.6.6
47+
crtKotlinVersion=0.6.7-SNAPSHOT

runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@ import aws.smithy.kotlin.runtime.http.engine.callContext
1717
import aws.smithy.kotlin.runtime.http.operation.getLogger
1818
import aws.smithy.kotlin.runtime.http.request.HttpRequest
1919
import aws.smithy.kotlin.runtime.http.response.HttpCall
20+
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
2021
import aws.smithy.kotlin.runtime.logging.Logger
2122
import aws.smithy.kotlin.runtime.time.Instant
2223
import kotlinx.coroutines.job
2324
import kotlinx.coroutines.sync.Mutex
2425
import kotlinx.coroutines.sync.withLock
26+
import kotlinx.coroutines.withContext
2527
import kotlinx.coroutines.withTimeoutOrNull
2628

2729
internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
30+
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024
2831

2932
/**
3033
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
@@ -100,6 +103,12 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
100103
val stream = conn.makeRequest(engineRequest, respHandler)
101104
stream.activate()
102105

106+
if (request.isChunked) {
107+
withContext(SdkDispatchers.IO) {
108+
stream.sendChunkedBody(request.body)
109+
}
110+
}
111+
103112
val resp = respHandler.waitForResponse()
104113

105114
return HttpCall(request, resp, reqTime, Instant.now(), callContext)

runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ package aws.smithy.kotlin.runtime.http.engine.crt
77

88
import aws.sdk.kotlin.crt.http.HeadersBuilder
99
import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
10+
import aws.sdk.kotlin.crt.http.HttpStream
1011
import aws.sdk.kotlin.crt.io.Protocol
1112
import aws.sdk.kotlin.crt.io.Uri
1213
import aws.sdk.kotlin.crt.io.UserInfo
1314
import aws.smithy.kotlin.runtime.crt.ReadChannelBodyStream
1415
import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream
1516
import aws.smithy.kotlin.runtime.http.HttpBody
1617
import aws.smithy.kotlin.runtime.http.request.HttpRequest
18+
import aws.smithy.kotlin.runtime.io.SdkBuffer
19+
import aws.smithy.kotlin.runtime.io.buffer
20+
import aws.smithy.kotlin.runtime.io.readToByteArray
1721
import kotlinx.coroutines.job
1822
import kotlin.coroutines.CoroutineContext
1923

@@ -34,7 +38,7 @@ internal val HttpRequest.uri: Uri
3438
internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest {
3539
val body = this.body
3640
check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" }
37-
val bodyStream = when (body) {
41+
val bodyStream = if (isChunked) null else when (body) {
3842
is HttpBody.Empty -> null
3943
is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes())
4044
is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext)
@@ -61,3 +65,47 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko
6165

6266
return aws.sdk.kotlin.crt.http.HttpRequest(method.name, url.encodedPath, crtHeaders.build(), bodyStream)
6367
}
68+
69+
/**
70+
* @return whether this HttpRequest is a chunked request.
71+
* Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`,
72+
* and the body is either [HttpBody.SourceContent] or [HttpBody.ChannelContent].
73+
*/
74+
internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) &&
75+
headers.contains("Transfer-Encoding", "chunked")
76+
77+
/**
78+
* Send a chunked body using the CRT writeChunk bindings.
79+
* @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent]
80+
*/
81+
internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
82+
when (body) {
83+
is HttpBody.SourceContent -> {
84+
val source = body.readFrom()
85+
val bufferedSource = source.buffer()
86+
87+
while (!bufferedSource.exhausted()) {
88+
bufferedSource.request(CHUNK_BUFFER_SIZE)
89+
writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted())
90+
}
91+
}
92+
is HttpBody.ChannelContent -> {
93+
val chan = body.readFrom()
94+
var buffer = SdkBuffer()
95+
val nextBuffer = SdkBuffer()
96+
var sentFirstChunk = false
97+
98+
while (!chan.isClosedForRead) {
99+
val bytesRead = chan.read(buffer, CHUNK_BUFFER_SIZE)
100+
if (!sentFirstChunk && bytesRead == -1L) { throw RuntimeException("CRT does not support empty chunked bodies.") }
101+
102+
val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L
103+
104+
writeChunk(buffer.readToByteArray(), isFinalChunk)
105+
if (isFinalChunk) break else buffer = nextBuffer
106+
sentFirstChunk = true
107+
}
108+
}
109+
else -> error("sendChunkedBody should not be called for non-chunked body types")
110+
}
111+
}

runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import aws.smithy.kotlin.runtime.http.*
1111
import aws.smithy.kotlin.runtime.http.content.ByteArrayContent
1212
import aws.smithy.kotlin.runtime.http.request.HttpRequest
1313
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
14+
import aws.smithy.kotlin.runtime.io.SdkSource
15+
import aws.smithy.kotlin.runtime.io.source
1416
import kotlinx.coroutines.Job
1517
import kotlinx.coroutines.cancel
1618
import kotlin.coroutines.EmptyCoroutineContext
17-
import kotlin.test.Test
18-
import kotlin.test.assertEquals
19-
import kotlin.test.assertFalse
19+
import kotlin.test.*
2020

2121
class RequestConversionTest {
2222
private fun byteStreamFromContents(contents: String): ByteStream =
@@ -103,4 +103,44 @@ class RequestConversionTest {
103103
val crtRequest = request.toCrtRequest(testContext)
104104
assertFalse(crtRequest.headers.contains("Content-Length"))
105105
}
106+
107+
@Test
108+
fun testEngineSetsNullBodyForChannelContentChunkedRequests() {
109+
val testData = ByteArray(1024) { 0 }
110+
111+
val request = HttpRequest(
112+
HttpMethod.POST,
113+
Url.parse("https://test.aws.com?foo=bar"),
114+
Headers.invoke { append("Transfer-Encoding", "chunked") },
115+
object : HttpBody.ChannelContent() {
116+
override val contentLength: Long = testData.size.toLong()
117+
override fun readFrom(): SdkByteReadChannel = SdkByteReadChannel(testData)
118+
},
119+
)
120+
121+
val testContext = EmptyCoroutineContext + Job()
122+
val crtRequest = request.toCrtRequest(testContext)
123+
assertNotNull(request.body)
124+
assertNull(crtRequest.body)
125+
}
126+
127+
@Test
128+
fun testEngineSetsNullBodyForSourceContentChunkedRequests() {
129+
val testData = ByteArray(1024) { 0 }
130+
131+
val request = HttpRequest(
132+
HttpMethod.POST,
133+
Url.parse("https://test.aws.com?foo=bar"),
134+
Headers.invoke { append("Transfer-Encoding", "chunked") },
135+
object : HttpBody.SourceContent() {
136+
override val contentLength: Long = testData.size.toLong()
137+
override fun readFrom(): SdkSource = testData.source()
138+
},
139+
)
140+
141+
val testContext = EmptyCoroutineContext + Job()
142+
val crtRequest = request.toCrtRequest(testContext)
143+
assertNotNull(request.body)
144+
assertNull(crtRequest.body)
145+
}
106146
}

runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class SdkStreamResponseHandlerTest {
2828
override fun activate() {}
2929
override fun close() { closed = true }
3030
override fun incrementWindow(size: Int) {}
31+
override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) {}
3132
}
3233

3334
private class MockHttpClientConnection : HttpClientConnection {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package aws.smithy.kotlin.runtime.http.engine.crt
7+
8+
import aws.sdk.kotlin.crt.http.HttpStream
9+
import aws.smithy.kotlin.runtime.http.toHttpBody
10+
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
11+
import aws.smithy.kotlin.runtime.io.readToByteArray
12+
import aws.smithy.kotlin.runtime.io.source
13+
import kotlinx.coroutines.ExperimentalCoroutinesApi
14+
import kotlinx.coroutines.test.runTest
15+
import kotlin.test.*
16+
17+
@OptIn(ExperimentalCoroutinesApi::class)
18+
class SendChunkedBodyTest {
19+
private class MockHttpStream(override val responseStatusCode: Int) : HttpStream {
20+
var closed: Boolean = false
21+
var numChunksWritten = 0
22+
override fun activate() {}
23+
override fun close() { closed = true }
24+
override fun incrementWindow(size: Int) {}
25+
override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) { numChunksWritten += 1 }
26+
}
27+
28+
@Test
29+
fun testSourceContent() = runTest {
30+
val stream = MockHttpStream(200)
31+
32+
val chunkedBytes = """
33+
100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n
34+
""".trimIndent().toByteArray()
35+
36+
val source = chunkedBytes.source()
37+
38+
stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()))
39+
40+
// source should be fully consumed with 1 chunk written
41+
assertEquals(0, source.readToByteArray().size)
42+
assertEquals(1, stream.numChunksWritten)
43+
}
44+
45+
@Test
46+
fun testChannelContentMultipleChunks() = runTest {
47+
val stream = MockHttpStream(200)
48+
49+
val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt()
50+
51+
val chunkedBytes = """
52+
${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n
53+
""".trimIndent().toByteArray()
54+
55+
val source = chunkedBytes.source()
56+
57+
stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()))
58+
59+
// source should be fully consumed
60+
assertEquals(0, source.readToByteArray().size)
61+
62+
// there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are
63+
assertTrue(stream.numChunksWritten > 1)
64+
}
65+
66+
@Test
67+
fun testChannelContent() = runTest {
68+
val stream = MockHttpStream(200)
69+
70+
val chunkedBytes = """
71+
100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n
72+
""".trimIndent().toByteArray()
73+
74+
val channel = SdkByteReadChannel(chunkedBytes)
75+
76+
stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()))
77+
78+
// channel should be fully consumed with 1 chunk written
79+
assertEquals(0, channel.availableForRead)
80+
assertTrue(channel.isClosedForRead)
81+
assertEquals(1, stream.numChunksWritten)
82+
}
83+
84+
@Test
85+
fun testSourceContentMultipleChunks() = runTest {
86+
val stream = MockHttpStream(200)
87+
88+
val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt()
89+
90+
val chunkedBytes = """
91+
${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n
92+
""".trimIndent().toByteArray()
93+
94+
val channel = SdkByteReadChannel(chunkedBytes)
95+
96+
stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()))
97+
98+
// source should be fully consumed
99+
assertEquals(0, channel.availableForRead)
100+
assertTrue(channel.isClosedForRead)
101+
102+
// there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are
103+
assertTrue(stream.numChunksWritten > 1)
104+
}
105+
}

0 commit comments

Comments
 (0)