Skip to content

Commit bffc45f

Browse files
authored
fix(rt): OkHttp request body retry failing due to Job completed (#832)
When OkHttp config has retry on connect failures enabled it will attempt to re-use the request body in some scenarios. The coroutine Job was setup to only be used once though and was completed when the original request fails. This causes the (internal okhttp) retry to fail with an obscure JobCompleted exception.
1 parent 67b155c commit bffc45f

File tree

3 files changed

+34
-42
lines changed

3 files changed

+34
-42
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "a9e3a074-33fa-41da-8322-c83adc2e3637",
3+
"type": "bugfix",
4+
"description": "Fix okhttp streaming body failing to retry"
5+
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/StreamingRequestBody.kt

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.HttpBody
99
import aws.smithy.kotlin.runtime.io.internal.toOkio
1010
import aws.smithy.kotlin.runtime.io.internal.toSdk
1111
import aws.smithy.kotlin.runtime.io.readAll
12+
import aws.smithy.kotlin.runtime.tracing.trace
1213
import aws.smithy.kotlin.runtime.util.derivedName
1314
import kotlinx.coroutines.*
1415
import okhttp3.MediaType
@@ -20,44 +21,50 @@ import kotlin.coroutines.CoroutineContext
2021
/**
2122
* OkHttp [RequestBody] that reads from [body] channel or source
2223
*/
24+
@OptIn(DelicateCoroutinesApi::class, ExperimentalStdlibApi::class)
2325
internal class StreamingRequestBody(
2426
private val body: HttpBody,
25-
callContext: CoroutineContext,
26-
) : RequestBody(), CoroutineScope {
27+
private val callContext: CoroutineContext,
28+
) : RequestBody() {
2729

2830
init {
2931
require(body is HttpBody.ChannelContent || body is HttpBody.SourceContent) { "Invalid streaming body $body" }
3032
}
3133

32-
private val producerJob = Job(callContext[Job])
33-
override val coroutineContext: CoroutineContext = callContext + producerJob + callContext.derivedName("send-request-body") + Dispatchers.IO
3434
override fun contentType(): MediaType? = null
3535
override fun contentLength(): Long = body.contentLength ?: -1
3636
override fun isOneShot(): Boolean = body.isOneShot
3737
override fun isDuplex(): Boolean = body.isDuplex
3838

39-
override fun writeTo(sink: BufferedSink) = try {
40-
doWriteTo(sink)
41-
} catch (ex: Exception) {
42-
throw when (ex) {
43-
is IOException -> ex
44-
// wrap all exceptions thrown from inside `okhttp3.RequestBody#writeTo(..)` as an IOException
45-
// see https://github.com/awslabs/aws-sdk-kotlin/issues/733
46-
else -> IOException(ex)
39+
override fun writeTo(sink: BufferedSink) {
40+
try {
41+
doWriteTo(sink)
42+
} catch (ex: Exception) {
43+
when (ex) {
44+
is CancellationException -> {
45+
callContext.trace<StreamingRequestBody> { "request cancelled" }
46+
// shouldn't need to propagate the exception because okhttp is cancellation aware via executeAsync()
47+
return
48+
}
49+
is IOException -> throw ex
50+
// wrap all exceptions thrown from inside `okhttp3.RequestBody#writeTo(..)` as an IOException
51+
// see https://github.com/awslabs/aws-sdk-kotlin/issues/733
52+
else -> throw IOException(ex)
53+
}
4754
}
4855
}
4956

50-
@OptIn(ExperimentalStdlibApi::class)
5157
private fun doWriteTo(sink: BufferedSink) {
58+
val context = callContext + callContext.derivedName("send-request-body")
5259
if (isDuplex()) {
5360
// launch coroutine that writes to sink in the background
54-
launch {
61+
GlobalScope.launch(context + Dispatchers.IO) {
5562
sink.use { transferBody(it) }
5663
}
5764
} else {
5865
// remove the current dispatcher (if it exists) and use the internal
5966
// runBlocking dispatcher that blocks the *current* thread
60-
val blockingContext = coroutineContext.minusKey(CoroutineDispatcher)
67+
val blockingContext = context.minusKey(CoroutineDispatcher)
6168

6269
// Non-duplex (aka "normal") requests MUST write all of their request body
6370
// before this function returns. Requests are given a background thread to
@@ -68,13 +75,15 @@ internal class StreamingRequestBody(
6875
}
6976
}
7077
}
71-
private suspend fun transferBody(sink: BufferedSink) = withJob(producerJob) {
78+
79+
private suspend fun transferBody(sink: BufferedSink) {
7280
when (body) {
7381
is HttpBody.ChannelContent -> {
7482
val chan = body.readFrom()
7583
val sdkSink = sink.toSdk()
7684
chan.readAll(sdkSink)
7785
}
86+
7887
is HttpBody.SourceContent -> {
7988
val source = body.readFrom()
8089
source.toOkio().use {
@@ -86,19 +95,3 @@ internal class StreamingRequestBody(
8695
}
8796
}
8897
}
89-
90-
/**
91-
* Completes the given job when the block returns calling either `complete()` when the block runs
92-
* successfully or `completeExceptionally()` on exception.
93-
* @return the result of calling [block]
94-
*/
95-
private inline fun <T> withJob(job: CompletableJob, block: () -> T): T {
96-
try {
97-
return block()
98-
} catch (ex: Exception) {
99-
job.completeExceptionally(ex)
100-
throw ex
101-
} finally {
102-
job.complete()
103-
}
104-
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/test/aws/smithy/kotlin/runtime/http/engine/okhttp/StreamingRequestBodyTest.kt

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import kotlinx.coroutines.test.runTest
1515
import okio.Buffer
1616
import okio.BufferedSink
1717
import org.junit.jupiter.api.Test
18-
import java.io.IOException
1918
import kotlin.coroutines.EmptyCoroutineContext
2019
import kotlin.test.*
2120
import kotlin.time.Duration.Companion.milliseconds
@@ -110,13 +109,9 @@ class StreamingRequestBodyTest {
110109
val callContext = coroutineContext + Job(coroutineContext.job)
111110
val actual = StreamingRequestBody(body, callContext)
112111
val buffer = Buffer()
113-
// see https://github.com/awslabs/aws-sdk-kotlin/issues/733 for why we expect
114-
// this to be an IOException
115-
val ex = assertFailsWith<IOException> {
116-
actual.writeTo(buffer)
117-
}
118-
assertIs<CancellationException>(ex.cause)
112+
actual.writeTo(buffer)
119113
}
114+
120115
delay(100.milliseconds)
121116

122117
job.cancel()
@@ -148,10 +143,9 @@ class StreamingRequestBodyTest {
148143

149144
assertTrue(actual.isDuplex())
150145

151-
assertEquals(1, callJob.children.toList().size) // producerJob
152-
assertEquals(0, callJob.children.toList()[0].children.toList().size)
146+
assertEquals(0, callJob.children.toList().size)
153147
actual.writeTo(sink)
154-
assertEquals(1, callJob.children.toList()[0].children.toList().size)
148+
assertEquals(1, callJob.children.toList().size) // writer
155149
assertEquals(sink.buffer.size, 0)
156150
chan.writeAll(content.source())
157151

0 commit comments

Comments
 (0)