Skip to content

Commit fa8dc70

Browse files
aibrahim-oaiHolovkat
authored andcommitted
Handle cancelling/aborting while processing a turn (#5543)
Currently we collect all all turn items in a vector, then we add it to the history on success. This result in losing those items on errors including aborting `ctrl+c`. This PR: - Adds the ability for the tool call to handle cancellation - bubble the turn items up to where we are recording this info Admittedly, this logic is an ad-hoc logic that doesn't handle a lot of error edge cases. The right thing to do is recording to the history on the spot as `items`/`tool calls output` come. However, this isn't possible because of having different `task_kind` that has different `conversation_histories`. The `try_run_turn` has no idea what thread are we using. We cannot also pass an `arc` to the `conversation_histories` because it's a private element of `state`. That's said, `abort` is the most common case and we should cover it until we remove `task kind`
1 parent ff97dbb commit fa8dc70

File tree

7 files changed

+339
-128
lines changed

7 files changed

+339
-128
lines changed

codex-rs/core/src/codex.rs

Lines changed: 42 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::function_tool::FunctionCallError;
1111
use crate::mcp::auth::McpAuthStatusEntry;
1212
use crate::parse_command::parse_command;
1313
use crate::parse_turn_item;
14+
use crate::response_processing::process_items;
1415
use crate::review_format::format_review_findings_block;
1516
use crate::terminal;
1617
use crate::user_notification::UserNotifier;
@@ -893,7 +894,7 @@ impl Session {
893894

894895
/// Records input items: always append to conversation history and
895896
/// persist these response items to rollout.
896-
async fn record_conversation_items(&self, items: &[ResponseItem]) {
897+
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
897898
self.record_into_history(items).await;
898899
self.persist_rollout_response_items(items).await;
899900
}
@@ -1786,109 +1787,13 @@ pub(crate) async fn run_task(
17861787
let token_limit_reached = total_usage_tokens
17871788
.map(|tokens| tokens >= limit)
17881789
.unwrap_or(false);
1789-
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
1790-
let mut responses = Vec::<ResponseInputItem>::new();
1791-
for processed_response_item in processed_items {
1792-
let ProcessedResponseItem { item, response } = processed_response_item;
1793-
match (&item, &response) {
1794-
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
1795-
// If the model returned a message, we need to record it.
1796-
items_to_record_in_conversation_history.push(item);
1797-
}
1798-
(
1799-
ResponseItem::LocalShellCall { .. },
1800-
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1801-
) => {
1802-
items_to_record_in_conversation_history.push(item);
1803-
items_to_record_in_conversation_history.push(
1804-
ResponseItem::FunctionCallOutput {
1805-
call_id: call_id.clone(),
1806-
output: output.clone(),
1807-
},
1808-
);
1809-
}
1810-
(
1811-
ResponseItem::FunctionCall { .. },
1812-
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1813-
) => {
1814-
items_to_record_in_conversation_history.push(item);
1815-
items_to_record_in_conversation_history.push(
1816-
ResponseItem::FunctionCallOutput {
1817-
call_id: call_id.clone(),
1818-
output: output.clone(),
1819-
},
1820-
);
1821-
}
1822-
(
1823-
ResponseItem::CustomToolCall { .. },
1824-
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
1825-
) => {
1826-
items_to_record_in_conversation_history.push(item);
1827-
items_to_record_in_conversation_history.push(
1828-
ResponseItem::CustomToolCallOutput {
1829-
call_id: call_id.clone(),
1830-
output: output.clone(),
1831-
},
1832-
);
1833-
}
1834-
(
1835-
ResponseItem::FunctionCall { .. },
1836-
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
1837-
) => {
1838-
items_to_record_in_conversation_history.push(item);
1839-
let output = match result {
1840-
Ok(call_tool_result) => {
1841-
convert_call_tool_result_to_function_call_output_payload(
1842-
call_tool_result,
1843-
)
1844-
}
1845-
Err(err) => FunctionCallOutputPayload {
1846-
content: err.clone(),
1847-
success: Some(false),
1848-
},
1849-
};
1850-
items_to_record_in_conversation_history.push(
1851-
ResponseItem::FunctionCallOutput {
1852-
call_id: call_id.clone(),
1853-
output,
1854-
},
1855-
);
1856-
}
1857-
(
1858-
ResponseItem::Reasoning {
1859-
id,
1860-
summary,
1861-
content,
1862-
encrypted_content,
1863-
},
1864-
None,
1865-
) => {
1866-
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
1867-
id: id.clone(),
1868-
summary: summary.clone(),
1869-
content: content.clone(),
1870-
encrypted_content: encrypted_content.clone(),
1871-
});
1872-
}
1873-
_ => {
1874-
warn!("Unexpected response item: {item:?} with response: {response:?}");
1875-
}
1876-
};
1877-
if let Some(response) = response {
1878-
responses.push(response);
1879-
}
1880-
}
1881-
1882-
// Only attempt to take the lock if there is something to record.
1883-
if !items_to_record_in_conversation_history.is_empty() {
1884-
if is_review_mode {
1885-
review_thread_history
1886-
.record_items(items_to_record_in_conversation_history.iter());
1887-
} else {
1888-
sess.record_conversation_items(&items_to_record_in_conversation_history)
1889-
.await;
1890-
}
1891-
}
1790+
let (responses, items_to_record_in_conversation_history) = process_items(
1791+
processed_items,
1792+
is_review_mode,
1793+
&mut review_thread_history,
1794+
&sess,
1795+
)
1796+
.await;
18921797

