Skip to content

Commit 2219a94

Browse files
fix: response stream parser buffering assistant text until the end (#685)
* fix: response stream parser buffering assistant text until the end * fix parser bug
1 parent 3dc87a9 commit 2219a94

File tree

2 files changed

+37
-43
lines changed

2 files changed

+37
-43
lines changed

crates/q_cli/src/cli/chat/mod.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
289289
}
290290
}
291291

292-
let mut waiting_for_tool = false;
292+
let mut tool_name_being_recvd: Option<String> = None;
293293
loop {
294294
match parser.recv().await {
295295
Ok(msg_event) => {
@@ -307,14 +307,10 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
307307
});
308308
},
309309
parser::ResponseEvent::ToolUseStart { name } => {
310-
if self.is_interactive {
311-
execute!(self.output, style::Print("\n\n"))?;
312-
self.spinner = Some(Spinner::new(
313-
Spinners::Dots,
314-
format!("Creating tool {}...", name.green()),
315-
));
316-
}
317-
waiting_for_tool = true;
310+
// We need to flush the buffer here, otherwise text will not be
311+
// printed while we are receiving tool use events.
312+
buf.push('\n');
313+
tool_name_being_recvd = Some(name);
318314
},
319315
parser::ResponseEvent::AssistantText(text) => {
320316
buf.push_str(&text);
@@ -330,7 +326,7 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
330326
)?;
331327
}
332328
self.tool_uses.push(tool_use);
333-
waiting_for_tool = false;
329+
tool_name_being_recvd = None;
334330
},
335331
parser::ResponseEvent::EndStream { message } => {
336332
self.conversation_state.push_assistant_message(message);
@@ -373,7 +369,7 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
373369
buf.push('\n');
374370
}
375371

376-
if !waiting_for_tool && !buf.is_empty() && self.is_interactive && self.spinner.is_some() {
372+
if tool_name_being_recvd.is_none() && !buf.is_empty() && self.is_interactive && self.spinner.is_some() {
377373
drop(self.spinner.take());
378374
queue!(
379375
self.output,
@@ -400,6 +396,15 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
400396
}
401397
}
402398

399+
// Set spinner after showing all of the assistant text content so far.
400+
if let (Some(name), true) = (&tool_name_being_recvd, self.is_interactive) {
401+
execute!(self.output, style::Print("\n"))?;
402+
self.spinner = Some(Spinner::new(
403+
Spinners::Dots,
404+
format!("Creating tool {}...", name.clone().green()),
405+
));
406+
}
407+
403408
if ended {
404409
if let (Some(conversation_id), Some(message_id)) = (
405410
self.conversation_state.conversation_id(),

crates/q_cli/src/cli/chat/parser.rs

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ pub struct ResponseParser {
5252
assistant_text: String,
5353
/// Tool uses requested by the model.
5454
tool_uses: Vec<ToolUse>,
55-
/// Buffered line required in case we need to discard a code reference event
56-
buffered_line: Option<String>,
57-
/// Short circuit and return early since we simply need to clear our buffered line
58-
short_circuit: bool,
5955
/// Whether or not we are currently receiving tool use delta events. Tuple of
6056
/// `Some((tool_use_id, name))` if true, [None] otherwise.
6157
parsing_tool_use: Option<(String, String)>,
@@ -69,46 +65,40 @@ impl ResponseParser {
6965
message_id: None,
7066
assistant_text: String::new(),
7167
tool_uses: Vec::new(),
72-
buffered_line: None,
73-
short_circuit: false,
7468
parsing_tool_use: None,
7569
}
7670
}
7771

7872
/// Consumes the associated [ConverseStreamResponse] until a valid [ResponseEvent] is parsed.
7973
pub async fn recv(&mut self) -> Result<ResponseEvent> {
80-
if self.short_circuit {
81-
let message = Message(ChatMessage::AssistantResponseMessage(AssistantResponseMessage {
82-
message_id: self.message_id.take(),
83-
content: std::mem::take(&mut self.assistant_text),
84-
tool_uses: if self.tool_uses.is_empty() {
85-
None
86-
} else {
87-
Some(self.tool_uses.clone().into_iter().map(Into::into).collect())
88-
},
89-
}));
90-
return Ok(ResponseEvent::EndStream { message });
91-
}
92-
9374
if let Some((id, name)) = self.parsing_tool_use.take() {
9475
let tool_use = self.parse_tool_use(id, name).await?;
9576
self.tool_uses.push(tool_use.clone());
9677
return Ok(ResponseEvent::ToolUse(tool_use));
9778
}
9879

80+
// First, handle discarding AssistantResponseEvent's that immediately precede a
81+
// CodeReferenceEvent.
82+
let peek = self.peek().await?;
83+
if let Some(ChatResponseStream::AssistantResponseEvent { content }) = peek {
84+
// Cloning to bypass borrowchecker stuff.
85+
let content = content.clone();
86+
self.next().await?;
87+
match self.peek().await? {
88+
Some(ChatResponseStream::CodeReferenceEvent(_)) => (),
89+
_ => {
90+
self.assistant_text.push_str(&content);
91+
return Ok(ResponseEvent::AssistantText(content));
92+
},
93+
}
94+
}
95+
9996
loop {
10097
match self.next().await {
10198
Ok(Some(output)) => match output {
10299
ChatResponseStream::AssistantResponseEvent { content } => {
103100
self.assistant_text.push_str(&content);
104-
let text = self.buffered_line.take();
105-
self.buffered_line = Some(content);
106-
if let Some(text) = text {
107-
return Ok(ResponseEvent::AssistantText(text));
108-
}
109-
},
110-
ChatResponseStream::CodeReferenceEvent(_) => {
111-
self.buffered_line = None;
101+
return Ok(ResponseEvent::AssistantText(content));
112102
},
113103
ChatResponseStream::InvalidStateEvent { reason, message } => {
114104
error!(%reason, %message, "invalid state event");
@@ -141,11 +131,6 @@ impl ResponseParser {
141131
_ => {},
142132
},
143133
Ok(None) => {
144-
if let Some(text) = self.buffered_line.take() {
145-
self.short_circuit = true;
146-
return Ok(ResponseEvent::AssistantText(text));
147-
}
148-
149134
let message = Message(ChatMessage::AssistantResponseMessage(AssistantResponseMessage {
150135
message_id: self.message_id.take(),
151136
content: std::mem::take(&mut self.assistant_text),
@@ -248,6 +233,10 @@ mod tests {
248233
ChatResponseStream::AssistantResponseEvent {
249234
content: " there".to_string(),
250235
},
236+
ChatResponseStream::AssistantResponseEvent {
237+
content: "IGNORE ME PLEASE".to_string(),
238+
},
239+
ChatResponseStream::CodeReferenceEvent(()),
251240
ChatResponseStream::ToolUseEvent {
252241
tool_use_id: tool_use_id.clone(),
253242
name: tool_name.clone(),

0 commit comments

Comments
 (0)