Skip to content

Commit ce3a4ed

Browse files
fix(amazonq): Replacing message bus with direct communication (aws#5949)
* Reverting the PR 5765 * removing usage of messagebus * Replacing other places as well * fixing detekt * Fixing context to show in chat * Fixing detekt * Addressing code review comments
1 parent 6209636 commit ce3a4ed

File tree

9 files changed

+29
-171
lines changed

9 files changed

+29
-171
lines changed

plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/toolwindow/AmazonQPanel.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import software.aws.toolkits.jetbrains.services.amazonq.isQSupportedInThisVersio
3232
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQLspService
3333
import software.aws.toolkits.jetbrains.services.amazonq.lsp.artifacts.ArtifactManager
3434
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
35+
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
3536
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
3637
import software.aws.toolkits.jetbrains.services.amazonq.messages.AmazonQMessage
3738
import software.aws.toolkits.jetbrains.services.amazonq.messages.MessageConnector
@@ -66,6 +67,7 @@ class AmazonQPanel(val project: Project, private val scope: CoroutineScope) : Di
6667
private val appConnections = mutableListOf<AppConnection>()
6768

6869
init {
70+
// will be removed in next iteration.
6971
project.messageBus.connect().subscribe(
7072
AsyncChatUiListener.TOPIC,
7173
object : AsyncChatUiListener {
@@ -135,6 +137,11 @@ class AmazonQPanel(val project: Project, private val scope: CoroutineScope) : Di
135137
Browser(this@AmazonQPanel, webUri, project).also { browserInstance ->
136138
wrapper.setContent(browserInstance.component())
137139

140+
// Register direct callback instead of using message bus
141+
ChatCommunicationManager.getInstance(project).setChatUpdateCallback { message ->
142+
browserInstance.postChat(message)
143+
}
144+
138145
// Add DropTarget to the browser component
139146
// JCEF does not propagate OS-level dragenter, dragOver and drop into DOM.
140147
// As an alternative, enabling the native drag in JCEF,

plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/webview/BrowserConnector.kt

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcNotification
4343
import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcRequest
4444
import software.aws.toolkits.jetbrains.services.amazonq.lsp.encryption.JwtEncryptionManager
4545
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AwsServerCapabilitiesProvider
46-
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatAsyncResultManager
4746
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
4847
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
4948
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.AUTH_FOLLOW_UP_CLICKED
@@ -123,7 +122,6 @@ class BrowserConnector(
123122
) {
124123
val uiReady = CompletableDeferred<Boolean>()
125124
private val chatCommunicationManager = ChatCommunicationManager.getInstance(project)
126-
private val chatAsyncResultManager = ChatAsyncResultManager.getInstance(project)
127125

128126
suspend fun connect(
129127
browser: Browser,
@@ -240,7 +238,6 @@ class BrowserConnector(
240238

241239
val tabId = requestFromUi.params.tabId
242240
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
243-
chatCommunicationManager.registerPartialResultToken(partialResultToken)
244241

245242
var encryptionManager: JwtEncryptionManager? = null
246243
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
@@ -261,7 +258,6 @@ class BrowserConnector(
261258
val tabId = requestFromUi.params.tabId
262259
val quickActionParams = node.params ?: error("empty payload")
263260
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
264-
chatCommunicationManager.registerPartialResultToken(partialResultToken)
265261
var encryptionManager: JwtEncryptionManager? = null
266262
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
267263
encryptionManager = this.encryptionManager
@@ -595,30 +591,13 @@ class BrowserConnector(
595591
chatCommunicationManager.removeInflightRequestForTab(tabId)
596592
} catch (e: CancellationException) {
597593
LOG.warn { "Cancelled chat generation" }
598-
try {
599-
chatAsyncResultManager.createRequestId(partialResultToken)
600-
chatAsyncResultManager.getResult(partialResultToken)
601-
handleCancellation(tabId, browser)
602-
} catch (ex: Exception) {
603-
LOG.warn(ex) { "An error occurred while processing cancellation" }
604-
} finally {
605-
chatAsyncResultManager.removeRequestId(partialResultToken)
606-
chatCommunicationManager.removePartialResultLock(partialResultToken)
607-
chatCommunicationManager.removeFinalResultProcessed(partialResultToken)
608-
}
609594
} catch (e: Exception) {
610595
LOG.warn(e) { "Failed to send chat message" }
611596
browser.postChat(chatCommunicationManager.getErrorUiMessage(tabId, e, partialResultToken))
612597
}
613598
}
614599
}
615600

616-
private fun handleCancellation(tabId: String, browser: Browser) {
617-
// Send a message to hide the stop button without showing an error
618-
val cancelMessage = chatCommunicationManager.getCancellationUiMessage(tabId)
619-
browser.postChat(cancelMessage)
620-
}
621-
622601
private fun updateQuickActionsInBrowser(browser: Browser) {
623602
val isFeatureDevAvailable = isFeatureDevAvailable(project)
624603
val isCodeTransformAvailable = isCodeTransformAvailable(project)

plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/cwc/commands/ActionRegistrar.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import com.intellij.openapi.project.Project
88
import kotlinx.coroutines.flow.MutableSharedFlow
99
import kotlinx.coroutines.flow.asSharedFlow
1010
import kotlinx.coroutines.runBlocking
11-
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
11+
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
1212
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
1313
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.GENERIC_COMMAND
1414
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.GenericCommandParams
@@ -40,7 +40,7 @@ class ActionRegistrar {
4040
val params = SendToPromptParams(selection = codeSelection, triggerType = TriggerType.CONTEXT_MENU)
4141
uiMessage = FlareUiMessage(command = SEND_TO_PROMPT, params = params)
4242
}
43-
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
43+
ChatCommunicationManager.getInstance(project).notifyUi(uiMessage)
4444
}
4545
}
4646
}

plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/cwc/commands/codescan/actions/ExplainCodeIssueAction.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import com.intellij.openapi.actionSystem.DataKey
1111
import com.intellij.openapi.application.ApplicationManager
1212
import com.intellij.openapi.project.DumbAware
1313
import kotlinx.coroutines.runBlocking
14-
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
14+
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
1515
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
1616
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.ChatPrompt
1717
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.SEND_TO_PROMPT
@@ -58,7 +58,8 @@ class ExplainCodeIssueAction : AnAction(), DumbAware {
5858
)
5959

6060
val uiMessage = FlareUiMessage(SEND_TO_PROMPT, params)
61-
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
61+
ChatCommunicationManager.getInstance(project).notifyUi(uiMessage)
62+
// AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
6263
}
6364
}
6465
}

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/AmazonQLanguageClientImpl.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import software.aws.toolkits.core.utils.info
4444
import software.aws.toolkits.core.utils.warn
4545
import software.aws.toolkits.jetbrains.core.credentials.ToolkitConnectionManager
4646
import software.aws.toolkits.jetbrains.core.credentials.pinning.QConnection
47-
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
4847
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
4948
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
5049
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.LSPAny
@@ -399,8 +398,7 @@ class AmazonQLanguageClientImpl(private val project: Project) : AmazonQLanguageC
399398
}
400399

401400
override fun sendChatUpdate(params: LSPAny) {
402-
AsyncChatUiListener.notifyPartialMessageUpdate(
403-
project,
401+
chatManager.notifyUi(
404402
FlareUiMessage(
405403
command = CHAT_SEND_UPDATE,
406404
params = params,

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/AsyncChatUiListener.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ interface AsyncChatUiListener : EventListener {
2323
project.messageBus.syncPublisher(TOPIC).onChange(command)
2424
}
2525

26+
// will be removed in next iteration.
2627
@Deprecated("shouldn't need this version")
2728
fun notifyPartialMessageUpdate(project: Project, command: String) {
2829
project.messageBus.syncPublisher(TOPIC).onChange(command)

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/ChatAsyncResultManager.kt

Lines changed: 0 additions & 74 deletions
This file was deleted.

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/ChatCommunicationManager.kt

Lines changed: 13 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,22 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
4343
private val inflightRequestByTabId = ConcurrentHashMap<String, CompletableFuture<String>>()
4444
private val pendingSerializedChatRequests = ConcurrentHashMap<String, CompletableFuture<GetSerializedChatResult>>()
4545
private val pendingTabRequests = ConcurrentHashMap<String, CompletableFuture<LSPAny>>()
46-
private val partialResultLocks = ConcurrentHashMap<String, Any>()
47-
private val finalResultProcessed = ConcurrentHashMap<String, Boolean>()
4846
private val openTabs = mutableSetOf<String>()
4947

5048
fun setUiReady() {
5149
uiReady.complete(true)
5250
}
5351

52+
private var chatUpdateCallback: ((FlareUiMessage) -> Unit)? = null
53+
54+
fun setChatUpdateCallback(callback: (FlareUiMessage) -> Unit) {
55+
chatUpdateCallback = callback
56+
}
57+
5458
fun notifyUi(uiMessage: FlareUiMessage) {
5559
cs.launch {
5660
uiReady.await()
57-
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
61+
chatUpdateCallback?.invoke(uiMessage)
5862
}
5963
}
6064

@@ -118,20 +122,6 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
118122
fun removeTabOpenRequest(requestId: String) =
119123
pendingTabRequests.remove(requestId)
120124

121-
fun removePartialResultLock(token: String) {
122-
partialResultLocks.remove(token)
123-
}
124-
125-
fun removeFinalResultProcessed(token: String) {
126-
finalResultProcessed.remove(token)
127-
}
128-
129-
fun registerPartialResultToken(partialResultToken: String) {
130-
val lock = Any()
131-
partialResultLocks[partialResultToken] = lock
132-
finalResultProcessed[partialResultToken] = false
133-
}
134-
135125
fun handlePartialResultProgressNotification(project: Project, params: ProgressParams) {
136126
val token = ProgressNotificationUtils.getToken(params)
137127
val tabId = getPartialChatMessage(token)
@@ -147,49 +137,18 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
147137
val encryptedPartialChatResult = getObject(params, String::class.java)
148138
if (encryptedPartialChatResult != null) {
149139
val partialChatResult = AmazonQLspService.getInstance(project).encryptionManager.decrypt(encryptedPartialChatResult)
150-
151-
// Special case: check for stop message before proceeding
152-
val partialResultMap = tryOrNull {
140+
val partialResultMap: Any = tryOrNull {
153141
Gson().fromJson(partialChatResult, Map::class.java)
154-
}
142+
} ?: partialChatResult
155143

156-
if (partialResultMap != null) {
157-
@Suppress("UNCHECKED_CAST")
158-
val additionalMessages = partialResultMap["additionalMessages"] as? List<Map<String, Any>>
159-
if (additionalMessages != null) {
160-
for (message in additionalMessages) {
161-
val messageId = message["messageId"] as? String
162-
if (messageId != null && messageId.startsWith("stopped")) {
163-
// Process stop messages immediately
164-
val uiMessage = convertToJsonToSendToChat(
165-
command = SEND_CHAT_COMMAND_PROMPT,
166-
tabId = tabId,
167-
params = partialChatResult,
168-
isPartialResult = true
169-
)
170-
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
171-
finalResultProcessed[token] = true
172-
ChatAsyncResultManager.getInstance(project).setResult(token, partialResultMap)
173-
return
174-
}
175-
}
176-
}
177-
}
178-
179-
// Normal processing for non-stop messages
180-
val lock = partialResultLocks[token] ?: return
181-
synchronized(lock) {
182-
if (finalResultProcessed[token] == true || partialResultLocks[token] == null) {
183-
return@synchronized
184-
}
185-
val uiMessage = convertToJsonToSendToChat(
144+
notifyUi(
145+
FlareUiMessage(
186146
command = SEND_CHAT_COMMAND_PROMPT,
187147
tabId = tabId,
188-
params = partialChatResult,
148+
params = partialResultMap,
189149
isPartialResult = true
190150
)
191-
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
192-
}
151+
)
193152
}
194153
}
195154

@@ -219,21 +178,6 @@ class ChatCommunicationManager(private val project: Project, private val cs: Cor
219178
return uiMessage
220179
}
221180

222-
fun getCancellationUiMessage(tabId: String): String {
223-
// Create a minimal error params with empty error message to hide the stop button
224-
// without showing an actual error message to the user
225-
val errorParams = Gson().toJson(ErrorParams(tabId, null, "", "")).toString()
226-
227-
return """
228-
{
229-
"command":"$CHAT_ERROR_PARAMS",
230-
"tabId": "$tabId",
231-
"params": $errorParams,
232-
"isPartialResult": false
233-
}
234-
""".trimIndent()
235-
}
236-
237181
fun handleAuthFollowUpClicked(project: Project, params: AuthFollowUpClickedParams) {
238182
val incomingType = params.authFollowupType
239183
val connectionManager = ToolkitConnectionManager.getInstance(project)

plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/FlareUiMessage.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ data class FlareUiMessage(
77
val command: String,
88
val params: Any,
99
val requestId: String? = null,
10+
val tabId: String? = null,
11+
val isPartialResult: Boolean? = false,
1012
)

0 commit comments

Comments
 (0)