Skip to content

Commit 158353e

Browse files
authored
fix: handle stream response exceptions properly in Ktor engine (#589)
1 parent 19c8757 commit 158353e

File tree

3 files changed

+56
-3
lines changed

3 files changed

+56
-3
lines changed

runtime/protocol/http-client-engines/http-client-engine-ktor/build.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ description = "Ktor HTTP Client Engine for Smithy services generated by smithy-k
77
extra["displayName"] = "Smithy :: Kotlin :: HTTP :: Engine :: Ktor"
88
extra["moduleName"] = "aws.smithy.kotlin.runtime.http.engine.ktor"
99

10+
val coroutinesVersion: String by project
1011
val ktorVersion: String by project
1112

1213
kotlin {
@@ -24,6 +25,11 @@ kotlin {
2425
implementation(project(":runtime:logging"))
2526
}
2627
}
28+
jvmTest {
29+
dependencies {
30+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion")
31+
}
32+
}
2733

2834
all {
2935
languageSettings.optIn("aws.smithy.kotlin.runtime.util.InternalApi")

runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/aws/smithy/kotlin/runtime/http/engine/ktor/KtorEngine.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.coroutines.channels.Channel
2121
import kotlinx.coroutines.channels.SendChannel
2222
import kotlinx.coroutines.job
2323
import kotlinx.coroutines.launch
24+
import kotlinx.coroutines.sync.Mutex
2425
import okhttp3.ConnectionPool
2526
import okhttp3.Protocol
2627
import java.util.concurrent.TimeUnit
@@ -152,11 +153,11 @@ actual class KtorEngine actual constructor(
152153
* Simple notify mechanism that waits for a signal
153154
*/
154155
internal class Waiter {
155-
private val channel = Channel<Unit>(0)
156+
private val mutex = Mutex(locked = true)
156157

157158
// wait for the signal
158-
suspend fun wait() { channel.receive() }
159+
suspend fun wait() { mutex.lock() }
159160

160161
// give the signal to continue
161-
fun signal() { channel.trySend(Unit).getOrThrow() }
162+
fun signal() { mutex.unlock() }
162163
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package aws.smithy.kotlin.runtime.http.engine.ktor
7+
8+
import kotlinx.coroutines.ExperimentalCoroutinesApi
9+
import kotlinx.coroutines.delay
10+
import kotlinx.coroutines.launch
11+
import kotlinx.coroutines.test.runBlockingTest
12+
import kotlin.test.Test
13+
import kotlin.test.assertEquals
14+
import kotlin.time.Duration.Companion.milliseconds
15+
import kotlin.time.ExperimentalTime
16+
17+
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
18+
class WaiterTest {
19+
@Test
20+
fun testSignalWhenWaiting() = runBlockingTest {
21+
val start = currentTime
22+
23+
val waiter = Waiter()
24+
launch {
25+
delay(500.milliseconds)
26+
waiter.signal()
27+
}
28+
waiter.wait()
29+
30+
assertEquals(500, currentTime - start)
31+
}
32+
33+
@Test
34+
fun testSignalWhenNotWaiting() = runBlockingTest {
35+
val start = currentTime
36+
37+
val waiter = Waiter()
38+
launch {
39+
delay(500.milliseconds)
40+
waiter.signal()
41+
}
42+
delay(1000.milliseconds)
43+
44+
assertEquals(1000, currentTime - start)
45+
}
46+
}

0 commit comments

Comments
 (0)