Skip to content

Commit 0e6ce8a

Browse files
committed
Merge remote-tracking branch 'origin/main' into release
2 parents 4290142 + a0c1772 commit 0e6ce8a

File tree

13 files changed

+87
-126
lines changed

13 files changed

+87
-126
lines changed

runtime/io/common/src/aws/smithy/kotlin/runtime/io/SdkByteReadChannel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ internal suspend fun SdkByteReadChannel.readAvailableFallback(dest: SdkByteBuffe
158158
// channel was closed while waiting and no further content was made available
159159
if (availableForRead == 0 && isClosedForRead) return -1
160160
val tmp = ByteArray(minOf(availableForRead.toLong(), limit, Int.MAX_VALUE.toLong()).toInt())
161+
readFully(tmp)
161162
dest.writeFully(tmp)
162163
return tmp.size.toLong()
163164
}

runtime/io/common/test/aws/smithy/kotlin/runtime/io/SdkByteChannelOpsTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,36 @@ class SdkByteChannelOpsTest {
170170
}
171171
assertNull(chan.readUtf8CodePoint())
172172
}
173+
174+
private class ProxyChan(
175+
val ch: SdkByteReadChannel
176+
) : SdkByteReadChannel by ch {
177+
var proxyCalled = false
178+
override suspend fun readRemaining(limit: Int): ByteArray {
179+
proxyCalled = true
180+
return ch.readRemaining(limit)
181+
}
182+
override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) {
183+
proxyCalled = true
184+
return ch.readFully(sink, offset, length)
185+
}
186+
override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int {
187+
proxyCalled = true
188+
return ch.readAvailable(sink, offset, length)
189+
}
190+
}
191+
192+
@Test
193+
fun testReadAvailableSdkByteBufferFallback() = runTest {
194+
val content = "a".repeat(64).encodeToByteArray()
195+
val inner = SdkByteReadChannel(content)
196+
val chan = ProxyChan(inner)
197+
198+
val dest = SdkByteBuffer(32U)
199+
val rc = chan.readAvailable(dest)
200+
assertEquals(dest.readRemaining.toLong(), rc)
201+
val expected = "a".repeat(32)
202+
assertEquals(expected, dest.decodeToString())
203+
assertTrue(chan.proxyCalled)
204+
}
173205
}

runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/UploadTest.kt

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package aws.smithy.kotlin.runtime.http.test
77

