Skip to content

Commit d468502

Browse files
authored
refactor!: remove maxConnections from generic HTTP config and fix enforcement in CRT engine (#904)
1 parent c9c284a commit d468502

File tree

11 files changed

+169
-108
lines changed

11 files changed

+169
-108
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"id": "3b28cba8-790f-4997-aeeb-0aa5f2ec6d43",
3+
"type": "bugfix",
4+
"description": "Enforce `maxConnections` for CRT HTTP engine",
5+
"issues": ["awslabs/smithy-kotlin#880"]
6+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "8e9156e5-eabc-4980-9423-2276eb3d8974",
3+
"type": "misc",
4+
"description": "**BREAKING**: Remove `maxConnections` from generic HTTP engine config since it can't be enforced for OkHttp."
5+
}

runtime/protocol/http-client-engines/http-client-engine-crt/api/http-client-engine-crt.api

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public final class aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig
1717
public synthetic fun <init> (Laws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
1818
public final fun getClientBootstrap ()Laws/sdk/kotlin/crt/io/ClientBootstrap;
1919
public final fun getInitialWindowSizeBytes ()I
20+
public final fun getMaxConnections-pVg5ArA ()I
2021
public final fun setClientBootstrap (Laws/sdk/kotlin/crt/io/ClientBootstrap;)V
2122
public fun toBuilderApplicator ()Lkotlin/jvm/functions/Function1;
2223
}
@@ -25,8 +26,10 @@ public final class aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig
2526
public fun <init> ()V
2627
public final fun getClientBootstrap ()Laws/sdk/kotlin/crt/io/ClientBootstrap;
2728
public final fun getInitialWindowSizeBytes ()I
29+
public final fun getMaxConnections-pVg5ArA ()I
2830
public final fun setClientBootstrap (Laws/sdk/kotlin/crt/io/ClientBootstrap;)V
2931
public final fun setInitialWindowSizeBytes (I)V
32+
public final fun setMaxConnections-WZ4Q5Ns (I)V
3033
}
3134

