Skip to content

Commit ac43387

Browse files
authored
fix(amazonq): Adding backoff and retry for export result archive streaming API. (#5320)
* Adding backoff and retry for export result archive.
1 parent 8318155 commit ac43387

File tree

4 files changed

+252
-60
lines changed

4 files changed

+252
-60
lines changed

plugins/amazonq/chat/jetbrains-community/tst/software/aws/toolkits/jetbrains/services/amazonq/clients/AmazonQStreamingClientTest.kt

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ package software.aws.toolkits.jetbrains.services.amazonq.clients
55

66
import com.intellij.testFramework.RuleChain
77
import com.intellij.testFramework.replaceService
8+
import kotlinx.coroutines.runBlocking
89
import kotlinx.coroutines.test.runTest
10+
import org.assertj.core.api.Assertions.assertThat
911
import org.junit.Before
1012
import org.junit.Rule
1113
import org.junit.Test
1214
import org.mockito.kotlin.any
1315
import org.mockito.kotlin.argumentCaptor
16+
import org.mockito.kotlin.doAnswer
1417
import org.mockito.kotlin.doReturn
1518
import org.mockito.kotlin.mock
1619
import org.mockito.kotlin.stub
@@ -20,6 +23,7 @@ import software.amazon.awssdk.services.codewhispererstreaming.CodeWhispererStrea
2023
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportIntent
2124
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportResultArchiveRequest
2225
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportResultArchiveResponseHandler
26+
import software.amazon.awssdk.services.codewhispererstreaming.model.ValidationException
2327
import software.amazon.awssdk.services.ssooidc.SsoOidcClient
2428
import software.aws.toolkits.core.TokenConnectionSettings
2529
import software.aws.toolkits.core.utils.test.aString
@@ -81,4 +85,156 @@ class AmazonQStreamingClientTest : AmazonQTestBase() {
8185
verify(streamingBearerClient).exportResultArchive(requestCaptor.capture(), handlerCaptor.capture())
8286
}
8387
}
88+
89+
@Test
fun `verify retry on ValidationException`(): Unit = runBlocking {
    // The stubbed client fails the first two calls with VALIDATION_EXCEPTION and succeeds on the third.
    var invocations = 0
    streamingBearerClient = mockClientManagerRule.create<CodeWhispererStreamingAsyncClient>().stub {
        on {
            exportResultArchive(any<ExportResultArchiveRequest>(), any<ExportResultArchiveResponseHandler>())
        } doAnswer {
            invocations += 1
            when {
                invocations > 2 -> CompletableFuture.completedFuture(mock())
                else -> CompletableFuture.runAsync { throw VALIDATION_EXCEPTION }
            }
        }
    }

    amazonQStreamingClient.exportResultArchive("test-id", ExportIntent.TRANSFORMATION, null, {}, {})

    // Two failed calls plus the successful one.
    assertThat(invocations).isEqualTo(3)
}
111+
112+
@Test
fun `verify retry gives up after max attempts`(): Unit = runBlocking {
    // Every call to the stubbed client fails with VALIDATION_EXCEPTION.
    var invocations = 0
    streamingBearerClient = mockClientManagerRule.create<CodeWhispererStreamingAsyncClient>().stub {
        on {
            exportResultArchive(any<ExportResultArchiveRequest>(), any<ExportResultArchiveResponseHandler>())
        } doAnswer {
            invocations += 1
            CompletableFuture.runAsync { throw VALIDATION_EXCEPTION }
        }
    }

    val failure = catchCoroutineException {
        amazonQStreamingClient.exportResultArchive("test-id", ExportIntent.TRANSFORMATION, null, {}, {})
    }

    // The client is expected to stop after three calls and surface the last failure.
    assertThat(invocations).isEqualTo(3)
    assertThat(failure)
        .isInstanceOf(ValidationException::class.java)
        .hasMessage("Resource validation failed")
}
135+
136+
@Test
fun `verify no retry on non-retryable exception`(): Unit = runBlocking {
    // The stubbed client always fails with an IllegalArgumentException.
    var invocations = 0
    streamingBearerClient = mockClientManagerRule.create<CodeWhispererStreamingAsyncClient>().stub {
        on {
            exportResultArchive(any<ExportResultArchiveRequest>(), any<ExportResultArchiveResponseHandler>())
        } doAnswer {
            invocations += 1
            CompletableFuture.runAsync { throw IllegalArgumentException("Non-retryable error") }
        }
    }

    val failure = catchCoroutineException {
        amazonQStreamingClient.exportResultArchive("test-id", ExportIntent.TRANSFORMATION, null, {}, {})
    }

    // A single call only: the failure must propagate without another attempt.
    assertThat(invocations).isEqualTo(1)
    assertThat(failure)
        .isInstanceOf(IllegalArgumentException::class.java)
        .hasMessage("Non-retryable error")
}
160+
161+
@Test
fun `verify backoff timing between retries`(): Unit = runBlocking {
    // Timestamp of the previous client call; null until the first call has been seen.
    var previousAttemptNanos: Long? = null

    // Gap (in milliseconds) between each pair of consecutive client calls.
    val observedBackoffsMillis = mutableListOf<Long>()

    streamingBearerClient = mockClientManagerRule.create<CodeWhispererStreamingAsyncClient>().stub {
        on {
            exportResultArchive(any<ExportResultArchiveRequest>(), any<ExportResultArchiveResponseHandler>())
        } doAnswer {
            // nanoTime is monotonic, so wall-clock adjustments cannot distort the measured interval.
            val nowNanos = System.nanoTime()
            previousAttemptNanos?.let { observedBackoffsMillis += (nowNanos - it) / 1_000_000 }
            previousAttemptNanos = nowNanos

            CompletableFuture.runAsync { throw VALIDATION_EXCEPTION }
        }
    }

    val thrown = catchCoroutineException {
        amazonQStreamingClient.exportResultArchive("test-id", ExportIntent.TRANSFORMATION, null, {}, {})
    }

    assertThat(thrown)
        .isInstanceOf(ValidationException::class.java)
        .hasMessage("Resource validation failed")

    // Three attempts produce two gaps. Checking the count first keeps the bounds below from
    // passing vacuously when no retry (and therefore no backoff) happened at all.
    assertThat(observedBackoffsMillis).hasSize(2)
    assertThat(observedBackoffsMillis.minOf { it }).isGreaterThanOrEqualTo(100)
    assertThat(observedBackoffsMillis.maxOf { it }).isLessThanOrEqualTo(10000)
}
195+
196+
@Test
fun `verify onError callback is called with final exception`(): Unit = runBlocking {
    // Holds whatever the client hands to the onError callback.
    var reported: Exception? = null

    // Every call to the stubbed client fails with VALIDATION_EXCEPTION.
    streamingBearerClient = mockClientManagerRule.create<CodeWhispererStreamingAsyncClient>().stub {
        on {
            exportResultArchive(any<ExportResultArchiveRequest>(), any<ExportResultArchiveResponseHandler>())
        } doAnswer {
            CompletableFuture.runAsync { throw VALIDATION_EXCEPTION }
        }
    }

    val failure = catchCoroutineException {
        amazonQStreamingClient.exportResultArchive(
            "test-id",
            ExportIntent.TRANSFORMATION,
            null,
            { e -> reported = e },
            {}
        )
    }

    // The failure is both rethrown to the caller and passed to the error callback.
    assertThat(failure)
        .isInstanceOf(ValidationException::class.java)
        .hasMessage("Resource validation failed")
    assertThat(reported).isEqualTo(VALIDATION_EXCEPTION)
}
225+
226+
/**
 * Runs [block] and returns whatever it throws.
 *
 * @param block suspending code that is expected to fail
 * @return the [Throwable] raised by [block]
 * @throws IllegalStateException if [block] completes without throwing
 */