8+
import aws.smithy.kotlin.runtime.content.ByteStream
89
import aws.smithy.kotlin.runtime.http.HttpBody
910
import aws.smithy.kotlin.runtime.http.HttpMethod
1011
import aws.smithy.kotlin.runtime.http.HttpStatusCode
@@ -14,6 +15,7 @@ import aws.smithy.kotlin.runtime.http.request.url
1415
import aws.smithy.kotlin.runtime.http.response.complete
1516
import aws.smithy.kotlin.runtime.http.test.util.AbstractEngineTest
1617
import aws.smithy.kotlin.runtime.http.test.util.test
18+
import aws.smithy.kotlin.runtime.http.toHttpBody
1719
import aws.smithy.kotlin.runtime.io.SdkByteChannel
1820
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
1921
import aws.smithy.kotlin.runtime.util.encodeToHex
@@ -29,7 +31,7 @@ class UploadTest : AbstractEngineTest() {
2931
fun testUploadIntegrity() = testEngines {
3032
// test that what we write the entire contents given to us
3133
test { env, client ->
32-
val data = ByteArray(16 * 1024 * 1023) { it.toByte() }
34+
val data = ByteArray(16 * 1024 * 1024) { it.toByte() }
3335
val sha = data.sha256().encodeToHex()
3436

3537
val req = HttpRequest {
@@ -80,4 +82,34 @@ class UploadTest : AbstractEngineTest() {
8082
}
8183
}
8284
}
85+
86+
@Test
87+
fun testUploadWithWrappedStream() = testEngines {
88+
// test custom ByteStream behavior
89+
// see https://github.com/awslabs/smithy-kotlin/issues/613
90+
test { env, client ->
91+
val data = ByteArray(1024 * 1024) { it.toByte() }
92+
val sha = data.sha256().encodeToHex()
93+
94+
val wrappedStream = object : ByteStream.ReplayableStream() {
95+
override val contentLength: Long = data.size.toLong()
96+
override fun newReader(): SdkByteReadChannel {
97+
val underlying = SdkByteReadChannel(data)
98+
return object : SdkByteReadChannel by underlying {}
99+
}
100+
}
101+
102+
val req = HttpRequest {
103+
method = HttpMethod.POST
104+
url(env.testServer)
105+
url.path = "/upload/content"
106+
body = wrappedStream.toHttpBody()
107+
}
108+
109+
val call = client.call(req)
110+
call.complete()
111+
assertEquals(HttpStatusCode.OK, call.response.status)
112+
assertEquals(sha, call.response.headers["content-sha256"], "sha mismatch for upload on ${client.engine}")
113+
}
114+
}
83115
}

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/retries/StandardRetryStrategy.kt

Lines changed: 8 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,10 @@
55

66
package aws.smithy.kotlin.runtime.retries
77

8-
import aws.smithy.kotlin.runtime.retries.delay.DelayProvider
9-
import aws.smithy.kotlin.runtime.retries.delay.RetryCapacityExceededException
10-
import aws.smithy.kotlin.runtime.retries.delay.RetryToken
11-
import aws.smithy.kotlin.runtime.retries.delay.RetryTokenBucket
8+
import aws.smithy.kotlin.runtime.retries.delay.*
129
import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
1310
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
1411
import kotlinx.coroutines.CancellationException
15-
import kotlinx.coroutines.TimeoutCancellationException
16-
import kotlinx.coroutines.withTimeout
17-
import kotlin.time.Duration
18-
import kotlin.time.Duration.Companion.milliseconds
1912

2013
/**
2114
* Implements a retry strategy utilizing backoff delayer and a token bucket for rate limiting and circuit breaking. Note
@@ -27,18 +20,16 @@ import kotlin.time.Duration.Companion.milliseconds
2720
* @param delayProvider A delayer that can back off after the initial try to spread out the retries.
2821
*/
2922
class StandardRetryStrategy(
30-
val options: StandardRetryStrategyOptions,
31-
private val tokenBucket: RetryTokenBucket,
32-
private val delayProvider: DelayProvider,
23+
val options: StandardRetryStrategyOptions = StandardRetryStrategyOptions.Default,
24+
private val tokenBucket: RetryTokenBucket = StandardRetryTokenBucket(),
25+
private val delayProvider: DelayProvider = ExponentialBackoffWithJitter()
3326
) : RetryStrategy {
3427
/**
3528
* Retry the given block of code until it's successful. Note this method throws exceptions for non-successful
3629
* outcomes from retrying.
3730
*/
3831
override suspend fun <R> retry(policy: RetryPolicy<R>, block: suspend () -> R): Outcome<R> =
39-
withTimeout(options.maxTime) {
40-
doTryLoop(block, policy, 1, tokenBucket.acquireToken(), null)
41-
}
32+
doTryLoop(block, policy, 1, tokenBucket.acquireToken())
4233

4334
/**
4435
* Perform a single iteration of the try loop. Execute the block of code, evaluate the result, and take action to
@@ -49,20 +40,16 @@ class StandardRetryStrategy(
4940
* @param fromToken A [RetryToken] which grants the strategy capacity to execute a try. This token is resolved
5041
* inside the function by calling [notifySuccess][RetryToken.notifySuccess],
5142
* [notifyFailure][RetryToken.notifyFailure], or [scheduleRetry][RetryToken.scheduleRetry].
52-
* @param previousResult The [Result] from the prior loop iteration. This is used in the case of a timeout to
53-
* include in the thrown exception.
5443
* @return The successful [Outcome] from the final try.
5544
*/
5645
private tailrec suspend fun <R> doTryLoop(
5746
block: suspend () -> R,
5847
policy: RetryPolicy<R>,
5948
attempt: Int,
6049
fromToken: RetryToken,
61-
previousResult: Result<R>?,
6250
): Outcome<R> {
6351
val callResult = runCatching { block() }
6452
when (val ex = callResult.exceptionOrNull()) {
65-
is TimeoutCancellationException -> throwTimeOut(fromToken, attempt, previousResult)
6653
is CancellationException -> throw ex
6754
}
6855

@@ -83,16 +70,14 @@ class StandardRetryStrategy(
8370
fromToken.scheduleRetry(evaluation.reason)
8471
}
8572
}
86-
} catch (ex: TimeoutCancellationException) {
87-
throwTimeOut(fromToken, attempt, callResult)
8873
} catch (ex: RetryCapacityExceededException) {
8974
throwCapacityExceeded(ex, attempt, callResult)
9075
} catch (ex: Throwable) {
9176
fromToken.notifyFailure()
9277
throw ex
9378
}
9479

95-
return doTryLoop(block, policy, attempt + 1, nextToken, callResult)
80+
return doTryLoop(block, policy, attempt + 1, nextToken)
9681
}
9782

