Skip to content

Commit d281002

Browse files
committed
fix: break up tool uses into multiple parts if stream ends unexpectedly
1 parent 10da3f6 commit d281002

File tree

2 files changed

+124
-16
lines changed

2 files changed

+124
-16
lines changed

crates/q_cli/src/cli/chat/mod.rs

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ struct ChatContext<'o, W> {
207207
tool_use_recursions: u32,
208208
current_user_input_id: Option<String>,
209209
tool_use_events: Vec<ToolUseEventBuilder>,
210+
211+
/// Whether or not an unexpected end of chat stream was encountered while consuming the model
212+
/// response's tool use data. Pair of (tool_use_id, name) for the tool that was being received.
213+
encountered_tool_use_eos: Option<(String, String)>,
210214
}
211215

212216
impl<'o, W> ChatContext<'o, W>
@@ -228,6 +232,7 @@ where
228232
tool_use_recursions: 0,
229233
current_user_input_id: None,
230234
tool_use_events: vec![],
235+
encountered_tool_use_eos: None,
231236
}
232237
}
233238

@@ -381,19 +386,43 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
381386
cursor::Show
382387
)?;
383388
}
384-
execute!(
385-
self.output,
386-
style::SetAttribute(Attribute::Bold),
387-
style::SetForegroundColor(Color::Red),
388-
style::Print(format!(
389-
"We're having trouble responding right now, please try again later: {:?}",
390-
err
391-
)),
392-
style::SetForegroundColor(Color::Reset),
393-
style::SetAttribute(Attribute::Reset),
394-
)?;
395-
if self.conversation_state.next_message.is_none() {
396-
self.conversation_state.history.pop_back();
389+
match err {
390+
parser::RecvError::UnexpectedToolUseEos {
391+
tool_use_id,
392+
name,
393+
message,
394+
} => {
395+
error!(
396+
tool_use_id,
397+
name, "The response stream ended before the entire tool use was received"
398+
);
399+
self.conversation_state.push_assistant_message(*message);
400+
self.encountered_tool_use_eos = Some((tool_use_id, name));
401+
if self.is_interactive {
402+
execute!(self.output, cursor::Hide)?;
403+
self.spinner = Some(Spinner::new(
404+
Spinners::Dots,
405+
"The generated tool use was too large, trying to divide up the work..."
406+
.to_string(),
407+
));
408+
}
409+
},
410+
err => {
411+
execute!(
412+
self.output,
413+
style::SetAttribute(Attribute::Bold),
414+
style::SetForegroundColor(Color::Red),
415+
style::Print(format!(
416+
"We're having trouble responding right now, please try again later: {:?}",
417+
err
418+
)),
419+
style::SetForegroundColor(Color::Reset),
420+
style::SetAttribute(Attribute::Reset),
421+
)?;
422+
if self.conversation_state.next_message.is_none() {
423+
self.conversation_state.history.pop_back();
424+
}
425+
},
397426
}
398427
break;
399428
},
@@ -490,6 +519,23 @@ Hi, I'm <g>Amazon Q</g>. Ask me anything.
490519
sigint_recver: &mut tokio::sync::mpsc::Receiver<()>,
491520
should_terminate: &Arc<AtomicBool>,
492521
) -> Result<Option<SendMessageOutput>, error::PromptAndSendError> {
522+
if let Some((tool_use_id, _)) = self.encountered_tool_use_eos.take() {
523+
let tool_results = vec![ToolResult {
524+
tool_use_id,
525+
content: vec![ToolResultContentBlock::Text(
526+
"The generated tool was too large, try again but this time split up the work between multiple tool uses".to_string(),
527+
)],
528+
status: ToolResultStatus::Error,
529+
}];
530+
self.conversation_state.add_tool_results(tool_results);
531+
self.send_tool_use_telemetry().await;
532+
return Ok(Some(
533+
self.client
534+
.send_message(self.conversation_state.as_sendable_conversation_state())
535+
.await?,
536+
));
537+
}
538+
493539
loop {
494540
// Tool uses that need to be executed.
495541
let mut queued_tools: Vec<(String, Tool)> = Vec::new();

crates/q_cli/src/cli/chat/parser.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Instant;
2+
13
use eyre::Result;
24
use fig_api_client::clients::SendMessageOutput;
35
use fig_api_client::model::{
@@ -6,6 +8,7 @@ use fig_api_client::model::{
68
ChatResponseStream,
79
ToolUse as FigToolUse,
810
};
11+
use thiserror::Error;
912
use tracing::{
1013
error,
1114
trace,
@@ -34,6 +37,21 @@ impl From<ToolUse> for FigToolUse {
3437
}
3538
}
3639

40+
#[derive(Debug, Error)]
41+
pub enum RecvError {
42+
#[error("{0}")]
43+
Client(#[from] fig_api_client::Error),
44+
#[error("{0}")]
45+
Json(#[from] serde_json::Error),
46+
/// Unexpected end of stream while receiving a tool use.
47+
#[error("Unexpected end of stream for tool: {} with id: {}", .name, .tool_use_id)]
48+
UnexpectedToolUseEos {
49+
tool_use_id: String,
50+
name: String,
51+
message: Box<Message>,
52+
},
53+
}
54+
3755
/// State associated with parsing a [ConverseStreamResponse] into a [Message].
3856
///
3957
/// # Usage
@@ -70,7 +88,7 @@ impl ResponseParser {
7088
}
7189

7290
/// Consumes the associated [ConverseStreamResponse] until a valid [ResponseEvent] is parsed.
73-
pub async fn recv(&mut self) -> Result<ResponseEvent> {
91+
pub async fn recv(&mut self) -> Result<ResponseEvent, RecvError> {
7492
if let Some((id, name)) = self.parsing_tool_use.take() {
7593
let tool_use = self.parse_tool_use(id, name).await?;
7694
self.tool_uses.push(tool_use.clone());
@@ -150,19 +168,63 @@ impl ResponseParser {
150168
/// Consumes the response stream until a valid [ToolUse] is parsed.
151169
///
152170
/// The arguments are the fields from the first [ChatResponseStream::ToolUseEvent] consumed.
153-
async fn parse_tool_use(&mut self, id: String, name: String) -> Result<ToolUse> {
171+
async fn parse_tool_use(&mut self, id: String, name: String) -> Result<ToolUse, RecvError> {
154172
let mut tool_string = String::new();
173+
let mut stop_seen = false;
174+
let start = Instant::now();
155175
while let Some(ChatResponseStream::ToolUseEvent { .. }) = self.peek().await? {
156176
if let Some(ChatResponseStream::ToolUseEvent { input, stop, .. }) = self.next().await? {
157177
if let Some(i) = input {
158178
tool_string.push_str(&i);
159179
}
160180
if let Some(true) = stop {
181+
stop_seen = true;
161182
break;
162183
}
163184
}
164185
}
165-
let args = serde_json::from_str(&tool_string)?;
186+
let args = match serde_json::from_str(&tool_string) {
187+
Ok(args) => args,
188+
Err(err) => {
189+
// If the stream ended before we saw the final tool use event (and thus failed
190+
// deserializing the tool use), this is most likely due to the backend dropping the
191+
// connection. The tool was too large!
192+
if self.peek().await?.is_none() && !stop_seen {
193+
error!(
194+
"Received an unexpected end of stream after spending ~{}s receiving tool events",
195+
Instant::now().duration_since(start).as_secs_f64()
196+
);
197+
self.tool_uses.push(ToolUse {
198+
id: id.clone(),
199+
name: name.clone(),
200+
args: serde_json::Value::Object(
201+
[(
202+
"key".to_string(),
203+
serde_json::Value::String(
204+
"fake tool use args - actual tool use was too large to include".to_string(),
205+
),
206+
)]
207+
.into_iter()
208+
.collect(),
209+
),
210+
});
211+
let message = Box::new(Message(ChatMessage::AssistantResponseMessage(
212+
AssistantResponseMessage {
213+
message_id: self.message_id.take(),
214+
content: std::mem::take(&mut self.assistant_text),
215+
tool_uses: Some(self.tool_uses.clone().into_iter().map(Into::into).collect()),
216+
},
217+
)));
218+
return Err(RecvError::UnexpectedToolUseEos {
219+
tool_use_id: id,
220+
name,
221+
message,
222+
});
223+
} else {
224+
return Err(err.into());
225+
}
226+
},
227+
};
166228
Ok(ToolUse { id, name, args })
167229
}
168230

0 commit comments

Comments
 (0)