Skip to content

Commit ac87eab

Browse files
committed
fix: http call waiting for idle connection monitor to finish
1 parent 03cdfb4 commit ac87eab

File tree

4 files changed

+58
-35
lines changed

4 files changed

+58
-35
lines changed

runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ kotlin {
3636

3737
all {
3838
languageSettings.optIn("aws.smithy.kotlin.runtime.InternalApi")
39+
languageSettings.optIn("okhttp3.ExperimentalOkHttpApi")
3940
}
4041
}
4142
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,31 @@
55
package aws.smithy.kotlin.runtime.http.engine.okhttp
66

77
import aws.smithy.kotlin.runtime.telemetry.logging.logger
8-
import kotlinx.coroutines.CoroutineName
9-
import kotlinx.coroutines.CoroutineScope
10-
import kotlinx.coroutines.Dispatchers
11-
import kotlinx.coroutines.Job
12-
import kotlinx.coroutines.cancelAndJoin
13-
import kotlinx.coroutines.isActive
14-
import kotlinx.coroutines.launch
15-
import kotlinx.coroutines.runBlocking
8+
import kotlinx.coroutines.*
169
import okhttp3.Call
1710
import okhttp3.Connection
1811
import okhttp3.ConnectionListener
19-
import okhttp3.ExperimentalOkHttpApi
2012
import okhttp3.internal.closeQuietly
2113
import okio.EOFException
2214
import okio.buffer
2315
import okio.source
2416
import java.net.SocketException
2517
import java.net.SocketTimeoutException
2618
import java.util.concurrent.ConcurrentHashMap
19+
import kotlin.coroutines.CoroutineContext
2720
import kotlin.coroutines.coroutineContext
2821
import kotlin.time.Duration
2922
import kotlin.time.measureTime
3023

31-
@OptIn(ExperimentalOkHttpApi::class)
3224
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
25+
private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
3326
private val monitors = ConcurrentHashMap<Connection, Job>()
3427

28+
fun close(): Unit = runBlocking {
29+
monitors.values.forEach { it.cancelAndJoin() }
30+
monitorScope.cancel()
31+
}
32+
3533
private fun Call.callContext() =
3634
request()
3735
.tag(SdkRequestTag::class.java)
@@ -58,21 +56,20 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
5856

5957
override fun connectionReleased(connection: Connection, call: Call) {
6058
val connId = System.identityHashCode(connection)
61-
val context = call.callContext()
62-
val scope = CoroutineScope(context)
63-
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
64-
doMonitor(connection)
59+
val callContext = call.callContext()
60+
val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
61+
doMonitor(connection, callContext)
6562
}
66-
context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
63+
callContext.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
6764

6865
// Non-locking map access is okay here because this code will only execute synchronously as part of a
6966
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
7067
// the same connection.
7168
monitors[connection] = monitor
7269
}
7370

