Skip to content

Commit 3e44ed2

Browse files
committed
fix(amazonq): suppress notification failure noise and surface underlying lsp process failure
1 parent 8a74642 commit 3e44ed2

File tree

4 files changed

+87
-63
lines changed

4 files changed

+87
-63
lines changed

plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/webview/BrowserConnector.kt

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ import kotlinx.coroutines.channels.awaitClose
2020
import kotlinx.coroutines.coroutineScope
2121
import kotlinx.coroutines.flow.Flow
2222
import kotlinx.coroutines.flow.callbackFlow
23+
import kotlinx.coroutines.flow.catch
2324
import kotlinx.coroutines.flow.distinctUntilChanged
2425
import kotlinx.coroutines.flow.launchIn
2526
import kotlinx.coroutines.flow.merge
2627
import kotlinx.coroutines.flow.onEach
28+
import kotlinx.coroutines.flow.timeout
29+
import kotlinx.coroutines.flow.toList
2730
import kotlinx.coroutines.launch
31+
import kotlinx.coroutines.runBlocking
2832
import org.cef.browser.CefBrowser
2933
import org.eclipse.lsp4j.TextDocumentIdentifier
34+
import org.eclipse.lsp4j.jsonrpc.JsonRpcException
3035
import org.eclipse.lsp4j.jsonrpc.ResponseErrorException
3136
import org.eclipse.lsp4j.jsonrpc.messages.ResponseErrorCode
3237
import software.aws.toolkits.core.utils.error
@@ -37,6 +42,7 @@ import software.aws.toolkits.jetbrains.services.amazonq.apps.AppConnection
3742
import software.aws.toolkits.jetbrains.services.amazonq.commands.MessageSerializer
3843
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQChatServer
3944
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQLspService
45+
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQServerInstanceFacade
4046
import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcMethod
4147
import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcNotification
4248
import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcRequest
@@ -114,6 +120,7 @@ import software.aws.toolkits.telemetry.Telemetry
114120
import java.util.concurrent.CompletableFuture
115121
import java.util.concurrent.CompletionException
116122
import java.util.function.Function
123+
import kotlin.time.Duration.Companion.milliseconds
117124

118125
class BrowserConnector(
119126
private val serializer: MessageSerializer = MessageSerializer.getInstance(),
@@ -237,43 +244,20 @@ class BrowserConnector(
237244
val chatParams: ObjectNode = (node.params as ObjectNode)
238245
.setAll(serializedEnrichmentParams)
239246

240-
val tabId = requestFromUi.params.tabId
241-
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
242-
chatCommunicationManager.registerPartialResultToken(partialResultToken)
243-
244-
var encryptionManager: JwtEncryptionManager? = null
245-
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
246-
encryptionManager = this.encryptionManager
247-
248-
val encryptedParams = EncryptedChatParams(this.encryptionManager.encrypt(chatParams), partialResultToken)
249-
rawEndpoint.request(SEND_CHAT_COMMAND_PROMPT, encryptedParams) as CompletableFuture<String>
250-
} ?: (CompletableFuture.failedFuture(IllegalStateException("LSP Server not running")))
251-
252-
// We assume there is only one outgoing request per tab because the input is
253-
// blocked when there is an outgoing request
254-
chatCommunicationManager.setInflightRequestForTab(tabId, result)
255-
showResult(result, partialResultToken, tabId, encryptionManager, browser)
247+
doChatRequest(requestFromUi.params.tabId, browser) { serverFacade, partialResultToken ->
248+
val encryptedParams = EncryptedChatParams(serverFacade.encryptionManager.encrypt(chatParams), partialResultToken)
249+
(serverFacade.rawEndpoint.request(SEND_CHAT_COMMAND_PROMPT, encryptedParams) as CompletableFuture<String>)
250+
}
256251
}
257252