3235
public final class aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig$Companion {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.smithy.kotlin.runtime.http.engine.crt
6+
7+
import aws.sdk.kotlin.crt.http.*
8+
import aws.sdk.kotlin.crt.io.SocketOptions
9+
import aws.sdk.kotlin.crt.io.TlsContext
10+
import aws.sdk.kotlin.crt.io.TlsContextOptionsBuilder
11+
import aws.sdk.kotlin.crt.io.Uri
12+
import aws.smithy.kotlin.runtime.config.TlsVersion
13+
import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
14+
import aws.smithy.kotlin.runtime.http.HttpErrorCode
15+
import aws.smithy.kotlin.runtime.http.HttpException
16+
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
17+
import aws.smithy.kotlin.runtime.http.request.HttpRequest
18+
import aws.smithy.kotlin.runtime.io.Closeable
19+
import kotlinx.coroutines.TimeoutCancellationException
20+
import kotlinx.coroutines.sync.Mutex
21+
import kotlinx.coroutines.sync.Semaphore
22+
import kotlinx.coroutines.sync.withLock
23+
import kotlinx.coroutines.withTimeout
24+
import aws.sdk.kotlin.crt.io.TlsContext as CrtTlsContext
25+
import aws.sdk.kotlin.crt.io.TlsVersion as CrtTlsVersion
26+
import aws.smithy.kotlin.runtime.config.TlsVersion as SdkTlsVersion
27+
28+
internal class ConnectionManager(
29+
private val config: CrtHttpEngineConfig,
30+
) : Closeable {
31+
private val leases = Semaphore(config.maxConnections.toInt())
32+
33+
private val crtTlsContext: TlsContext = TlsContextOptionsBuilder()
34+
.apply {
35+
verifyPeer = true
36+
alpn = config.tlsContext.alpn.joinToString(separator = ";") { it.protocolId }
37+
minTlsVersion = toCrtTlsVersion(config.tlsContext.minVersion)
38+
}
39+
.build()
40+
.let(::CrtTlsContext)
41+
42+
private val options = HttpClientConnectionManagerOptionsBuilder().apply {
43+
clientBootstrap = config.clientBootstrap ?: SdkDefaultIO.ClientBootstrap
44+
tlsContext = crtTlsContext
45+
manualWindowManagement = true
46+
socketOptions = SocketOptions(
47+
connectTimeoutMs = config.connectTimeout.inWholeMilliseconds.toInt(),
48+
)
49+
initialWindowSize = config.initialWindowSizeBytes
50+
maxConnections = config.maxConnections.toInt()
51+
maxConnectionIdleMs = config.connectionIdleTimeout.inWholeMilliseconds
52+
}
53+
54+
// connection managers are per host
55+
private val connManagers = mutableMapOf<String, HttpClientConnectionManager>()
56+
private val mutex = Mutex()
57+
58+
public suspend fun acquire(request: HttpRequest): HttpClientConnection {
59+
val proxyConfig = config.proxySelector.select(request.url)
60+
61+
val manager = getManagerForUri(request.uri, proxyConfig)
62+
var leaseAcquired = false
63+
64+
return try {
65+
// wait for an actual connection
66+
val conn = withTimeout(config.connectionAcquireTimeout) {
67+
// get a permit to acquire a connection (limits overall connections since managers are per/host)
68+
leases.acquire()
69+
leaseAcquired = true
70+
manager.acquireConnection()
71+
}
72+
73+
LeasedConnection(conn)
74+
} catch (ex: Exception) {
75+
if (leaseAcquired) {
76+
leases.release()
77+
}
78+
val httpEx = when (ex) {
79+
is HttpException -> ex
80+
is TimeoutCancellationException -> HttpException("timed out waiting for an HTTP connection to be acquired from the pool", errorCode = HttpErrorCode.CONNECTION_ACQUIRE_TIMEOUT)
81+
else -> HttpException(ex)
82+
}
83+
84+
throw httpEx
85+
}
86+
}
87+
private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
88+
connManagers.getOrPut(uri.authority) {
89+
val connOpts = options.apply {
90+
this.uri = uri
91+
proxyOptions = when (proxyConfig) {
92+
is ProxyConfig.Http -> HttpProxyOptions(
93+
proxyConfig.url.host.toString(),
94+
proxyConfig.url.port,
95+
authUsername = proxyConfig.url.userInfo?.username,
96+
authPassword = proxyConfig.url.userInfo?.password,
97+
authType = if (proxyConfig.url.userInfo != null) HttpProxyAuthorizationType.Basic else HttpProxyAuthorizationType.None,
98+
)
99+
else -> null
100+
}
101+
}.build()
102+
HttpClientConnectionManager(connOpts)
103+
}
104+
}
105+
override fun close() {
106+
connManagers.forEach { entry -> entry.value.close() }
107+
crtTlsContext.close()
108+
}
109+
110+
private inner class LeasedConnection(private val delegate: HttpClientConnection) : HttpClientConnection by delegate {
111+
override fun close() {
112+
try {
113+
// close actually returns to the pool
114+
delegate.close()
115+
} finally {
116+
leases.release()
117+
}
118+
}
119+
}
120+
}
121+
122+
private fun toCrtTlsVersion(sdkTlsVersion: SdkTlsVersion?): CrtTlsVersion = when (sdkTlsVersion) {
123+
null -> aws.sdk.kotlin.crt.io.TlsVersion.SYS_DEFAULT
124+
TlsVersion.TLS_1_0 -> CrtTlsVersion.TLSv1
125+
TlsVersion.TLS_1_1 -> CrtTlsVersion.TLS_V1_1
126+
TlsVersion.TLS_1_2 -> CrtTlsVersion.TLS_V1_2
127+
TlsVersion.TLS_1_3 -> CrtTlsVersion.TLS_V1_3
128+
}

runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,9 @@
55

66
package aws.smithy.kotlin.runtime.http.engine.crt
77