private suspend fun catchCoroutineException(block: suspend () -> Unit): Throwable {
    try {
        block()
    } catch (e: Throwable) {
        return e
    }
    // Kept outside the try so this failure is not itself caught and returned as if
    // it were the exception under test.
    error("Expected exception was not thrown")
}
234+
235+
companion object {
    // Single exception instance thrown by the stubbed streaming client in the retry tests;
    // its message is what the tests assert on.
    private val VALIDATION_EXCEPTION = ValidationException.builder().message("Resource validation failed").build()
}
84240
}

plugins/amazonq/codewhisperer/jetbrains-community/src/software/aws/toolkits/jetbrains/services/codewhisperer/util/CodeWhispererZipUploadManager.kt

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import software.amazon.awssdk.utils.IoUtils
2020
import software.aws.toolkits.core.utils.debug
2121
import software.aws.toolkits.core.utils.getLogger
2222
import software.aws.toolkits.jetbrains.core.AwsClientManager
23+
import software.aws.toolkits.jetbrains.services.amazonq.RetryableOperation
2324
import software.aws.toolkits.jetbrains.services.codewhisperer.codescan.CodeWhispererCodeScanSession.Companion.APPLICATION_ZIP
2425
import software.aws.toolkits.jetbrains.services.codewhisperer.codescan.CodeWhispererCodeScanSession.Companion.AWS_KMS
2526
import software.aws.toolkits.jetbrains.services.codewhisperer.codescan.CodeWhispererCodeScanSession.Companion.CONTENT_MD5
@@ -208,41 +209,3 @@ fun getTelemetryErrorMessage(e: Exception, featureUseCase: CodeWhispererConstant
208209
else -> message("testgen.message.failed")
209210
}
210211
}
211-
212-
/**
 * Runs a blocking operation up to [MAX_RETRY_ATTEMPTS] times, sleeping between attempts with a
 * delay that starts at [INITIAL_DELAY] ms and doubles after each sleep, capped at [MAX_BACKOFF] ms.
 *
 * All retry state lives inside [execute], so an instance can be used for any number of calls.
 */