258253
CHAT_QUICK_ACTION -> {
259-
val requestFromUi = serializer.deserializeChatMessages<QuickChatActionRequest>(node)
260-
val tabId = requestFromUi.params.tabId
261254
val quickActionParams = node.params ?: error("empty payload")
262-
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
263-
chatCommunicationManager.registerPartialResultToken(partialResultToken)
264-
var encryptionManager: JwtEncryptionManager? = null
265-
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
266-
encryptionManager = this.encryptionManager
267-
268-
val encryptedParams = EncryptedQuickActionChatParams(this.encryptionManager.encrypt(quickActionParams), partialResultToken)
269-
rawEndpoint.request(CHAT_QUICK_ACTION, encryptedParams) as CompletableFuture<String>
270-
} ?: (CompletableFuture.failedFuture(IllegalStateException("LSP Server not running")))
271-
272-
// We assume there is only one outgoing request per tab because the input is
273-
// blocked when there is an outgoing request
274-
chatCommunicationManager.setInflightRequestForTab(tabId, result)
255+
val requestFromUi = serializer.deserializeChatMessages<QuickChatActionRequest>(node)
275256

276-
showResult(result, partialResultToken, tabId, encryptionManager, browser)
257+
doChatRequest(requestFromUi.params.tabId, browser) { serverFacade, partialResultToken ->
258+
val encryptedParams = EncryptedQuickActionChatParams(serverFacade.encryptionManager.encrypt(quickActionParams), partialResultToken)
259+
serverFacade.rawEndpoint.request(CHAT_QUICK_ACTION, encryptedParams) as CompletableFuture<String>
260+
}
277261
}
278262

