@@ -6,18 +6,26 @@ package software.aws.toolkits.jetbrains.services.amazonq.clients
66import com.intellij.openapi.components.Service
77import com.intellij.openapi.components.service
88import com.intellij.openapi.project.Project
9+ import kotlinx.coroutines.delay
910import kotlinx.coroutines.future.await
11+ import software.amazon.awssdk.core.exception.RetryableException
12+ import software.amazon.awssdk.core.exception.SdkException
1013import software.amazon.awssdk.services.codewhispererstreaming.CodeWhispererStreamingAsyncClient
1114import software.amazon.awssdk.services.codewhispererstreaming.model.ExportContext
1215import software.amazon.awssdk.services.codewhispererstreaming.model.ExportIntent
1316import software.amazon.awssdk.services.codewhispererstreaming.model.ExportResultArchiveResponseHandler
17+ import software.amazon.awssdk.services.codewhispererstreaming.model.ThrottlingException
18+ import software.amazon.awssdk.services.codewhispererstreaming.model.ValidationException
1419import software.aws.toolkits.core.utils.getLogger
1520import software.aws.toolkits.core.utils.warn
1621import software.aws.toolkits.jetbrains.core.awsClient
1722import software.aws.toolkits.jetbrains.core.credentials.ToolkitConnectionManager
1823import software.aws.toolkits.jetbrains.core.credentials.pinning.QConnection
1924import java.time.Instant
25+ import java.util.concurrent.TimeoutException
2026import java.util.concurrent.atomic.AtomicReference
27+ import javax.naming.ServiceUnavailableException
28+ import kotlin.random.Random
2129
2230@Service(Service .Level .PROJECT )
2331class AmazonQStreamingClient (private val project : Project ) {
@@ -54,27 +62,42 @@ class AmazonQStreamingClient(private val project: Project) {
5462 val checksum = AtomicReference (" " )
5563
5664 try {
57- val result = streamingBearerClient().exportResultArchive(
58- {
59- it.exportId(exportId)
60- it.exportIntent(exportIntent)
61- it.exportContext(exportContext)
65+ withRetry(
66+ block = {
67+ val result = streamingBearerClient().exportResultArchive(
68+ {
69+ it.exportId(exportId)
70+ it.exportIntent(exportIntent)
71+ it.exportContext(exportContext)
72+ },
73+ ExportResultArchiveResponseHandler .builder().subscriber(
74+ ExportResultArchiveResponseHandler .Visitor .builder()
75+ .onBinaryMetadataEvent {
76+ checksum.set(it.contentChecksum())
77+ }.onBinaryPayloadEvent {
78+ val payloadBytes = it.bytes().asByteArray()
79+ byteBufferList.add(payloadBytes)
80+ }.onDefault {
81+ LOG .warn { " Received unknown payload stream: $it " }
82+ }
83+ .build()
84+ )
85+ .build()
86+ )
87+ result.await()
6288 },
63- ExportResultArchiveResponseHandler .builder().subscriber(
64- ExportResultArchiveResponseHandler .Visitor .builder()
65- .onBinaryMetadataEvent {
66- checksum.set(it.contentChecksum())
67- }.onBinaryPayloadEvent {
68- val payloadBytes = it.bytes().asByteArray()
69- byteBufferList.add(payloadBytes)
70- }.onDefault {
71- LOG .warn { " Received unknown payload stream: $it " }
72- }
73- .build()
74- )
75- .build()
89+ isRetryable = { e ->
90+ when (e) {
91+ is ValidationException ,
92+ is ThrottlingException ,
93+ is ServiceUnavailableException ,
94+ is SdkException ,
95+ is TimeoutException ,
96+ -> true
97+ else -> false
98+ }
99+ }
76100 )
77- result.await()
78101 } catch (e: Exception ) {
79102 onError(e)
80103 throw e
@@ -85,8 +108,43 @@ class AmazonQStreamingClient(private val project: Project) {
85108 return byteBufferList
86109 }
87110
111+ /* *
112+ * Helper function to implement retry logic with exponential backoff and jitter
113+ *
114+ * @param block The suspend function to execute with retry logic
115+ * @param isRetryable A function that determines if an exception should trigger a retry
116+ * @return The result of the block execution
117+ */
118+ private suspend fun <T > withRetry (
119+ block : suspend () -> T ,
120+ isRetryable : (Exception ) -> Boolean = { it is RetryableException },
121+ ): T {
122+ var currentDelay = INITIAL_DELAY
123+ var attempt = 0
124+
125+ while (true ) {
126+ try {
127+ return block()
128+ } catch (e: Exception ) {
129+ attempt++
130+ if (attempt >= MAX_RETRY_ATTEMPTS || ! isRetryable(e)) {
131+ throw e
132+ }
133+
134+ // Calculate delay with exponential backoff and jitter
135+ currentDelay = (currentDelay * 2 ).coerceAtMost(MAX_BACKOFF )
136+ val jitteredDelay = currentDelay * (0.5 + Random .nextDouble(0.5 ))
137+
138+ delay(jitteredDelay.toLong())
139+ }
140+ }
141+ }
142+
88143 companion object {
89144 private val LOG = getLogger<AmazonQStreamingClient >()
145+ private const val INITIAL_DELAY = 100L // milliseconds
146+ private const val MAX_BACKOFF = 10000L // milliseconds
147+ private const val MAX_RETRY_ATTEMPTS = 3
90148
91149 fun getInstance (project : Project ) = project.service<AmazonQStreamingClient >()
92150 }
0 commit comments