class RetryableOperation<T> {
    /**
     * Invokes [operation] and returns its result, retrying failures that [isRetryable] accepts.
     *
     * @param operation the work to run
     * @param isRetryable decides whether a failed attempt may be repeated
     * @param errorHandler called with the failure and the number of attempts made so far when the
     * failure is not retryable or the attempt limit is reached; it must throw
     * @return the value produced by the first successful attempt
     */
    fun execute(
        operation: () -> T,
        isRetryable: (Exception) -> Boolean,
        errorHandler: (Exception, Int) -> Nothing,
    ): T {
        // Per-call state: a previous call's failures must not consume this call's attempts.
        var attempts = 0
        var currentDelay = INITIAL_DELAY

        while (attempts < MAX_RETRY_ATTEMPTS) {
            try {
                return operation()
            } catch (e: Exception) {
                attempts++
                if (attempts < MAX_RETRY_ATTEMPTS && isRetryable(e)) {
                    Thread.sleep(currentDelay)
                    currentDelay = (currentDelay * 2).coerceAtMost(MAX_BACKOFF)
                    continue
                }

                errorHandler(e, attempts)
            }
        }

        // This line should never be reached due to errorHandler throwing exception
        throw RuntimeException("Unexpected state after $attempts attempts")
    }

    companion object {
        private const val INITIAL_DELAY = 100L // milliseconds
        private const val MAX_BACKOFF = 10000L // milliseconds
        private const val MAX_RETRY_ATTEMPTS = 3
    }
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package software.aws.toolkits.jetbrains.services.amazonq
5+
6+
import kotlinx.coroutines.delay
7+
import kotlinx.coroutines.runBlocking
8+
import software.amazon.awssdk.core.exception.RetryableException
9+
import kotlin.random.Random
10+
11+
/**
 * Runs an operation up to [MAX_RETRY_ATTEMPTS] times with jittered exponential backoff between
 * attempts.
 *
 * The backoff base starts at [INITIAL_DELAY] ms and is doubled (capped at [MAX_BACKOFF] ms) before
 * every wait; the actual wait is that base scaled by a random factor in `[0.5, 1.0)`. The first
 * wait is therefore between 100 and 200 ms.
 *
 * All retry state lives inside [executeSuspend], so an instance can be used for any number of calls.
 */
class RetryableOperation<T> {
    /** Scales [base] by a random factor in `[0.5, 1.0)`. */
    private fun jitter(base: Long): Long = (base * (0.5 + Random.nextDouble(0.5))).toLong()

    /**
     * Blocking variant of [executeSuspend]: blocks the calling thread via `runBlocking` until the
     * operation succeeds or [errorHandler] throws.
     *
     * @param operation the work to run
     * @param isRetryable decides whether a failed attempt may be repeated
     * @param errorHandler called with the failure and the number of attempts made so far; it must throw
     * @return the value produced by the first successful attempt
     */
    fun execute(
        operation: () -> T,
        isRetryable: (Exception) -> Boolean = { it is RetryableException },
        errorHandler: ((Exception, Int) -> Nothing),
    ): T = runBlocking {
        executeSuspend(operation, isRetryable, errorHandler)
    }

    /**
     * Invokes [operation] and returns its result, retrying failures that [isRetryable] accepts.
     *
     * Every [Exception] thrown by [operation] goes through [isRetryable] and, when rejected or when
     * the attempt limit is reached, through [errorHandler].
     *
     * @param operation the suspending work to run
     * @param isRetryable decides whether a failed attempt may be repeated
     * @param errorHandler called with the failure and the number of attempts made so far when the
     * failure is not retryable or the attempt limit is reached; it must throw
     * @return the value produced by the first successful attempt
     */
    suspend fun executeSuspend(
        operation: suspend () -> T,
        isRetryable: (Exception) -> Boolean = { it is RetryableException },
        errorHandler: (suspend (Exception, Int) -> Nothing),
    ): T {
        // Per-call state: a previous call's failures must not consume this call's attempts
        // or inflate its backoff.
        var attempts = 0
        var backoffBase = INITIAL_DELAY

        while (attempts < MAX_RETRY_ATTEMPTS) {
            try {
                return operation()
            } catch (e: Exception) {
                attempts++
                if (attempts >= MAX_RETRY_ATTEMPTS || !isRetryable(e)) {
                    errorHandler.invoke(e, attempts)
                }
                // Double before use, so the first wait is drawn from the doubled initial delay.
                backoffBase = (backoffBase * 2).coerceAtMost(MAX_BACKOFF)
                delay(jitter(backoffBase))
            }
        }

        // Not expected to be reached: errorHandler throws once the attempt limit is hit.
        throw RuntimeException("Unexpected state after $attempts attempts")
    }