8-
import aws.sdk.kotlin.crt.http.HttpClientConnectionManager
9-
import aws.sdk.kotlin.crt.http.HttpClientConnectionManagerOptionsBuilder
10-
import aws.sdk.kotlin.crt.http.HttpProxyAuthorizationType
11-
import aws.sdk.kotlin.crt.http.HttpProxyOptions
12-
import aws.sdk.kotlin.crt.io.SocketOptions
13-
import aws.sdk.kotlin.crt.io.TlsContextOptionsBuilder
14-
import aws.sdk.kotlin.crt.io.Uri
15-
import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
16-
import aws.smithy.kotlin.runtime.http.HttpErrorCode
17-
import aws.smithy.kotlin.runtime.http.HttpException
188
import aws.smithy.kotlin.runtime.http.config.EngineFactory
199
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
2010
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
21-
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
2211
import aws.smithy.kotlin.runtime.http.engine.callContext
2312
import aws.smithy.kotlin.runtime.http.request.HttpRequest
2413
import aws.smithy.kotlin.runtime.http.response.HttpCall
@@ -27,13 +16,8 @@ import aws.smithy.kotlin.runtime.operation.ExecutionContext
2716
import aws.smithy.kotlin.runtime.telemetry.logging.logger
2817
import aws.smithy.kotlin.runtime.time.Instant
2918
import kotlinx.coroutines.*
30-
import kotlinx.coroutines.sync.Mutex
3119
import kotlinx.coroutines.sync.Semaphore
32-
import kotlinx.coroutines.sync.withLock
3320
import kotlinx.coroutines.sync.withPermit
34-
import aws.sdk.kotlin.crt.io.TlsContext as CrtTlsContext
35-
import aws.sdk.kotlin.crt.io.TlsVersion as CrtTlsVersion
36-
import aws.smithy.kotlin.runtime.config.TlsVersion as SdkTlsVersion
3721

3822
internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
3923
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024
@@ -51,15 +35,6 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
5135
override val engineConstructor: (CrtHttpEngineConfig.Builder.() -> Unit) -> CrtHttpEngine = ::invoke
5236
}
5337

54-
private val crtTlsContext: CrtTlsContext = TlsContextOptionsBuilder()
55-
.apply {
56-
verifyPeer = true
57-
alpn = config.tlsContext.alpn.joinToString(separator = ";") { it.protocolId }
58-
minTlsVersion = toCrtTlsVersion(config.tlsContext.minVersion)
59-
}
60-
.build()
61-
.let(::CrtTlsContext)
62-
6338
// FIXME - re-enable when SLF4j default is available
6439
// init {
6540
// if (config.socketReadTimeout != CrtHttpEngineConfig.Default.socketReadTimeout) {
@@ -75,36 +50,17 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
7550
// }
7651
// }
7752

78-
private val options = HttpClientConnectionManagerOptionsBuilder().apply {
79-
clientBootstrap = config.clientBootstrap ?: SdkDefaultIO.ClientBootstrap
80-
tlsContext = crtTlsContext
81-
manualWindowManagement = true
82-
socketOptions = SocketOptions(
83-
connectTimeoutMs = config.connectTimeout.inWholeMilliseconds.toInt(),
84-
)
85-
initialWindowSize = config.initialWindowSizeBytes
86-
// FIXME - given managers are _per host_ the maxConnections parameter is not actually respected here
87-
maxConnections = config.maxConnections.toInt()
88-
maxConnectionIdleMs = config.connectionIdleTimeout.inWholeMilliseconds
89-
}
90-
91-
// connection managers are per host
92-
private val connManagers = mutableMapOf<String, HttpClientConnectionManager>()
93-
private val mutex = Mutex()
9453
private val requestLimiter = Semaphore(config.maxConcurrency.toInt())
54+
private val connectionManager = ConnectionManager(config)
9555