279263
CHAT_LIST_CONVERSATIONS -> {
@@ -465,7 +449,6 @@ class BrowserConnector(
465449
AUTH_FOLLOW_UP_CLICKED -> {
466450
val message = serializer.deserializeChatMessages<AuthFollowUpClickNotification>(node)
467451
chatCommunicationManager.handleAuthFollowUpClicked(
468-
project,
469452
message.params
470453
)
471454
}
@@ -564,18 +547,44 @@ class BrowserConnector(
564547
}
565548
}
566549

567-
private fun showResult(
568-
result: CompletableFuture<String>,
569-
partialResultToken: String,
550+
private suspend fun doChatRequest(
570551
tabId: String,
571-
encryptionManager: JwtEncryptionManager?,
572552
browser: Browser,
553+
action: (AmazonQServerInstanceFacade, String) -> CompletableFuture<String>,
573554
) {
555+
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
556+
chatCommunicationManager.registerPartialResultToken(partialResultToken)
557+
var encryptionManager: JwtEncryptionManager? = null
558+
val result = AmazonQLspService.executeAsyncIfRunning(project) { _ ->
559+
// jank
560+
encryptionManager = this@executeAsyncIfRunning.encryptionManager
561+
action(this, partialResultToken)
562+
.handle { result, ex ->
563+
if (ex == null) {
564+
return@handle result
565+
}
566+
567+
if (JsonRpcException.indicatesStreamClosed(ex)) {
568+
// the flow buffer will never complete so insert some arbitrary timeout until we figure out how to end the flow
569+
// after the error stream is closed and drained
570+
val errorStream = runBlocking { this@executeAsyncIfRunning.errorStream.timeout(500.milliseconds).catch { }.toList() }
571+
throw RuntimeException("LSP execution error. See logs for more details: ${errorStream.joinToString(separator = "")}", ex.cause)
572+
}
573+
574+
throw ex
575+
}
576+
} ?: (CompletableFuture.failedFuture(IllegalStateException("LSP Server not running")))
577+
578+
// We assume there is only one outgoing request per tab because the input is
579+
// blocked when there is an outgoing request
580+
chatCommunicationManager.setInflightRequestForTab(tabId, result)
581+
574582
result.whenComplete { value, error ->
575583
try {
576584
if (error != null) {
577585
throw error
578586
}
587+
579588
chatCommunicationManager.removePartialChatMessage(partialResultToken)
580589
val messageToChat = ChatCommunicationManager.convertToJsonToSendToChat(
581590
SEND_CHAT_COMMAND_PROMPT,
@@ -585,7 +594,7 @@ class BrowserConnector(
585594
)
586595
browser.postChat(messageToChat)
587596
chatCommunicationManager.removeInflightRequestForTab(tabId)
588-
} catch (e: CancellationException) {
597+
} catch (_: CancellationException) {
589598
LOG.warn { "Cancelled chat generation" }
590599
try {
591600
chatAsyncResultManager.createRequestId(partialResultToken)

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/AmazonQLanguageClientImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ import java.util.concurrent.TimeUnit
8484
/**
8585
* Concrete implementation of [AmazonQLanguageClient] to handle messages sent from server
8686
*/
87-
class AmazonQLanguageClientImpl(private val project: Project) : AmazonQLanguageClient {
87+
class AmazonQLanguageClientImpl(private val project: Project, private val instanceFacade: AmazonQServerInstanceFacade) : AmazonQLanguageClient {
8888
private val chatManager
8989
get() = ChatCommunicationManager.getInstance(project)
9090
private fun handleTelemetryMap(telemetryMap: Map<*, *>) {
@@ -399,7 +399,7 @@ class AmazonQLanguageClientImpl(private val project: Project) : AmazonQLanguageC
399399
override fun notifyProgress(params: ProgressParams?) {
400400
if (params == null) return
401401
try {
402-
chatManager.handlePartialResultProgressNotification(project, params)
402+
chatManager.handlePartialResultProgressNotification(instanceFacade.encryptionManager, params)
403403
} catch (e: Exception) {
404404
LOG.error(e) { "Cannot handle partial chat" }
405405
}

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/AmazonQLspService.kt

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import kotlinx.coroutines.Job
3434
import kotlinx.coroutines.async
3535
import kotlinx.coroutines.channels.BufferOverflow
3636
import kotlinx.coroutines.delay
37+
import kotlinx.coroutines.flow.Flow
3738
import kotlinx.coroutines.flow.MutableSharedFlow
3839
import kotlinx.coroutines.flow.asSharedFlow
3940
import kotlinx.coroutines.flow.map
@@ -55,6 +56,7 @@ import org.eclipse.lsp4j.InitializedParams
5556
import org.eclipse.lsp4j.SynchronizationCapabilities
5657
import org.eclipse.lsp4j.TextDocumentClientCapabilities
5758
import org.eclipse.lsp4j.WorkspaceClientCapabilities
59+
import org.eclipse.lsp4j.jsonrpc.JsonRpcException
5860
import org.eclipse.lsp4j.jsonrpc.Launcher
5961
import org.eclipse.lsp4j.jsonrpc.MessageConsumer
6062
import org.eclipse.lsp4j.jsonrpc.RemoteEndpoint
@@ -109,6 +111,7 @@ internal class LSPProcessListener : ProcessListener {
109111
private val outputStream = PipedOutputStream()
110112
private val outputStreamWriter = OutputStreamWriter(outputStream, StandardCharsets.UTF_8)
111113
val inputStream = PipedInputStream(outputStream)
114+
val errorStream = MutableSharedFlow<String>(replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST)
112115

113116
override fun onTextAvailable(event: ProcessEvent, outputType: Key<*>) {
114117
if (ProcessOutputType.isStdout(outputType)) {
@@ -120,6 +123,7 @@ internal class LSPProcessListener : ProcessListener {
120123
}
121124
} else if (ProcessOutputType.isStderr(outputType)) {
122125
LOG.warn { "LSP process stderr: ${event.text}" }
126+
errorStream.tryEmit(event.text)
123127
} else if (outputType == ProcessOutputType.SYSTEM) {
124128
LOG.info { "LSP system events: ${event.text}" }
125129
}
@@ -159,31 +163,29 @@ class AmazonQLspService @VisibleForTesting constructor(
159163
val instanceFlow = _flowInstance.asSharedFlow().map { it.languageServer }
160164

161165
private var instance: Deferred<AmazonQServerInstanceFacade>
162-
163-
val encryptionManager
164-
get() = instance.getCompleted().encryptionManager
165166
private val heartbeatJob: Job
166167
private val restartTimestamps = ArrayDeque<Long>()
167168
private val restartMutex = Mutex() // Separate mutex for restart tracking
168169

169-
val rawEndpoint
170-
get() = instance.getCompleted().rawEndpoint
171-
172170
// dont allow lsp commands if server is restarting
173171
private val mutex = Mutex(false)
174172

175173
private fun start(): Deferred<AmazonQServerInstanceFacade> = cs.async {
176174
// manage lifecycle RAII-like so we can restart at arbitrary time
177175
// and suppress IDE error if server fails to start
178176
var attempts = 0
179-
while (attempts < 3) {
177+
while (isActive && attempts < 3 && checkForRemainingRestartAttempts()) {
180178
try {
181179
// no timeout; start() can download which may take long time
182180
val instance = starter.start(project, cs).also {
183181
Disposer.register(this@AmazonQLspService, it)
184182
}
185-
// wait for handshake to complete
186-
instance.initializeResult.join()
183+
184+
// no reason this should take long to process after init key sent
185+
withTimeout(5.seconds) {
186+
// wait for handshake to complete
187+
instance.initializeResult.await()
188+
}
187189

188190
return@async instance.also {
189191
_flowInstance.emit(it)
@@ -294,31 +296,31 @@ class AmazonQLspService @VisibleForTesting constructor(
294296
suspend fun<T> execute(runnable: suspend AmazonQLspService.(AmazonQLanguageServer) -> T): T {
295297
val lsp = withTimeout(5.seconds) {
296298
val holder = mutex.withLock { instance }.await()
297-
holder.initializeResult.join()
299+
holder.initializeResult.await()
298300

299301
holder.languageServer
300302
}
301303
return runnable(lsp)
302304
}
303305

304-
suspend fun<T> executeIfRunning(runnable: suspend AmazonQLspService.(AmazonQLanguageServer) -> T): T? = withContext(dispatcher) {
306+
suspend fun<T> executeIfRunning(runnable: suspend AmazonQServerInstanceFacade.(AmazonQLanguageServer) -> T): T? = withContext(dispatcher) {
305307
val lsp = try {
306308
withTimeout(5.seconds) {
307309
val holder = mutex.withLock { instance }.await()
308-
holder.initializeResult.join()
310+
holder.initializeResult.await()
309311

310-
holder.languageServer
312+
holder
311313
}
312314
} catch (_: Exception) {
313315
LOG.debug { "LSP not running" }
314316

315317
null
316318
}
317319

318-
lsp?.let { runnable(it) }
320+
lsp?.let { runnable(it, it.languageServer) }
319321
}
320322

321-
fun<T> syncExecuteIfRunning(runnable: suspend AmazonQLspService.(AmazonQLanguageServer) -> T): T? =
323+
fun<T> syncExecuteIfRunning(runnable: suspend AmazonQServerInstanceFacade.(AmazonQLanguageServer) -> T): T? =
322324
runBlocking(dispatcher) {
323325
executeIfRunning(runnable)
324326
}
@@ -331,13 +333,14 @@ class AmazonQLspService @VisibleForTesting constructor(
331333
private const val RESTART_WINDOW_MS = 3 * 60 * 1000
332334
fun getInstance(project: Project) = project.service<AmazonQLspService>()
333335

334-
suspend fun <T> executeAsyncIfRunning(project: Project, runnable: suspend AmazonQLspService.(AmazonQLanguageServer) -> T): T? =
336+
suspend fun <T> executeAsyncIfRunning(project: Project, runnable: suspend AmazonQServerInstanceFacade.(AmazonQLanguageServer) -> T): T? =
335337
project.serviceIfCreated<AmazonQLspService>()?.executeIfRunning(runnable)
336338
}
337339
}
338340

339341
interface AmazonQServerInstanceFacade : Disposable {
340342
val launcher: Launcher<AmazonQLanguageServer>
343+
val errorStream: Flow<String>
341344

342345
@Suppress("ForbiddenVoid")
343346
val launcherFuture: Future<Void>
@@ -354,6 +357,7 @@ interface AmazonQServerInstanceFacade : Disposable {
354357
private class AmazonQServerInstance(private val project: Project, private val cs: CoroutineScope) : Disposable, AmazonQServerInstanceFacade {
355358
override val encryptionManager = JwtEncryptionManager()
356359
override val launcher: Launcher<AmazonQLanguageServer>
360+
override val errorStream: Flow<String>
357361

358362
@Suppress("ForbiddenVoid")
359363
override val launcherFuture: Future<Void>
@@ -502,6 +506,7 @@ private class AmazonQServerInstance(private val project: Project, private val cs
502506

503507
launcherHandler = KillableColoredProcessHandler.Silent(cmd)
504508
val inputWrapper = LSPProcessListener()
509+
errorStream = inputWrapper.errorStream
505510
launcherHandler.addProcessListener(inputWrapper)
506511
launcherHandler.startNotify()
507512

@@ -539,11 +544,21 @@ private class AmazonQServerInstance(private val project: Project, private val cs
539544
AwsServerCapabilitiesProvider.getInstance(project).setAwsServerCapabilities(result.getAwsServerCapabilities())
540545
}
541546

542-
// required
543-
consumer?.consume(message)
547+
try {
548+
// required
549+
consumer?.consume(message)
550+
} catch (e: JsonRpcException) {
551+
// suppress stream error if notification, else bubble up for correct error propagation
552+
if (JsonRpcException.indicatesStreamClosed(e) && message is NotificationMessage) {
553+
LOG.warn { "Failed to send notification message (${message.method}): ${e.cause}" }
554+
LOG.debug(e) { "Failed to send notification message (${message.method})." }
555+
} else {
556+
throw e
557+
}
558+
}
544559
}
545560
}
546-
.setLocalService(AmazonQLanguageClientImpl(project))
561+
.setLocalService(AmazonQLanguageClientImpl(project, this))
547562
.setRemoteInterface(AmazonQLanguageServer::class.java)
548563
.configureGson {
549564
// otherwise Gson treats all numbers as double which causes deser issues

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/ChatCommunicationManager.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import software.aws.toolkits.core.utils.warn
1919
import software.aws.toolkits.jetbrains.core.credentials.ToolkitConnectionManager
2020
import software.aws.toolkits.jetbrains.core.credentials.pinning.QConnection
2121
import software.aws.toolkits.jetbrains.core.credentials.reauthConnectionIfNeeded
22-
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQLspService
22+
import software.aws.toolkits.jetbrains.services.amazonq.lsp.encryption.JwtEncryptionManager
2323
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ProgressNotificationUtils.getObject
2424
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.LSPAny
2525
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.AuthFollowUpClickedParams
@@ -132,7 +132,7 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
132132
finalResultProcessed[partialResultToken] = false
133133
}
134134

135-
fun handlePartialResultProgressNotification(project: Project, params: ProgressParams) {
135+
fun handlePartialResultProgressNotification(encryptionManager: JwtEncryptionManager, params: ProgressParams) {
136136
val token = ProgressNotificationUtils.getToken(params)
137137
val tabId = getPartialChatMessage(token)
138138
if (tabId.isNullOrEmpty()) {
@@ -146,7 +146,7 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
146146

147147
val encryptedPartialChatResult = getObject(params, String::class.java)
148148
if (encryptedPartialChatResult != null) {
149-
val partialChatResult = AmazonQLspService.getInstance(project).encryptionManager.decrypt(encryptedPartialChatResult)
149+
val partialChatResult = encryptionManager.decrypt(encryptedPartialChatResult)
150150

151151
// Special case: check for stop message before proceeding
152152
val partialResultMap = tryOrNull {
@@ -234,7 +234,7 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
234234
""".trimIndent()
235235
}
236236

237-
fun handleAuthFollowUpClicked(project: Project, params: AuthFollowUpClickedParams) {
237+
fun handleAuthFollowUpClicked(params: AuthFollowUpClickedParams) {
238238
val incomingType = params.authFollowupType
239239
val connectionManager = ToolkitConnectionManager.getInstance(project)
240240
try {

0 commit comments

Comments
 (0)