    companion object {
        private const val INITIAL_DELAY = 100L // milliseconds
        private const val MAX_BACKOFF = 10000L // milliseconds
        private const val MAX_RETRY_ATTEMPTS = 3
    }
}

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/clients/AmazonQStreamingClient.kt

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,21 @@ import com.intellij.openapi.components.Service
77
import com.intellij.openapi.components.service
88
import com.intellij.openapi.project.Project
99
import kotlinx.coroutines.future.await
10+
import software.amazon.awssdk.core.exception.SdkException
1011
import software.amazon.awssdk.services.codewhispererstreaming.CodeWhispererStreamingAsyncClient
1112
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportContext
1213
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportIntent
1314
import software.amazon.awssdk.services.codewhispererstreaming.model.ExportResultArchiveResponseHandler
15+
import software.amazon.awssdk.services.codewhispererstreaming.model.ThrottlingException
16+
import software.amazon.awssdk.services.codewhispererstreaming.model.ValidationException
1417
import software.aws.toolkits.core.utils.getLogger
1518
import software.aws.toolkits.core.utils.warn
1619
import software.aws.toolkits.jetbrains.core.awsClient
1720
import software.aws.toolkits.jetbrains.core.credentials.ToolkitConnectionManager
1821
import software.aws.toolkits.jetbrains.core.credentials.pinning.QConnection
22+
import software.aws.toolkits.jetbrains.services.amazonq.RetryableOperation
1923
import java.time.Instant
24+
import java.util.concurrent.TimeoutException
2025
import java.util.concurrent.atomic.AtomicReference
2126

2227
@Service(Service.Level.PROJECT)
@@ -54,30 +59,45 @@ class AmazonQStreamingClient(private val project: Project) {
5459
val checksum = AtomicReference("")
5560

5661
try {
57-
val result = streamingBearerClient().exportResultArchive(
58-
{
59-
it.exportId(exportId)
60-
it.exportIntent(exportIntent)
61-
it.exportContext(exportContext)
62+
RetryableOperation<Unit>().executeSuspend(
63+
operation = {
64+
val result = streamingBearerClient().exportResultArchive(
65+
{
66+
it.exportId(exportId)
67+
it.exportIntent(exportIntent)
68+
it.exportContext(exportContext)
69+
},
70+
ExportResultArchiveResponseHandler.builder().subscriber(
71+
ExportResultArchiveResponseHandler.Visitor.builder()
72+
.onBinaryMetadataEvent {
73+
checksum.set(it.contentChecksum())
74+
}.onBinaryPayloadEvent {
75+
val payloadBytes = it.bytes().asByteArray()
76+
byteBufferList.add(payloadBytes)
77+
}.onDefault {
78+
LOG.warn { "Received unknown payload stream: $it" }
79+
}
80+
.build()
81+
)
82+
.build()
83+
)
84+
result.await()
6285
},
63-
ExportResultArchiveResponseHandler.builder().subscriber(
64-
ExportResultArchiveResponseHandler.Visitor.builder()
65-
.onBinaryMetadataEvent {
66-
checksum.set(it.contentChecksum())
67-
}.onBinaryPayloadEvent {
68-
val payloadBytes = it.bytes().asByteArray()
69-
byteBufferList.add(payloadBytes)
70-
}.onDefault {
71-
LOG.warn { "Received unknown payload stream: $it" }
72-
}
73-
.build()
74-
)
75-
.build()
86+
isRetryable = { e ->
87+
when (e) {
88+
is ValidationException,
89+
is ThrottlingException,
90+
is SdkException,
91+
is TimeoutException,
92+
-> true
93+
else -> false
94+
}
95+
},
96+
errorHandler = { e, attempts ->
97+
onError(e)
98+
throw e
99+
}
76100
)
77-
result.await()
78-
} catch (e: Exception) {
79-
onError(e)
80-
throw e
81101
} finally {
82102
onStreamingFinished(startTime)
83103
}

0 commit comments

Comments
 (0)