9656
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit {
9757
val callContext = callContext()
9858
val logger = callContext.logger<CrtHttpEngine>()
99-
val proxyConfig = config.proxySelector.select(request.url)
100-
val manager = getManagerForUri(request.uri, proxyConfig)
10159

10260
// LIFETIME: connection will be released back to the pool/manager when
10361
// the response completes OR on exception (both handled by the completion handler registered on the stream
10462
// handler)
105-
val conn = withTimeoutOrNull(config.connectionAcquireTimeout) {
106-
manager.acquireConnection()
107-
} ?: throw HttpException("timed out waiting for an HTTP connection to be acquired from the pool", errorCode = HttpErrorCode.CONNECTION_ACQUIRE_TIMEOUT)
63+
val conn = connectionManager.acquire(request)
10864
logger.trace { "Acquired connection ${conn.id}" }
10965

11066
val respHandler = SdkStreamResponseHandler(conn, callContext)
@@ -137,34 +93,6 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
13793
override fun shutdown() {
13894
// close all resources
13995
// SAFETY: shutdown is only invoked once AND only after all requests have completed and no more are coming
140-
connManagers.forEach { entry -> entry.value.close() }
141-
crtTlsContext.close()
96+
connectionManager.close()
14297
}
143-
144-
private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
145-
connManagers.getOrPut(uri.authority) {
146-
val connOpts = options.apply {
147-
this.uri = uri
148-
proxyOptions = when (proxyConfig) {
149-
is ProxyConfig.Http -> HttpProxyOptions(
150-
proxyConfig.url.host.toString(),
151-
proxyConfig.url.port,
152-
authUsername = proxyConfig.url.userInfo?.username,
153-
authPassword = proxyConfig.url.userInfo?.password,
154-
authType = if (proxyConfig.url.userInfo != null) HttpProxyAuthorizationType.Basic else HttpProxyAuthorizationType.None,
155-
)
156-
else -> null
157-
}
158-
}.build()
159-
HttpClientConnectionManager(connOpts)
160-
}
161-
}
162-
}
163-
164-
private fun toCrtTlsVersion(sdkTlsVersion: SdkTlsVersion?): CrtTlsVersion = when (sdkTlsVersion) {
165-
null -> CrtTlsVersion.SYS_DEFAULT
166-
SdkTlsVersion.TLS_1_0 -> CrtTlsVersion.TLSv1
167-
SdkTlsVersion.TLS_1_1 -> CrtTlsVersion.TLS_V1_1
168-
SdkTlsVersion.TLS_1_2 -> CrtTlsVersion.TLS_V1_2
169-
SdkTlsVersion.TLS_1_3 -> CrtTlsVersion.TLS_V1_3
17098
}

runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpCli
2828
public val Default: CrtHttpEngineConfig = CrtHttpEngineConfig(Builder())
2929
}
3030

31+
/**
32+
* Maximum number of open connections
33+
*/
34+
public val maxConnections: UInt = builder.maxConnections
35+
3136
/**
3237
* The amount of data that can be buffered before reading from the socket will cease. Reading will
3338
* resume as data is consumed.
@@ -43,6 +48,7 @@ public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpCli
4348
super.toBuilderApplicator()()
4449

4550
if (this is Builder) {
51+
maxConnections = this@CrtHttpEngineConfig.maxConnections
4652
initialWindowSizeBytes = this@CrtHttpEngineConfig.initialWindowSizeBytes
4753
clientBootstrap = this@CrtHttpEngineConfig.clientBootstrap
4854
}
@@ -52,6 +58,11 @@ public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpCli
5258
* A builder for [CrtHttpEngineConfig]
5359
*/
5460
public class Builder : BuilderImpl() {
61+
/**
62+
* Maximum number of open connections
63+
*/
64+
public var maxConnections: UInt = 64u
65+
5566
/**
5667
* Set the amount of data that can be buffered before reading from the socket will cease. Reading will
5768
* resume as data is consumed.

runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ internal class SdkStreamResponseHandler(
207207
}
208208

209209
logger.trace { "Closing connection ${conn.id}" }
210+
// return to pool
210211
conn.close()
211212
}
212213
}

0 commit comments

Comments
 (0)