Skip to content

Commit a2ce148

Browse files
aibrahim-oaiJeffCarpenter
authored and committed
Handle cancelling/aborting while processing a turn (openai#5543)
Currently we collect all turn items in a vector and add them to the history only on success. This results in losing those items on errors, including when the user aborts with `ctrl+c`. This PR: - Adds the ability for a tool call to handle cancellation - Bubbles the turn items up to where we record this info. Admittedly, this is ad-hoc logic that doesn't handle a lot of error edge cases. The right thing to do is to record to the history on the spot as `items`/`tool calls output` arrive. However, this isn't possible because different `task_kind`s have different `conversation_histories`, and `try_run_turn` has no idea which thread we are using. We also cannot pass an `arc` to the `conversation_histories` because it's a private element of `state`. That said, `abort` is the most common case and we should cover it until we remove `task kind`.
1 parent 1d43bbc commit a2ce148

File tree

7 files changed

+339
-128
lines changed

7 files changed

+339
-128
lines changed

codex-rs/core/src/codex.rs

Lines changed: 42 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::function_tool::FunctionCallError;
1010
use crate::mcp::auth::McpAuthStatusEntry;
1111
use crate::parse_command::parse_command;
1212
use crate::parse_turn_item;
13+
use crate::response_processing::process_items;
1314
use crate::review_format::format_review_findings_block;
1415
use crate::terminal;
1516
use crate::user_notification::UserNotifier;
@@ -855,7 +856,7 @@ impl Session {
855856

856857
/// Records input items: always append to conversation history and
857858
/// persist these response items to rollout.
858-
async fn record_conversation_items(&self, items: &[ResponseItem]) {
859+
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
859860
self.record_into_history(items).await;
860861
self.persist_rollout_response_items(items).await;
861862
}
@@ -1608,109 +1609,13 @@ pub(crate) async fn run_task(
16081609
let token_limit_reached = total_usage_tokens
16091610
.map(|tokens| tokens >= limit)
16101611
.unwrap_or(false);
1611-
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
1612-
let mut responses = Vec::<ResponseInputItem>::new();
1613-
for processed_response_item in processed_items {
1614-
let ProcessedResponseItem { item, response } = processed_response_item;
1615-
match (&item, &response) {
1616-
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
1617-
// If the model returned a message, we need to record it.
1618-
items_to_record_in_conversation_history.push(item);
1619-
}
1620-
(
1621-
ResponseItem::LocalShellCall { .. },
1622-
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1623-
) => {
1624-
items_to_record_in_conversation_history.push(item);
1625-
items_to_record_in_conversation_history.push(
1626-
ResponseItem::FunctionCallOutput {
1627-
call_id: call_id.clone(),
1628-
output: output.clone(),
1629-
},
1630-
);
1631-
}
1632-
(
1633-
ResponseItem::FunctionCall { .. },
1634-
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1635-
) => {
1636-
items_to_record_in_conversation_history.push(item);
1637-
items_to_record_in_conversation_history.push(
1638-
ResponseItem::FunctionCallOutput {
1639-
call_id: call_id.clone(),
1640-
output: output.clone(),
1641-
},
1642-
);
1643-
}
1644-
(
1645-
ResponseItem::CustomToolCall { .. },
1646-
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
1647-
) => {
1648-
items_to_record_in_conversation_history.push(item);
1649-
items_to_record_in_conversation_history.push(
1650-
ResponseItem::CustomToolCallOutput {
1651-
call_id: call_id.clone(),
1652-
output: output.clone(),
1653-
},
1654-
);
1655-
}
1656-
(
1657-
ResponseItem::FunctionCall { .. },
1658-
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
1659-
) => {
1660-
items_to_record_in_conversation_history.push(item);
1661-
let output = match result {
1662-
Ok(call_tool_result) => {
1663-
convert_call_tool_result_to_function_call_output_payload(
1664-
call_tool_result,
1665-
)
1666-
}
1667-
Err(err) => FunctionCallOutputPayload {
1668-
content: err.clone(),
1669-
success: Some(false),
1670-
},
1671-
};
1672-
items_to_record_in_conversation_history.push(
1673-
ResponseItem::FunctionCallOutput {
1674-
call_id: call_id.clone(),
1675-
output,
1676-
},
1677-
);
1678-
}
1679-
(
1680-
ResponseItem::Reasoning {
1681-
id,
1682-
summary,
1683-
content,
1684-
encrypted_content,
1685-
},
1686-
None,
1687-
) => {
1688-
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
1689-
id: id.clone(),
1690-
summary: summary.clone(),
1691-
content: content.clone(),
1692-
encrypted_content: encrypted_content.clone(),
1693-
});
1694-
}
1695-
_ => {
1696-
warn!("Unexpected response item: {item:?} with response: {response:?}");
1697-
}
1698-
};
1699-
if let Some(response) = response {
1700-
responses.push(response);
1701-
}
1702-
}
1703-
1704-
// Only attempt to take the lock if there is something to record.
1705-
if !items_to_record_in_conversation_history.is_empty() {
1706-
if is_review_mode {
1707-
review_thread_history
1708-
.record_items(items_to_record_in_conversation_history.iter());
1709-
} else {
1710-
sess.record_conversation_items(&items_to_record_in_conversation_history)
1711-
.await;
1712-
}
1713-
}
1612+
let (responses, items_to_record_in_conversation_history) = process_items(
1613+
processed_items,
1614+
is_review_mode,
1615+
&mut review_thread_history,
1616+
&sess,
1617+
)
1618+
.await;
17141619

