Skip to content

fix(amazonq): Replacing message bus with direct communication #5949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import software.aws.toolkits.jetbrains.services.amazonq.isQSupportedInThisVersio
import software.aws.toolkits.jetbrains.services.amazonq.lsp.AmazonQLspService
import software.aws.toolkits.jetbrains.services.amazonq.lsp.artifacts.ArtifactManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
import software.aws.toolkits.jetbrains.services.amazonq.messages.AmazonQMessage
import software.aws.toolkits.jetbrains.services.amazonq.messages.MessageConnector
Expand Down Expand Up @@ -66,6 +67,7 @@ class AmazonQPanel(val project: Project, private val scope: CoroutineScope) : Di
private val appConnections = mutableListOf<AppConnection>()

init {
// will be removed in next iteration.
project.messageBus.connect().subscribe(
AsyncChatUiListener.TOPIC,
object : AsyncChatUiListener {
Expand Down Expand Up @@ -135,6 +137,11 @@ class AmazonQPanel(val project: Project, private val scope: CoroutineScope) : Di
Browser(this@AmazonQPanel, webUri, project).also { browserInstance ->
wrapper.setContent(browserInstance.component())

// Register direct callback instead of using message bus
ChatCommunicationManager.getInstance(project).setChatUpdateCallback { message ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably just associate the browser with the communication manager

Copy link
Contributor Author

@LokeshDogga13 LokeshDogga13 Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Browser is not in shared package, and cannot import it. I am able to do it using Any, but not sure if it is a right way to do it.

browserInstance.postChat(message)
}

// Add DropTarget to the browser component
// JCEF does not propagate OS-level dragenter, dragOver and drop into DOM.
// As an alternative, enabling the native drag in JCEF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcNotification
import software.aws.toolkits.jetbrains.services.amazonq.lsp.JsonRpcRequest
import software.aws.toolkits.jetbrains.services.amazonq.lsp.encryption.JwtEncryptionManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AwsServerCapabilitiesProvider
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatAsyncResultManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.AUTH_FOLLOW_UP_CLICKED
Expand Down Expand Up @@ -123,7 +122,6 @@ class BrowserConnector(
) {
val uiReady = CompletableDeferred<Boolean>()
private val chatCommunicationManager = ChatCommunicationManager.getInstance(project)
private val chatAsyncResultManager = ChatAsyncResultManager.getInstance(project)

suspend fun connect(
browser: Browser,
Expand Down Expand Up @@ -240,7 +238,6 @@ class BrowserConnector(

val tabId = requestFromUi.params.tabId
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
chatCommunicationManager.registerPartialResultToken(partialResultToken)

var encryptionManager: JwtEncryptionManager? = null
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
Expand All @@ -261,7 +258,6 @@ class BrowserConnector(
val tabId = requestFromUi.params.tabId
val quickActionParams = node.params ?: error("empty payload")
val partialResultToken = chatCommunicationManager.addPartialChatMessage(tabId)
chatCommunicationManager.registerPartialResultToken(partialResultToken)
var encryptionManager: JwtEncryptionManager? = null
val result = AmazonQLspService.executeAsyncIfRunning(project) { server ->
encryptionManager = this.encryptionManager
Expand Down Expand Up @@ -595,30 +591,13 @@ class BrowserConnector(
chatCommunicationManager.removeInflightRequestForTab(tabId)
} catch (e: CancellationException) {
LOG.warn { "Cancelled chat generation" }
try {
chatAsyncResultManager.createRequestId(partialResultToken)
chatAsyncResultManager.getResult(partialResultToken)
handleCancellation(tabId, browser)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this ok to delete

Copy link
Contributor Author

@LokeshDogga13 LokeshDogga13 Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this code is introduced when stop conversation needed to be handled on JB side. Now it is not required. So reverting this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also maintain records for partial results cause they are received separately and we mark them done when the result is completed, are we sure that line 599-600 can be deleted?

Copy link
Contributor

@samgst-amazon samgst-amazon Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws/amazon-q-eclipse@b944c77

I tried to mirror the cancellation logic here how it's handled in Eclipse. In the bug bash today removing this results in the original issue where:

  • Hitting stop button once stops the response, prints "you have stopped.." message, but stop button still shows
  • Hitting stop button second time prints"you have stopped.." again, stop button disappears

jk jk
I did not see that this behavior was regression and that the double clicking stop button issue is still present in prod rn.

} catch (ex: Exception) {
LOG.warn(ex) { "An error occurred while processing cancellation" }
} finally {
chatAsyncResultManager.removeRequestId(partialResultToken)
chatCommunicationManager.removePartialResultLock(partialResultToken)
chatCommunicationManager.removeFinalResultProcessed(partialResultToken)
}
} catch (e: Exception) {
LOG.warn(e) { "Failed to send chat message" }
browser.postChat(chatCommunicationManager.getErrorUiMessage(tabId, e, partialResultToken))
}
}
}

private fun handleCancellation(tabId: String, browser: Browser) {
// Send a message to hide the stop button without showing an error
val cancelMessage = chatCommunicationManager.getCancellationUiMessage(tabId)
browser.postChat(cancelMessage)
}

private fun updateQuickActionsInBrowser(browser: Browser) {
val isFeatureDevAvailable = isFeatureDevAvailable(project)
val isCodeTransformAvailable = isCodeTransformAvailable(project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.intellij.openapi.project.Project
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.runBlocking
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.GENERIC_COMMAND
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.GenericCommandParams
Expand Down Expand Up @@ -40,7 +40,8 @@ class ActionRegistrar {
val params = SendToPromptParams(selection = codeSelection, triggerType = TriggerType.CONTEXT_MENU)
uiMessage = FlareUiMessage(command = SEND_TO_PROMPT, params = params)
}
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
ChatCommunicationManager.getInstance(project).notifyUi(uiMessage)
// AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.intellij.openapi.actionSystem.DataKey
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.project.DumbAware
import kotlinx.coroutines.runBlocking
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.ChatPrompt
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.chat.SEND_TO_PROMPT
Expand Down Expand Up @@ -58,7 +58,8 @@ class ExplainCodeIssueAction : AnAction(), DumbAware {
)

val uiMessage = FlareUiMessage(SEND_TO_PROMPT, params)
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
ChatCommunicationManager.getInstance(project).notifyUi(uiMessage)
// AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import software.aws.toolkits.core.utils.info
import software.aws.toolkits.core.utils.warn
import software.aws.toolkits.jetbrains.core.credentials.ToolkitConnectionManager
import software.aws.toolkits.jetbrains.core.credentials.pinning.QConnection
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.AsyncChatUiListener
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.ChatCommunicationManager
import software.aws.toolkits.jetbrains.services.amazonq.lsp.flareChat.FlareUiMessage
import software.aws.toolkits.jetbrains.services.amazonq.lsp.model.aws.LSPAny
Expand Down Expand Up @@ -399,8 +398,7 @@ class AmazonQLanguageClientImpl(private val project: Project) : AmazonQLanguageC
}

override fun sendChatUpdate(params: LSPAny) {
AsyncChatUiListener.notifyPartialMessageUpdate(
project,
chatManager.notifyUi(
FlareUiMessage(
command = CHAT_SEND_UPDATE,
params = params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ interface AsyncChatUiListener : EventListener {
project.messageBus.syncPublisher(TOPIC).onChange(command)
}

// will be removed in next iteration.
@Deprecated("shouldn't need this version")
fun notifyPartialMessageUpdate(project: Project, command: String) {
project.messageBus.syncPublisher(TOPIC).onChange(command)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@
import java.util.concurrent.ConcurrentHashMap

@Service(Service.Level.PROJECT)
class ChatCommunicationManager(private val project: Project, private val cs: CoroutineScope) {

Check warning on line 40 in plugins/amazonq/shared/jetbrains-community/src/software/aws/toolkits/jetbrains/services/amazonq/lsp/flareChat/ChatCommunicationManager.kt

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Unused symbol

Property "project" is never used
val uiReady = CompletableDeferred<Boolean>()
private val chatPartialResultMap = ConcurrentHashMap<String, String>()
private val inflightRequestByTabId = ConcurrentHashMap<String, CompletableFuture<String>>()
private val pendingSerializedChatRequests = ConcurrentHashMap<String, CompletableFuture<GetSerializedChatResult>>()
private val pendingTabRequests = ConcurrentHashMap<String, CompletableFuture<LSPAny>>()
private val partialResultLocks = ConcurrentHashMap<String, Any>()
private val finalResultProcessed = ConcurrentHashMap<String, Boolean>()
private val openTabs = mutableSetOf<String>()

fun setUiReady() {
uiReady.complete(true)
}

private var chatUpdateCallback: ((FlareUiMessage) -> Unit)? = null

fun setChatUpdateCallback(callback: (FlareUiMessage) -> Unit) {
chatUpdateCallback = callback
}

fun notifyUi(uiMessage: FlareUiMessage) {
cs.launch {
uiReady.await()
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
chatUpdateCallback?.invoke(uiMessage)
}
}

Expand Down Expand Up @@ -118,20 +122,6 @@
fun removeTabOpenRequest(requestId: String) =
pendingTabRequests.remove(requestId)

fun removePartialResultLock(token: String) {
partialResultLocks.remove(token)
}

fun removeFinalResultProcessed(token: String) {
finalResultProcessed.remove(token)
}

fun registerPartialResultToken(partialResultToken: String) {
val lock = Any()
partialResultLocks[partialResultToken] = lock
finalResultProcessed[partialResultToken] = false
}

fun handlePartialResultProgressNotification(project: Project, params: ProgressParams) {
val token = ProgressNotificationUtils.getToken(params)
val tabId = getPartialChatMessage(token)
Expand All @@ -147,49 +137,18 @@
val encryptedPartialChatResult = getObject(params, String::class.java)
if (encryptedPartialChatResult != null) {
val partialChatResult = AmazonQLspService.getInstance(project).encryptionManager.decrypt(encryptedPartialChatResult)

// Special case: check for stop message before proceeding
val partialResultMap = tryOrNull {
val partialResultMap: Any = tryOrNull {
Gson().fromJson(partialChatResult, Map::class.java)
}
} ?: partialChatResult

if (partialResultMap != null) {
@Suppress("UNCHECKED_CAST")
val additionalMessages = partialResultMap["additionalMessages"] as? List<Map<String, Any>>
if (additionalMessages != null) {
for (message in additionalMessages) {
val messageId = message["messageId"] as? String
if (messageId != null && messageId.startsWith("stopped")) {
// Process stop messages immediately
val uiMessage = convertToJsonToSendToChat(
command = SEND_CHAT_COMMAND_PROMPT,
tabId = tabId,
params = partialChatResult,
isPartialResult = true
)
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
finalResultProcessed[token] = true
ChatAsyncResultManager.getInstance(project).setResult(token, partialResultMap)
return
}
}
}
}

// Normal processing for non-stop messages
val lock = partialResultLocks[token] ?: return
synchronized(lock) {
if (finalResultProcessed[token] == true || partialResultLocks[token] == null) {
return@synchronized
}
val uiMessage = convertToJsonToSendToChat(
notifyUi(
FlareUiMessage(
command = SEND_CHAT_COMMAND_PROMPT,
tabId = tabId,
params = partialChatResult,
params = partialResultMap,
isPartialResult = true
)
AsyncChatUiListener.notifyPartialMessageUpdate(project, uiMessage)
}
)
}
}

Expand Down Expand Up @@ -219,21 +178,6 @@
return uiMessage
}

fun getCancellationUiMessage(tabId: String): String {
// Create a minimal error params with empty error message to hide the stop button
// without showing an actual error message to the user
val errorParams = Gson().toJson(ErrorParams(tabId, null, "", "")).toString()

return """
{
"command":"$CHAT_ERROR_PARAMS",
"tabId": "$tabId",
"params": $errorParams,
"isPartialResult": false
}
""".trimIndent()
}

fun handleAuthFollowUpClicked(project: Project, params: AuthFollowUpClickedParams) {
val incomingType = params.authFollowupType
val connectionManager = ToolkitConnectionManager.getInstance(project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ data class FlareUiMessage(
val command: String,
val params: Any,
val requestId: String? = null,
val tabId: String? = null,
val isPartialResult: Boolean? = false,
)
Loading