18931798
if token_limit_reached {
18941799
if auto_compact_recently_attempted {
@@ -1927,7 +1832,16 @@ pub(crate) async fn run_task(
19271832
}
19281833
continue;
19291834
}
1930-
Err(CodexErr::TurnAborted) => {
1835+
Err(CodexErr::TurnAborted {
1836+
dangling_artifacts: processed_items,
1837+
}) => {
1838+
let _ = process_items(
1839+
processed_items,
1840+
is_review_mode,
1841+
&mut review_thread_history,
1842+
&sess,
1843+
)
1844+
.await;
19311845
// Aborted turn is reported via a different event.
19321846
break;
19331847
}
@@ -2068,7 +1982,13 @@ async fn run_turn(
20681982
.await
20691983
{
20701984
Ok(output) => return Ok(output),
2071-
Err(CodexErr::TurnAborted) => return Err(CodexErr::TurnAborted),
1985+
Err(CodexErr::TurnAborted {
1986+
dangling_artifacts: processed_items,
1987+
}) => {
1988+
return Err(CodexErr::TurnAborted {
1989+
dangling_artifacts: processed_items,
1990+
});
1991+
}
20721992
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
20731993
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
20741994
Err(e @ CodexErr::Fatal(_)) => return Err(e),
@@ -2121,9 +2041,9 @@ async fn run_turn(
21212041
/// "handled" such that it produces a `ResponseInputItem` that needs to be
21222042
/// sent back to the model on the next turn.
21232043
#[derive(Debug)]
2124-
pub(crate) struct ProcessedResponseItem {
2125-
pub(crate) item: ResponseItem,
2126-
pub(crate) response: Option<ResponseInputItem>,
2044+
pub struct ProcessedResponseItem {
2045+
pub item: ResponseItem,
2046+
pub response: Option<ResponseInputItem>,
21272047
}
21282048

21292049
#[derive(Debug)]
@@ -2172,7 +2092,15 @@ async fn try_run_turn(
21722092
// Poll the next item from the model stream. We must inspect *both* Ok and Err
21732093
// cases so that transient stream failures (e.g., dropped SSE connection before
21742094
// `response.completed`) bubble up and trigger the caller's retry logic.
2175-
let event = stream.next().or_cancel(&cancellation_token).await?;
2095+
let event = match stream.next().or_cancel(&cancellation_token).await {
2096+
Ok(event) => event,
2097+
Err(codex_async_utils::CancelErr::Cancelled) => {
2098+
let processed_items = output.try_collect().await?;
2099+
return Err(CodexErr::TurnAborted {
2100+
dangling_artifacts: processed_items,
2101+
});
2102+
}
2103+
};
21762104

21772105
let event = match event {
21782106
Some(res) => res?,
@@ -2196,7 +2124,8 @@ async fn try_run_turn(
21962124
let payload_preview = call.payload.log_payload().into_owned();
21972125
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
21982126

2199-
let response = tool_runtime.handle_tool_call(call);
2127+
let response =
2128+
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
22002129

22012130
output.push_back(
22022131
async move {
@@ -2278,12 +2207,7 @@ async fn try_run_turn(
22782207
} => {
22792208
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
22802209
.await;
2281-
2282-
let processed_items = output
2283-
.try_collect()
2284-
.or_cancel(&cancellation_token)
2285-
.await??;
2286-
2210+
let processed_items = output.try_collect().await?;
22872211
let unified_diff = {
22882212
let mut tracker = turn_diff_tracker.lock().await;
22892213
tracker.get_unified_diff()
@@ -2387,7 +2311,7 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
23872311
}
23882312
})
23892313
}
2390-
fn convert_call_tool_result_to_function_call_output_payload(
2314+
pub(crate) fn convert_call_tool_result_to_function_call_output_payload(
23912315
call_tool_result: &CallToolResult,
23922316
) -> FunctionCallOutputPayload {
23932317
let CallToolResult {

codex-rs/core/src/error.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::codex::ProcessedResponseItem;
12
use crate::exec::ExecToolCallOutput;
23
use crate::token_data::KnownPlan;
34
use crate::token_data::PlanType;
@@ -53,8 +54,11 @@ pub enum SandboxErr {
5354

5455
#[derive(Error, Debug)]
5556
pub enum CodexErr {
57+
// todo(aibrahim): git rid of this error carrying the dangling artifacts
5658
#[error("turn aborted")]
57-
TurnAborted,
59+
TurnAborted {
60+
dangling_artifacts: Vec<ProcessedResponseItem>,
61+
},
5862

5963
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
6064
/// handshake has succeeded but **before** it finished emitting `response.completed`.
@@ -158,7 +162,9 @@ pub enum CodexErr {
158162

159163
impl From<CancelErr> for CodexErr {
160164
fn from(_: CancelErr) -> Self {
161-
CodexErr::TurnAborted
165+
CodexErr::TurnAborted {
166+
dangling_artifacts: Vec::new(),
167+
}
162168
}
163169
}
164170

codex-rs/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod memory;
3737
mod message_history;
3838
mod model_provider_info;
3939
pub mod parse_command;
40+
mod response_processing;
4041
pub mod sandboxing;
4142
pub mod token_data;
4243
mod truncate;
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use crate::codex::Session;
2+
use crate::conversation_history::ConversationHistory;
3+
use codex_protocol::models::FunctionCallOutputPayload;
4+
use codex_protocol::models::ResponseInputItem;
5+
use codex_protocol::models::ResponseItem;
6+
use tracing::warn;
7+
8+
/// Process streamed `ResponseItem`s from the model into the pair of:
9+
/// - items we should record in conversation history; and
10+
/// - `ResponseInputItem`s to send back to the model on the next turn.
11+
pub(crate) async fn process_items(
12+
processed_items: Vec<crate::codex::ProcessedResponseItem>,
13+
is_review_mode: bool,
14+
review_thread_history: &mut ConversationHistory,
15+
sess: &Session,
16+
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
17+
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
18+
let mut responses = Vec::<ResponseInputItem>::new();
19+
for processed_response_item in processed_items {
20+
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
21+
match (&item, &response) {
22+
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
23+
// If the model returned a message, we need to record it.
24+
items_to_record_in_conversation_history.push(item);
25+
}
26+
(
27+
ResponseItem::LocalShellCall { .. },
28+
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
29+
) => {
30+
items_to_record_in_conversation_history.push(item);
31+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
32+
call_id: call_id.clone(),
33+
output: output.clone(),
34+
});
35+
}
36+
(
37+
ResponseItem::FunctionCall { .. },
38+
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
39+
) => {
40+
items_to_record_in_conversation_history.push(item);
41+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
42+
call_id: call_id.clone(),
43+
output: output.clone(),
44+
});
45+
}
46+
(
47+
ResponseItem::CustomToolCall { .. },
48+
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
49+
) => {
50+
items_to_record_in_conversation_history.push(item);
51+
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
52+
call_id: call_id.clone(),
53+
output: output.clone(),
54+
});
55+
}
56+
(
57+
ResponseItem::FunctionCall { .. },
58+
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
59+
) => {
60+
items_to_record_in_conversation_history.push(item);
61+
let output = match result {
62+
Ok(call_tool_result) => {
63+
crate::codex::convert_call_tool_result_to_function_call_output_payload(
64+
call_tool_result,
65+
)
66+
}
67+
Err(err) => FunctionCallOutputPayload {
68+
content: err.clone(),
69+
success: Some(false),
70+
},
71+
};
72+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
73+
call_id: call_id.clone(),
74+
output,
75+
});
76+
}
77+
(
78+
ResponseItem::Reasoning {
79+
id,
80+
summary,
81+
content,
82+
encrypted_content,
83+
},
84+
None,
85+
) => {
86+
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
87+
id: id.clone(),
88+
summary: summary.clone(),
89+
content: content.clone(),
90+
encrypted_content: encrypted_content.clone(),
91+
});
92+
}
93+
_ => {
94+
warn!("Unexpected response item: {item:?} with response: {response:?}");
95+
}
96+
};
97+
if let Some(response) = response {
98+
responses.push(response);
99+
}
100+
}
101+
102+
// Only attempt to take the lock if there is something to record.
103+
if !items_to_record_in_conversation_history.is_empty() {
104+
if is_review_mode {
105+
review_thread_history.record_items(items_to_record_in_conversation_history.iter());
106+
} else {
107+
sess.record_conversation_items(&items_to_record_in_conversation_history)
108+
.await;
109+
}
110+
}
111+
(responses, items_to_record_in_conversation_history)
112+
}

0 commit comments

Comments
 (0)