Skip to content

Commit 20d93fc

Browse files
authored
fix: http call waiting for idle connection monitor to finish (#1178)
1 parent 200a1e8 commit 20d93fc

File tree

3 files changed

+60
-33
lines changed

3 files changed

+60
-33
lines changed

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,7 @@
55
package aws.smithy.kotlin.runtime.http.engine.okhttp
66

77
import aws.smithy.kotlin.runtime.telemetry.logging.logger
8-
import kotlinx.coroutines.CoroutineName
9-
import kotlinx.coroutines.CoroutineScope
10-
import kotlinx.coroutines.Dispatchers
11-
import kotlinx.coroutines.Job
12-
import kotlinx.coroutines.cancelAndJoin
13-
import kotlinx.coroutines.isActive
14-
import kotlinx.coroutines.launch
15-
import kotlinx.coroutines.runBlocking
8+
import kotlinx.coroutines.*
169
import okhttp3.Call
1710
import okhttp3.Connection
1811
import okhttp3.ConnectionListener
@@ -24,14 +17,23 @@ import okio.source
2417
import java.net.SocketException
2518
import java.net.SocketTimeoutException
2619
import java.util.concurrent.ConcurrentHashMap
20+
import kotlin.coroutines.CoroutineContext
2721
import kotlin.coroutines.coroutineContext
2822
import kotlin.time.Duration
2923
import kotlin.time.measureTime
3024

3125
@OptIn(ExperimentalOkHttpApi::class)
3226
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
27+
private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
3328
private val monitors = ConcurrentHashMap<Connection, Job>()
3429

30+
fun close(): Unit = runBlocking {
31+
val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) {
32+
"Connection idle monitor scope cannot be cancelled because it does not have a job: $this"
33+
}
34+
monitorJob.cancelAndJoin()
35+
}
36+
3537
private fun Call.callContext() =
3638
request()
3739
.tag(SdkRequestTag::class.java)
@@ -58,21 +60,20 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
5860

5961
override fun connectionReleased(connection: Connection, call: Call) {
6062
val connId = System.identityHashCode(connection)
61-
val context = call.callContext()
62-
val scope = CoroutineScope(context)
63-
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
64-
doMonitor(connection)
63+
val callContext = call.callContext()
64+
val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
65+
doMonitor(connection, callContext)
6566
}
66-
context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
67+
callContext.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
6768

6869
// Non-locking map access is okay here because this code will only execute synchronously as part of a
6970
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
7071
// the same connection.
7172
monitors[connection] = monitor
7273
}
7374

74-
private suspend fun doMonitor(conn: Connection) {
75-
val logger = coroutineContext.logger<ConnectionIdleMonitor>()
75+
private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) {
76+
val logger = callContext.logger<ConnectionIdleMonitor>()
7677

7778
val socket = conn.socket()
7879
val source = try {
@@ -111,6 +112,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
111112
logger.trace { "Attempting to reset soTimeout..." }
112113
try {
113114
conn.socket().soTimeout = oldTimeout
115+
logger.trace { "soTimeout reset." }
114116
} catch (e: Throwable) {
115117
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
116118
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import aws.smithy.kotlin.runtime.net.TlsVersion
1515
import aws.smithy.kotlin.runtime.operation.ExecutionContext
1616
import aws.smithy.kotlin.runtime.time.Instant
1717
import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds
18-
import kotlinx.coroutines.ExperimentalCoroutinesApi
1918
import kotlinx.coroutines.job
2019
import okhttp3.*
20+
import okhttp3.ConnectionPool
2121
import okhttp3.coroutines.executeAsync
2222
import java.util.concurrent.TimeUnit
2323
import kotlin.time.toJavaDuration
@@ -44,15 +44,15 @@ public class OkHttpEngine(
4444
}
4545

4646
private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
47-
private val client = config.buildClient(metrics)
47+
private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) }
48+
private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor)
4849

4950
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
5051
val callContext = callContext()
5152

5253
val engineRequest = request.toOkHttpRequest(context, callContext, metrics)
5354
val engineCall = client.newCall(engineRequest)
5455

55-
@OptIn(ExperimentalCoroutinesApi::class)
5656
val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() }
5757

5858
val response = engineResponse.toSdkResponse()
@@ -71,17 +71,17 @@ public class OkHttpEngine(
7171
}
7272

7373
override fun shutdown() {
74+
connectionIdleMonitor?.close()
7475
client.connectionPool.evictAll()
7576
client.dispatcher.executorService.shutdown()
7677
metrics.close()
7778
}
7879
}
7980

80-
/**
81-
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
82-
*/
83-
@InternalApi
84-
public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
81+
private fun OkHttpEngineConfig.buildClientFromConfig(
82+
metrics: HttpClientMetrics,
83+
poolOverride: ConnectionPool? = null,
84+
): OkHttpClient {
8585
val config = this
8686

8787
return OkHttpClient.Builder().apply {
@@ -99,20 +99,11 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
9999
readTimeout(config.socketReadTimeout.toJavaDuration())
100100
writeTimeout(config.socketWriteTimeout.toJavaDuration())
101101

102-
@OptIn(ExperimentalOkHttpApi::class)
103-
val connectionListener = if (config.connectionIdlePollingInterval == null) {
104-
ConnectionListener.NONE
105-
} else {
106-
ConnectionIdleMonitor(connectionIdlePollingInterval)
107-
}
108-
109102
// use our own pool configured with the timeout settings taken from config
110-
@OptIn(ExperimentalOkHttpApi::class)
111-
val pool = ConnectionPool(
103+
val pool = poolOverride ?: ConnectionPool(
112104
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
113105
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
114106
TimeUnit.MILLISECONDS,
115-
connectionListener = connectionListener,
116107
)
117108
connectionPool(pool)
118109

@@ -147,6 +138,34 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
147138
}.build()
148139
}
149140

141+
/**
142+
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
143+
*/
144+
// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener`
145+
// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private
146+
@InternalApi
147+
public fun OkHttpEngineConfig.buildClient(
148+
metrics: HttpClientMetrics,
149+
): OkHttpClient = this.buildClientFromConfig(metrics)
150+
151+
/**
152+
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
153+
*/
154+
// Used by OkHttpEngine - OkHttp5 does have `connectionListener`
155+
@OptIn(ExperimentalOkHttpApi::class)
156+
private fun OkHttpEngineConfig.buildClientWithConnectionListener(
157+
metrics: HttpClientMetrics,
158+
connectionListener: ConnectionIdleMonitor?,
159+
): OkHttpClient = this.buildClientFromConfig(
160+
metrics,
161+
ConnectionPool(
162+
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
163+
keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds,
164+
timeUnit = TimeUnit.MILLISECONDS,
165+
connectionListener = connectionListener ?: ConnectionListener.NONE,
166+
),
167+
)
168+
150169
private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec {
151170
val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2
152171
val okHttpTlsVersions = SdkTlsVersion

runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public class OkHttp4Engine(
7070
}
7171
}
7272
}
73+
74+
override fun shutdown() {
75+
client.connectionPool.evictAll()
76+
client.dispatcher.executorService.shutdown()
77+
metrics.close()
78+
}
7379
}
7480

7581
// Copied from okhttp3 5.x:

0 commit comments

Comments
 (0)