Skip to content

Commit 983362b

Browse files
fix: stream timeout handling (#732)
1 parent 9d449df commit 983362b

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

crates/q_cli/src/cli/chat/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,34 @@ where
677677
},
678678
}
679679
},
680+
Err(RecvError::StreamTimeout { source, duration }) => {
681+
error!(
682+
?source,
683+
"Encountered a stream timeout after waiting for {}s",
684+
duration.as_secs()
685+
);
686+
if self.interactive {
687+
execute!(self.output, cursor::Hide)?;
688+
self.spinner = Some(Spinner::new(Spinners::Dots, "Dividing up the work...".to_string()));
689+
}
690+
// For stream timeouts, we'll tell the model to try and split its response into
691+
// smaller chunks.
692+
self.conversation_state
693+
.push_assistant_message(AssistantResponseMessage {
694+
message_id: None,
695+
content: "Fake message - actual message took too long to generate".to_string(),
696+
tool_uses: None,
697+
});
698+
self.conversation_state.append_new_user_message(
699+
"You took too long to respond - try to split up the work into smaller steps.".to_string(),
700+
);
701+
self.send_tool_use_telemetry().await;
702+
return Ok(ChatState::HandleResponseStream(
703+
self.client
704+
.send_message(self.conversation_state.as_sendable_conversation_state())
705+
.await?,
706+
));
707+
},
680708
Err(RecvError::UnexpectedToolUseEos {
681709
tool_use_id,
682710
name,

crates/q_cli/src/cli/chat/parser.rs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,23 @@ pub enum RecvError {
4646
Client(#[from] fig_api_client::Error),
4747
#[error("{0}")]
4848
Json(#[from] serde_json::Error),
49+
/// An error was encountered while waiting for the next event in the stream after a noticeably
50+
/// long wait time.
51+
///
52+
/// *Context*: the client can throw an error after ~100s of waiting with no response, likely due
53+
/// to an exceptionally complex tool use taking too long to generate.
54+
#[error("The stream ended after {}s: {source}", .duration.as_secs())]
55+
StreamTimeout {
56+
source: fig_api_client::Error,
57+
duration: std::time::Duration,
58+
},
4959
/// Unexpected end of stream while receiving a tool use.
60+
///
61+
/// *Context*: the stream can unexpectedly end with `Ok(None)` while waiting for an
62+
/// exceptionally complex tool use. This is due to some proxy server dropping idle
63+
/// connections after some timeout is reached.
64+
///
65+
/// TODO: should this be removed?
5066
#[error("Unexpected end of stream for tool: {} with id: {}", .name, .tool_use_id)]
5167
UnexpectedToolUseEos {
5268
tool_use_id: String,
@@ -154,7 +170,7 @@ impl ResponseParser {
154170
};
155171
return Ok(ResponseEvent::EndStream { message });
156172
},
157-
Err(err) => return Err(err.into()),
173+
Err(err) => return Err(err),
158174
}
159175
}
160176
}
@@ -221,7 +237,7 @@ impl ResponseParser {
221237
}
222238

223239
/// Returns the next event in the [SendMessageOutput] without consuming it.
224-
async fn peek(&mut self) -> Result<Option<&ChatResponseStream>, fig_api_client::Error> {
240+
async fn peek(&mut self) -> Result<Option<&ChatResponseStream>, RecvError> {
225241
if self.peek.is_some() {
226242
return Ok(self.peek.as_ref());
227243
}
@@ -235,14 +251,27 @@ impl ResponseParser {
235251
}
236252

237253
/// Consumes the next [SendMessageOutput] event.
238-
async fn next(&mut self) -> Result<Option<ChatResponseStream>, fig_api_client::Error> {
254+
async fn next(&mut self) -> Result<Option<ChatResponseStream>, RecvError> {
239255
if let Some(ev) = self.peek.take() {
240256
return Ok(Some(ev));
241257
}
242258
trace!("Attempting to recv next event");
243-
let r = self.response.recv().await?;
244-
trace!(?r, "Received new event");
245-
Ok(r)
259+
let start = std::time::Instant::now();
260+
let result = self.response.recv().await;
261+
let duration = std::time::Instant::now().duration_since(start);
262+
match result {
263+
Ok(r) => {
264+
trace!(?r, "Received new event");
265+
Ok(r)
266+
},
267+
Err(err) => {
268+
if duration.as_secs() >= 59 {
269+
Err(RecvError::StreamTimeout { source: err, duration })
270+
} else {
271+
Err(err.into())
272+
}
273+
},
274+
}
246275
}
247276
}
248277

0 commit comments

Comments
 (0)