74-
private suspend fun doMonitor(conn: Connection) {
75-
val logger = coroutineContext.logger<ConnectionIdleMonitor>()
71+
private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) {
72+
val logger = callContext.logger<ConnectionIdleMonitor>()
7673

7774
val socket = conn.socket()
7875
val source = try {
@@ -111,6 +108,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
111108
logger.trace { "Attempting to reset soTimeout..." }
112109
try {
113110
conn.socket().soTimeout = oldTimeout
111+
logger.trace { "SoTimeout reset." }
114112
} catch (e: Throwable) {
115113
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
116114
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import aws.smithy.kotlin.runtime.net.TlsVersion
1515
import aws.smithy.kotlin.runtime.operation.ExecutionContext
1616
import aws.smithy.kotlin.runtime.time.Instant
1717
import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds
18-
import kotlinx.coroutines.ExperimentalCoroutinesApi
1918
import kotlinx.coroutines.job
2019
import okhttp3.*
20+
import okhttp3.ConnectionPool
2121
import okhttp3.coroutines.executeAsync
2222
import java.util.concurrent.TimeUnit
2323
import kotlin.time.toJavaDuration
@@ -44,15 +44,15 @@ public class OkHttpEngine(
4444
}
4545

4646
private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
47-
private val client = config.buildClient(metrics)
47+
private val connectionIdleMonitor = if (config.connectionIdlePollingInterval != null) ConnectionIdleMonitor(config.connectionIdlePollingInterval) else null
48+
private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor)
4849

4950
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
5051
val callContext = callContext()
5152

5253
val engineRequest = request.toOkHttpRequest(context, callContext, metrics)
5354
val engineCall = client.newCall(engineRequest)
5455

55-
@OptIn(ExperimentalCoroutinesApi::class)
5656
val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() }
5757

5858
val response = engineResponse.toSdkResponse()
@@ -71,17 +71,17 @@ public class OkHttpEngine(
7171
}
7272

7373
override fun shutdown() {
74+
connectionIdleMonitor?.close()
7475
client.connectionPool.evictAll()
7576
client.dispatcher.executorService.shutdown()
7677
metrics.close()
7778
}
7879
}
7980

80-
/**
81-
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
82-
*/
83-
@InternalApi
84-
public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
81+
private fun OkHttpEngineConfig.buildClientFromConfig(
82+
metrics: HttpClientMetrics,
83+
poolOverride: ConnectionPool? = null
84+
): OkHttpClient {
8585
val config = this
8686

8787
return OkHttpClient.Builder().apply {
@@ -99,22 +99,13 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
9999
readTimeout(config.socketReadTimeout.toJavaDuration())
100100
writeTimeout(config.socketWriteTimeout.toJavaDuration())
101101

102-
@OptIn(ExperimentalOkHttpApi::class)
103-
val connectionListener = if (config.connectionIdlePollingInterval == null) {
104-
ConnectionListener.NONE
105-
} else {
106-
ConnectionIdleMonitor(connectionIdlePollingInterval)
107-
}
108-
109102
// use our own pool configured with the timeout settings taken from config
110-
@OptIn(ExperimentalOkHttpApi::class)
111103
val pool = ConnectionPool(
112104
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
113105
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
114106
TimeUnit.MILLISECONDS,
115-
connectionListener = connectionListener,
116107
)
117-
connectionPool(pool)
108+
connectionPool(poolOverride ?: pool)
118109

119110
val dispatcher = Dispatcher().apply {
120111
maxRequests = config.maxConcurrency.toInt()
@@ -147,6 +138,33 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
147138
}.build()
148139
}
149140

141+
/**
142+
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
143+
*/
144+
// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener`
145+
// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private
146+
@InternalApi
147+
public fun OkHttpEngineConfig.buildClient(
148+
metrics: HttpClientMetrics
149+
): OkHttpClient = this.buildClientFromConfig(metrics)
150+
151+
/**
152+
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
153+
*/
154+
// Used by OkHttpEngine - OkHttp5 does have `connectionListener`
155+
private fun OkHttpEngineConfig.buildClientWithConnectionListener(
156+
metrics: HttpClientMetrics,
157+
connectionListener: ConnectionIdleMonitor?
158+
): OkHttpClient = this.buildClientFromConfig(
159+
metrics,
160+
ConnectionPool(
161+
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
162+
keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds,
163+
timeUnit = TimeUnit.MILLISECONDS,
164+
connectionListener = connectionListener ?: ConnectionListener.NONE,
165+
)
166+
)
167+
150168
private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec {
151169
val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2
152170
val okHttpTlsVersions = SdkTlsVersion

runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public class OkHttp4Engine(
7070
}
7171
}
7272
}
73+
74+
override fun shutdown() {
75+
client.connectionPool.evictAll()
76+
client.dispatcher.executorService.shutdown()
77+
metrics.close()
78+
}
7379
}
7480

7581
// Copied from okhttp3 5.x:

0 commit comments

Comments
 (0)