Skip to content

Commit 5c1bc59

Browse files
authored
fix(rt): propagate crt stream errors to response body consumer (#510)
1 parent 50a35d3 commit 5c1bc59

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ internal abstract class AbstractBufferedReadChannel(
235235
val success = _closed.compareAndSet(null, ClosedSentinel(cause))
236236
if (!success) return false
237237

238-
segments.close()
238+
segments.close(cause)
239239

240240
readOp.getAndSet(null)?.let { cont ->
241241
if (cause != null) {

aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ package aws.sdk.kotlin.runtime.http.engine.crt
88
import aws.sdk.kotlin.crt.http.*
99
import aws.sdk.kotlin.crt.io.byteArrayBuffer
1010
import aws.sdk.kotlin.runtime.testing.runSuspendTest
11+
import aws.smithy.kotlin.runtime.ClientException
1112
import aws.smithy.kotlin.runtime.http.HttpBody
1213
import aws.smithy.kotlin.runtime.http.HttpStatusCode
14+
import io.kotest.matchers.string.shouldContain
1315
import kotlinx.coroutines.launch
1416
import kotlin.test.*
1517

@@ -135,4 +137,33 @@ class SdkStreamResponseHandlerTest {
135137

136138
assertEquals(data, respChan.readRemaining().decodeToString())
137139
}
140+
141+
@Test
142+
fun testStreamError(): Unit = runSuspendTest {
143+
val handler = SdkStreamResponseHandler(mockConn)
144+
val stream = MockHttpStream(200)
145+
val data = "foo bar"
146+
val socketClosedEc = 1051
147+
launch {
148+
val headers = listOf(
149+
HttpHeader("Content-Length", "${data.length}")
150+
)
151+
handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers)
152+
handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType)
153+
handler.onResponseBody(stream, byteArrayBuffer("foo".encodeToByteArray()))
154+
handler.onResponseComplete(stream, socketClosedEc)
155+
}
156+
157+
// should be signalled as soon as headers are available
158+
val resp = handler.waitForResponse()
159+
assertEquals(HttpStatusCode.OK, resp.status)
160+
161+
assertEquals(data.length.toLong(), resp.body.contentLength)
162+
val respChan = (resp.body as HttpBody.Streaming).readFrom()
163+
164+
assertTrue(respChan.isClosedForWrite)
165+
assertFailsWith<ClientException> {
166+
respChan.readRemaining()
167+
}.message.shouldContain("CrtHttpEngine::response failed: ec=$socketClosedEc")
168+
}
138169
}

0 commit comments

Comments
 (0)