Skip to content

Commit df3d15a

Browse files
authored
fix(rt): enforce only once shutdown logic for crt engine connections (#497)
Fixes the segfault that can happen when an exception is handled twice leading to a connection being closed after it has been free'd. This change refactors the handling of the connection close logic to be handled in a single place regardless of why the connection is being closed.
1 parent f123c6b commit df3d15a

File tree

5 files changed

+48
-44
lines changed

5 files changed

+48
-44
lines changed

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -76,36 +76,30 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
7676
override suspend fun roundTrip(request: HttpRequest): HttpCall {
7777
val callContext = callContext()
7878
val manager = getManagerForUri(request.uri)
79+
80+
// LIFETIME: connection will be released back to the pool/manager when
81+
// the response completes OR on exception (both handled by the completion handler registered on the stream
82+
// handler)
7983
val conn = withTimeoutOrNull(config.connectionAcquireTimeout) {
8084
manager.acquireConnection()
8185
} ?: throw ClientException("timed out waiting for an HTTP connection to be acquired from the pool")
8286

83-
try {
84-
val reqTime = Instant.now()
85-
val engineRequest = request.toCrtRequest(callContext)
86-
87-
// LIFETIME: connection will be released back to the pool/manager when
88-
// the response completes OR on exception
89-
val respHandler = SdkStreamResponseHandler(conn)
90-
callContext.job.invokeOnCompletion {
91-
// ensures the stream is driven to completion regardless of what the downstream consumer does
92-
respHandler.complete()
93-
}
94-
95-
val stream = conn.makeRequest(engineRequest, respHandler)
96-
stream.activate()
97-
98-
val resp = respHandler.waitForResponse()
99-
100-
return HttpCall(request, resp, reqTime, Instant.now(), callContext)
101-
} catch (ex: Exception) {
102-
try {
103-
manager.releaseConnection(conn)
104-
} catch (ex2: Exception) {
105-
ex.addSuppressed(ex2)
106-
}
107-
throw ex
87+
val respHandler = SdkStreamResponseHandler(conn)
88+
callContext.job.invokeOnCompletion {
89+
logger.trace { "completing handler; cause=$it" }
90+
// ensures the stream is driven to completion regardless of what the downstream consumer does
91+
respHandler.complete()
10892
}
93+
94+
val reqTime = Instant.now()
95+
val engineRequest = request.toCrtRequest(callContext)
96+
97+
val stream = conn.makeRequest(engineRequest, respHandler)
98+
stream.activate()
99+
100+
val resp = respHandler.waitForResponse()
101+
102+
return HttpCall(request, resp, reqTime, Instant.now(), callContext)
109103
}
110104

111105
override fun shutdown() {

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpCli
1717
* The default engine config. Most clients should use this.
1818
*/
1919
public val Default: CrtHttpEngineConfig = CrtHttpEngineConfig(Builder())
20+
21+
public operator fun invoke(block: Builder.() -> Unit): CrtHttpEngineConfig =
22+
Builder().apply(block).build()
2023
}
2124

2225
/**

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import aws.smithy.kotlin.runtime.http.*
1313
import aws.smithy.kotlin.runtime.http.HeadersBuilder
1414
import aws.smithy.kotlin.runtime.http.response.HttpResponse
1515
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
16+
import aws.smithy.kotlin.runtime.logging.Logger
1617
import kotlinx.atomicfu.locks.reentrantLock
1718
import kotlinx.atomicfu.locks.withLock
1819
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -30,13 +31,17 @@ internal class SdkStreamResponseHandler(
3031
// There is no great way to do that currently without either (1) closing the connection or (2) throwing an
3132
// exception from a callback such that AWS_OP_ERROR is returned. Wait for HttpStream to have explicit cancellation
3233

34+
private val logger = Logger.getLogger<SdkStreamResponseHandler>()
3335
private val responseReady = Channel<HttpResponse>(1)
3436
private val headers = HeadersBuilder()
3537

3638
private var sdkBody: BufferedReadChannel? = null
3739

38-
private val lock = reentrantLock()
40+
private val lock = reentrantLock() // protects crtStream and cancelled state
3941
private var crtStream: HttpStream? = null
42+
// if the (coroutine) job is completed before the stream's onResponseComplete callback is
43+
// invoked (for any reason) we consider the stream "cancelled"
44+
private var cancelled = false
4045

4146
private val Int.isMainHeadersBlock: Boolean
4247
get() = when (this) {
@@ -115,7 +120,13 @@ internal class SdkStreamResponseHandler(
115120
}
116121

117122
override fun onResponseBody(stream: HttpStream, bodyBytesIn: Buffer): Int {
118-
lock.withLock { crtStream = stream }
123+
val isCancelled = lock.withLock {
124+
crtStream = stream
125+
cancelled
126+
}
127+
128+
// short circuit, stop buffering data and discard remaining incoming bytes
129+
if (isCancelled) return bodyBytesIn.len
119130

120131
// we should have created a response channel if we expected a body
121132
val sdkRespChan = checkNotNull(sdkBody) { "unexpected response body" }
@@ -134,10 +145,6 @@ internal class SdkStreamResponseHandler(
134145
streamCompleted = true
135146
}
136147

137-
// release it back to the pool, this is safe to do now since the body (and any other response data)
138-
// has been copied to buffers we own by now
139-
conn.close()
140-
141148
// close the body channel
142149
if (errorCode != 0) {
143150
val errorDescription = CRT.errorString(errorCode)
@@ -162,13 +169,19 @@ internal class SdkStreamResponseHandler(
162169
internal fun complete() {
163170
// We have no way of cancelling the stream, we have to drive it to exhaustion OR close the connection.
164171
// At this point we know it's safe to release resources so if the stream hasn't completed yet
165-
// we forcefully close the connection. This can happen when the stream's window is full and it's waiting
172+
// we forcefully shutdown the connection. This can happen when the stream's window is full and it's waiting
166173
// on the window to be incremented to proceed (i.e. the user didn't consume the stream for whatever reason
167-
// and more data is pending arrival).
168-
val forceClose = lock.withLock { !streamCompleted }
174+
// and more data is pending arrival). It can also happen if the coroutine for this request is cancelled
175+
// before onResponseComplete fires.
176+
lock.withLock {
177+
val forceClose = !streamCompleted
178+
179+
if (forceClose) {
180+
logger.debug { "stream did not complete before job, forcing connection shutdown! handler=$this; conn=$conn; stream=$crtStream" }
181+
conn.shutdown()
182+
cancelled = true
183+
}
169184

170-
if (forceClose) {
171-
conn.shutdown()
172185
conn.close()
173186
}
174187
}

aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ class SdkStreamResponseHandlerTest {
4949
assertEquals(HttpStatusCode.OK, resp.status)
5050

5151
assertTrue(resp.body is HttpBody.Empty)
52+
handler.onResponseComplete(stream, 0)
5253

5354
assertFalse(mockConn.isClosed)
54-
handler.onResponseComplete(stream, 0)
55+
handler.complete()
5556
assertTrue(mockConn.isClosed)
5657
}
5758

@@ -65,7 +66,6 @@ class SdkStreamResponseHandlerTest {
6566

6667
val resp = handler.waitForResponse()
6768
assertEquals(HttpStatusCode.OK, resp.status)
68-
assertTrue(mockConn.isClosed)
6969
}
7070

7171
@Test
@@ -80,8 +80,6 @@ class SdkStreamResponseHandlerTest {
8080
assertFails {
8181
handler.waitForResponse()
8282
}
83-
84-
assertTrue(mockConn.isClosed)
8583
}
8684

8785
@Test
@@ -107,7 +105,6 @@ class SdkStreamResponseHandlerTest {
107105

108106
assertFalse(mockConn.isClosed)
109107
handler.onResponseComplete(stream, 0)
110-
assertTrue(mockConn.isClosed)
111108
assertTrue(respChan.isClosedForWrite)
112109
}
113110

@@ -134,7 +131,6 @@ class SdkStreamResponseHandlerTest {
134131
assertTrue(resp.body is HttpBody.Streaming)
135132
val respChan = (resp.body as HttpBody.Streaming).readFrom()
136133

137-
assertTrue(mockConn.isClosed)
138134
assertTrue(respChan.isClosedForWrite)
139135

140136
assertEquals(data, respChan.readRemaining().decodeToString())

aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/AsyncStressTest.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import aws.smithy.kotlin.runtime.http.request.url
1414
import aws.smithy.kotlin.runtime.http.response.complete
1515
import aws.smithy.kotlin.runtime.http.sdkHttpClient
1616
import aws.smithy.kotlin.runtime.httptest.TestWithLocalServer
17-
import aws.smithy.kotlin.runtime.testing.IgnoreWindows
1817
import io.ktor.application.*
1918
import io.ktor.response.*
2019
import io.ktor.routing.*
@@ -71,7 +70,6 @@ class AsyncStressTest : TestWithLocalServer() {
7170
}
7271
}
7372

74-
@IgnoreWindows("https://github.com/awslabs/aws-sdk-kotlin/issues/413")
7573
@OptIn(ExperimentalTime::class)
7674
@Test
7775
fun testStreamNotConsumed() = runSuspendTest {

0 commit comments

Comments
 (0)