9883
/**
@@ -136,26 +121,6 @@ class StandardRetryStrategy(
136121
else -> throw ex
137122
}
138123

139-
/**
140-
* Handles the termination of the retry loop because too much time has elapsed by marking the [RetryToken] as failed
141-
* and throwing a [TimedOutException].
142-
* @param token The [RetryToken] used in the attempt that was waiting or executing when the timeout occurred.
143-
* @param attempt The number of attempts completed.
144-
* @param previousResult The last result that was received (i.e., from the prior loop iteration).
145-
*/
146-
private suspend fun <R> throwTimeOut(token: RetryToken, attempt: Int, previousResult: Result<R>?): Nothing {
147-
token.notifyFailure()
148-
when (val ex = previousResult?.exceptionOrNull()) {
149-
null -> throw TimedOutException(
150-
"Took more than ${options.maxTime} to yield a result",
151-
attempt,
152-
previousResult?.getOrNull(),
153-
previousResult?.exceptionOrNull(),
154-
)
155-
else -> throw ex
156-
}
157-
}
158-
159124
/**
160125
* Handles the termination of the retry loop because too many attempts have been made by throwing a
161126
* [TimedOutException].
@@ -178,14 +143,13 @@ class StandardRetryStrategy(
178143

179144
/**
180145
* Defines configuration for a [StandardRetryStrategy].
181-
* @param maxTime The maximum amount of time to retry.
182146
* @param maxAttempts The maximum number of attempts to make (including the first attempt).
183147
*/
184-
data class StandardRetryStrategyOptions(val maxTime: Duration, val maxAttempts: Int) {
148+
data class StandardRetryStrategyOptions(val maxAttempts: Int) {
185149
companion object {
186150
/**
187151
* The default retry strategy configuration.
188152
*/
189-
val Default = StandardRetryStrategyOptions(maxTime = 20_000.milliseconds, maxAttempts = 3)
153+
val Default = StandardRetryStrategyOptions(maxAttempts = 3)
190154
}
191155
}

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/retries/delay/ExponentialBackoffWithJitter.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import kotlin.time.DurationUnit
2525
*
2626
* @param options The configuration to use for this delayer.
2727
*/
28-
class ExponentialBackoffWithJitter(val options: ExponentialBackoffWithJitterOptions) : DelayProvider {
28+
class ExponentialBackoffWithJitter(
29+
val options: ExponentialBackoffWithJitterOptions = ExponentialBackoffWithJitterOptions.Default
30+
) : DelayProvider {
2931
private val random = Random.Default
3032

3133
/**

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/retries/delay/StandardRetryTokenBucket.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private const val MS_PER_S = 1_000
2323
* @param clock A clock to use for time calculations.
2424
*/
2525
class StandardRetryTokenBucket(
26-
val options: StandardRetryTokenBucketOptions,
26+
val options: StandardRetryTokenBucketOptions = StandardRetryTokenBucketOptions.Default,
2727
private val clock: Clock = Clock.System,
2828
) : RetryTokenBucket {
2929
internal var capacity = options.maxCapacity

runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/retries/StandardRetryStrategyTest.kt

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
1212
import aws.smithy.kotlin.runtime.retries.policy.RetryErrorType
1313
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
1414
import kotlinx.coroutines.ExperimentalCoroutinesApi
15-
import kotlinx.coroutines.delay
1615
import kotlinx.coroutines.test.runTest
1716
import kotlin.test.*
18-
import kotlin.time.Duration.Companion.milliseconds
1917

2018
class StandardRetryStrategyTest {
2119
@OptIn(ExperimentalCoroutinesApi::class)
@@ -161,53 +159,6 @@ class StandardRetryStrategyTest {
161159
val token = bucket.lastTokenAcquired!!
162160
assertTrue(token.nextToken!!.nextToken!!.isFailure)
163161
}
164-
165-
@OptIn(ExperimentalCoroutinesApi::class)
166-
@Test
167-
fun testTooLongFromException() = runTest {
168-
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_500.milliseconds)
169-
val bucket = RecordingTokenBucket()
170-
val delayer = RecordingDelayer()
171-
val retryer = StandardRetryStrategy(options, bucket, delayer)
172-
val policy = StringRetryPolicy()
173-
174-
val result = runCatching {
175-
retryer.retry(policy) {
176-
delay(1_000)
177-
throw ConcurrentModificationException()
178-
}
179-
}
180-
181-
assertIs<ConcurrentModificationException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")
182-
183-
val token = bucket.lastTokenAcquired!!
184-
assertTrue(token.nextToken!!.isFailure)
185-
}
186-
187-
@OptIn(ExperimentalCoroutinesApi::class)
188-
@Test
189-
fun testTooLongFromResult() = runTest {
190-
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_000.milliseconds)
191-
val bucket = RecordingTokenBucket()
192-
val delayer = RecordingDelayer()
193-
val retryer = StandardRetryStrategy(options, bucket, delayer)
194-
val policy = StringRetryPolicy()
195-
196-
val result = runCatching {
197-
retryer.retry(policy) {
198-
delay(2_000)
199-
"This will never run!"
200-
}
201-
}
202-
203-
val ex = assertIs<TimedOutException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")
204-
assertEquals(1, ex.attempts)
205-
assertNull(ex.lastResponse)
206-
assertNull(ex.lastException)
207-
208-
val token = bucket.lastTokenAcquired!!
209-
assertTrue(token.isFailure)
210-
}
211162
}
212163

213164
fun block(

runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/retries/impl/StandardRetryIntegrationTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import kotlinx.coroutines.test.runTest
2020
import kotlinx.serialization.SerialName
2121
import kotlinx.serialization.Serializable
2222
import kotlin.test.*
23-
import kotlin.time.Duration
2423
import kotlin.time.Duration.Companion.milliseconds
2524

2625
class StandardRetryIntegrationTest {
@@ -30,7 +29,7 @@ class StandardRetryIntegrationTest {
3029
val testCases = standardRetryIntegrationTestCases
3130
.mapValues { Yaml.default.decodeFromString(TestCase.serializer(), it.value) }
3231
testCases.forEach { (name, tc) ->
33-
val options = StandardRetryStrategyOptions(maxTime = Duration.INFINITE, maxAttempts = tc.given.maxAttempts)
32+
val options = StandardRetryStrategyOptions(maxAttempts = tc.given.maxAttempts)
3433
val tokenBucket = StandardRetryTokenBucket(
3534
StandardRetryTokenBucketOptions.Default.copy(
3635
maxCapacity = tc.given.initialRetryTokens,

smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/rendering/ClientConfigProperty.kt

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -263,24 +263,9 @@ object KotlinClientRuntimeConfigProperty {
263263
strategy.
264264
""".trimIndent()
265265

266-
val retryStrategyBlock = """
267-
run {
268-
val strategyOptions = StandardRetryStrategyOptions.Default
269-
val tokenBucket = StandardRetryTokenBucket(StandardRetryTokenBucketOptions.Default)
270-
val delayer = ExponentialBackoffWithJitter(ExponentialBackoffWithJitterOptions.Default)
271-
StandardRetryStrategy(strategyOptions, tokenBucket, delayer)
272-
}
273-
""".trimIndent()
274-
propertyType = ClientConfigPropertyType.ConstantValue(retryStrategyBlock)
275-
276-
additionalImports = listOf(
277-
RuntimeTypes.Core.Retries.StandardRetryStrategy,
278-
RuntimeTypes.Core.Retries.StandardRetryStrategyOptions,
279-
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucket,
280-
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucketOptions,
281-
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitter,
282-
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitterOptions,
283-
)
266+
propertyType = ClientConfigPropertyType.ConstantValue("StandardRetryStrategy()")
267+
268+
additionalImports = listOf(RuntimeTypes.Core.Retries.StandardRetryStrategy)
284269
}
285270

286271
SdkLogMode = ClientConfigProperty {

smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/rendering/waiters/WaiterGenerator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private fun KotlinWriter.renderRetryStrategy(wi: WaiterInfo, asValName: String)
3434
}
3535
write("val delay = ExponentialBackoffWithJitter(delayOptions)")
3636
write("")
37-
write("val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.#T, maxAttempts = 20)", KotlinTypes.Time.seconds)
37+
write("val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)")
3838
write("StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)")
3939
}
4040
}

0 commit comments

Comments
 (0)