17151620
if token_limit_reached {
17161621
if auto_compact_recently_attempted {
@@ -1749,7 +1654,16 @@ pub(crate) async fn run_task(
17491654
}
17501655
continue;
17511656
}
1752-
Err(CodexErr::TurnAborted) => {
1657+
Err(CodexErr::TurnAborted {
1658+
dangling_artifacts: processed_items,
1659+
}) => {
1660+
let _ = process_items(
1661+
processed_items,
1662+
is_review_mode,
1663+
&mut review_thread_history,
1664+
&sess,
1665+
)
1666+
.await;
17531667
// Aborted turn is reported via a different event.
17541668
break;
17551669
}
@@ -1850,7 +1764,13 @@ async fn run_turn(
18501764
.await
18511765
{
18521766
Ok(output) => return Ok(output),
1853-
Err(CodexErr::TurnAborted) => return Err(CodexErr::TurnAborted),
1767+
Err(CodexErr::TurnAborted {
1768+
dangling_artifacts: processed_items,
1769+
}) => {
1770+
return Err(CodexErr::TurnAborted {
1771+
dangling_artifacts: processed_items,
1772+
});
1773+
}
18541774
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
18551775
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
18561776
Err(e @ CodexErr::Fatal(_)) => return Err(e),
@@ -1903,9 +1823,9 @@ async fn run_turn(
19031823
/// "handled" such that it produces a `ResponseInputItem` that needs to be
19041824
/// sent back to the model on the next turn.
19051825
#[derive(Debug)]
1906-
pub(crate) struct ProcessedResponseItem {
1907-
pub(crate) item: ResponseItem,
1908-
pub(crate) response: Option<ResponseInputItem>,
1826+
pub struct ProcessedResponseItem {
1827+
pub item: ResponseItem,
1828+
pub response: Option<ResponseInputItem>,
19091829
}
19101830

19111831
#[derive(Debug)]
@@ -1954,7 +1874,15 @@ async fn try_run_turn(
19541874
// Poll the next item from the model stream. We must inspect *both* Ok and Err
19551875
// cases so that transient stream failures (e.g., dropped SSE connection before
19561876
// `response.completed`) bubble up and trigger the caller's retry logic.
1957-
let event = stream.next().or_cancel(&cancellation_token).await?;
1877+
let event = match stream.next().or_cancel(&cancellation_token).await {
1878+
Ok(event) => event,
1879+
Err(codex_async_utils::CancelErr::Cancelled) => {
1880+
let processed_items = output.try_collect().await?;
1881+
return Err(CodexErr::TurnAborted {
1882+
dangling_artifacts: processed_items,
1883+
});
1884+
}
1885+
};
19581886

19591887
let event = match event {
19601888
Some(res) => res?,
@@ -1978,7 +1906,8 @@ async fn try_run_turn(
19781906
let payload_preview = call.payload.log_payload().into_owned();
19791907
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
19801908

1981-
let response = tool_runtime.handle_tool_call(call);
1909+
let response =
1910+
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
19821911

19831912
output.push_back(
19841913
async move {
@@ -2060,12 +1989,7 @@ async fn try_run_turn(
20601989
} => {
20611990
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
20621991
.await;
2063-
2064-
let processed_items = output
2065-
.try_collect()
2066-
.or_cancel(&cancellation_token)
2067-
.await??;
2068-
1992+
let processed_items = output.try_collect().await?;
20691993
let unified_diff = {
20701994
let mut tracker = turn_diff_tracker.lock().await;
20711995
tracker.get_unified_diff()
@@ -2169,7 +2093,7 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
21692093
}
21702094
})
21712095
}
2172-
fn convert_call_tool_result_to_function_call_output_payload(
2096+
pub(crate) fn convert_call_tool_result_to_function_call_output_payload(
21732097
call_tool_result: &CallToolResult,
21742098
) -> FunctionCallOutputPayload {
21752099
let CallToolResult {

codex-rs/core/src/error.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::codex::ProcessedResponseItem;
12
use crate::exec::ExecToolCallOutput;
23
use crate::token_data::KnownPlan;
34
use crate::token_data::PlanType;
@@ -53,8 +54,11 @@ pub enum SandboxErr {
5354

5455
#[derive(Error, Debug)]
5556
pub enum CodexErr {
57+
// todo(aibrahim): get rid of this error carrying the dangling artifacts
5658
#[error("turn aborted")]
57-
TurnAborted,
59+
TurnAborted {
60+
dangling_artifacts: Vec<ProcessedResponseItem>,
61+
},
5862

5963
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
6064
/// handshake has succeeded but **before** it finished emitting `response.completed`.
@@ -158,7 +162,9 @@ pub enum CodexErr {
158162

159163
impl From<CancelErr> for CodexErr {
160164
fn from(_: CancelErr) -> Self {
161-
CodexErr::TurnAborted
165+
CodexErr::TurnAborted {
166+
dangling_artifacts: Vec::new(),
167+
}
162168
}
163169
}
164170

codex-rs/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod mcp_tool_call;
3636
mod message_history;
3737
mod model_provider_info;
3838
pub mod parse_command;
39+
mod response_processing;
3940
pub mod sandboxing;
4041
pub mod token_data;
4142
mod truncate;
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use crate::codex::Session;
2+
use crate::conversation_history::ConversationHistory;
3+
use codex_protocol::models::FunctionCallOutputPayload;
4+
use codex_protocol::models::ResponseInputItem;
5+
use codex_protocol::models::ResponseItem;
6+
use tracing::warn;
7+
8+
/// Process streamed `ResponseItem`s from the model into the pair of:
9+
/// - items we should record in conversation history; and
10+
/// - `ResponseInputItem`s to send back to the model on the next turn.
11+
pub(crate) async fn process_items(
12+
processed_items: Vec<crate::codex::ProcessedResponseItem>,
13+
is_review_mode: bool,
14+
review_thread_history: &mut ConversationHistory,
15+
sess: &Session,
16+
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
17+
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
18+
let mut responses = Vec::<ResponseInputItem>::new();
19+
for processed_response_item in processed_items {
20+
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
21+
match (&item, &response) {
22+
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
23+
// If the model returned a message, we need to record it.
24+
items_to_record_in_conversation_history.push(item);
25+
}
26+
(
27+
ResponseItem::LocalShellCall { .. },
28+
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
29+
) => {
30+
items_to_record_in_conversation_history.push(item);
31+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
32+
call_id: call_id.clone(),
33+
output: output.clone(),
34+
});
35+
}
36+
(
37+
ResponseItem::FunctionCall { .. },
38+
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
39+
) => {
40+
items_to_record_in_conversation_history.push(item);
41+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
42+
call_id: call_id.clone(),
43+
output: output.clone(),
44+
});
45+
}
46+
(
47+
ResponseItem::CustomToolCall { .. },
48+
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
49+
) => {
50+
items_to_record_in_conversation_history.push(item);
51+
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
52+
call_id: call_id.clone(),
53+
output: output.clone(),
54+
});
55+
}
56+
(
57+
ResponseItem::FunctionCall { .. },
58+
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
59+
) => {
60+
items_to_record_in_conversation_history.push(item);
61+
let output = match result {
62+
Ok(call_tool_result) => {
63+
crate::codex::convert_call_tool_result_to_function_call_output_payload(
64+
call_tool_result,
65+
)
66+
}
67+
Err(err) => FunctionCallOutputPayload {
68+
content: err.clone(),
69+
success: Some(false),
70+
},
71+
};
72+
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
73+
call_id: call_id.clone(),
74+
output,
75+
});
76+
}
77+
(
78+
ResponseItem::Reasoning {
79+
id,
80+
summary,
81+
content,
82+
encrypted_content,
83+
},
84+
None,
85+
) => {
86+
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
87+
id: id.clone(),
88+
summary: summary.clone(),
89+
content: content.clone(),
90+
encrypted_content: encrypted_content.clone(),
91+
});
92+
}
93+
_ => {
94+
warn!("Unexpected response item: {item:?} with response: {response:?}");
95+
}
96+
};
97+
if let Some(response) = response {
98+
responses.push(response);
99+
}
100+
}
101+
102+
// Only attempt to take the lock if there is something to record.
103+
if !items_to_record_in_conversation_history.is_empty() {
104+
if is_review_mode {
105+
review_thread_history.record_items(items_to_record_in_conversation_history.iter());
106+
} else {
107+
sess.record_conversation_items(&items_to_record_in_conversation_history)
108+
.await;
109+
}
110+
}
111+
(responses, items_to_record_in_conversation_history)
112+
}

0 commit comments

Comments
 (0)