Skip to content

Commit b581498

Browse files
authored
fix: introduce EventMsg::TurnAborted (#2365)
Introduces `EventMsg::TurnAborted` that should be sent in response to `Op::Interrupt`. In the MCP server, updates the handling of a `ClientRequest::InterruptConversation` request such that it sends the `Op::Interrupt` but does not respond to the request until it sees an `EventMsg::TurnAborted`.
1 parent 71cae06 commit b581498

File tree

9 files changed

+80
-50
lines changed

9 files changed

+80
-50
lines changed

codex-rs/core/src/codex.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use codex_apply_patch::ApplyPatchAction;
1414
use codex_apply_patch::MaybeApplyPatchVerified;
1515
use codex_apply_patch::maybe_parse_apply_patch_verified;
1616
use codex_login::CodexAuth;
17+
use codex_protocol::protocol::TurnAbortReason;
18+
use codex_protocol::protocol::TurnAbortedEvent;
1719
use futures::prelude::*;
1820
use mcp_types::CallToolResult;
1921
use serde::Serialize;
@@ -535,7 +537,7 @@ impl Session {
535537
pub fn set_task(&self, task: AgentTask) {
536538
let mut state = self.state.lock_unchecked();
537539
if let Some(current_task) = state.current_task.take() {
538-
current_task.abort();
540+
current_task.abort(TurnAbortReason::Replaced);
539541
}
540542
state.current_task = Some(task);
541543
}
@@ -852,13 +854,13 @@ impl Session {
852854
.await
853855
}
854856

855-
fn abort(&self) {
856-
info!("Aborting existing session");
857+
fn interrupt_task(&self) {
858+
info!("interrupt received: abort current task, if any");
857859
let mut state = self.state.lock_unchecked();
858860
state.pending_approvals.clear();
859861
state.pending_input.clear();
860862
if let Some(task) = state.current_task.take() {
861-
task.abort();
863+
task.abort(TurnAbortReason::Interrupted);
862864
}
863865
}
864866

@@ -894,7 +896,7 @@ impl Session {
894896

895897
impl Drop for Session {
896898
fn drop(&mut self) {
897-
self.abort();
899+
self.interrupt_task();
898900
}
899901
}
900902

@@ -964,14 +966,13 @@ impl AgentTask {
964966
}
965967
}
966968

967-
fn abort(self) {
969+
fn abort(self, reason: TurnAbortReason) {
970+
// TOCTOU?
968971
if !self.handle.is_finished() {
969972
self.handle.abort();
970973
let event = Event {
971974
id: self.sub_id,
972-
msg: EventMsg::Error(ErrorEvent {
973-
message: " Turn interrupted".to_string(),
974-
}),
975+
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
975976
};
976977
let tx_event = self.sess.tx_event.clone();
977978
tokio::spawn(async move {
@@ -994,7 +995,7 @@ async fn submission_loop(
994995
debug!(?sub, "Submission");
995996
match sub.op {
996997
Op::Interrupt => {
997-
sess.abort();
998+
sess.interrupt_task();
998999
}
9991000
Op::UserInput { items } => {
10001001
// attempt to inject input into current task
@@ -1065,13 +1066,13 @@ async fn submission_loop(
10651066
}
10661067
Op::ExecApproval { id, decision } => match decision {
10671068
ReviewDecision::Abort => {
1068-
sess.abort();
1069+
sess.interrupt_task();
10691070
}
10701071
other => sess.notify_approval(&id, other),
10711072
},
10721073
Op::PatchApproval { id, decision } => match decision {
10731074
ReviewDecision::Abort => {
1074-
sess.abort();
1075+
sess.interrupt_task();
10751076
}
10761077
other => sess.notify_approval(&id, other),
10771078
},

codex-rs/exec/src/event_processor_with_human_output.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use codex_core::protocol::PatchApplyBeginEvent;
2121
use codex_core::protocol::PatchApplyEndEvent;
2222
use codex_core::protocol::SessionConfiguredEvent;
2323
use codex_core::protocol::TaskCompleteEvent;
24+
use codex_core::protocol::TurnAbortReason;
2425
use codex_core::protocol::TurnDiffEvent;
2526
use owo_colors::OwoColorize;
2627
use owo_colors::Style;
@@ -522,6 +523,14 @@ impl EventProcessor for EventProcessorWithHumanOutput {
522523
EventMsg::GetHistoryEntryResponse(_) => {
523524
// Currently ignored in exec output.
524525
}
526+
EventMsg::TurnAborted(abort_reason) => match abort_reason.reason {
527+
TurnAbortReason::Interrupted => {
528+
ts_println!(self, "task interrupted");
529+
}
530+
TurnAbortReason::Replaced => {
531+
ts_println!(self, "task aborted: replaced by a new task");
532+
}
533+
},
525534
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
526535
}
527536
CodexStatus::Running

codex-rs/mcp-server/src/codex_message_processor.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub(crate) struct CodexMessageProcessor {
7777
codex_linux_sandbox_exe: Option<PathBuf>,
7878
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
7979
active_login: Arc<Mutex<Option<ActiveLogin>>>,
80+
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
81+
pending_interrupts: Arc<Mutex<HashMap<Uuid, Vec<RequestId>>>>,
8082
}
8183

8284
impl CodexMessageProcessor {
@@ -91,6 +93,7 @@ impl CodexMessageProcessor {
9193
codex_linux_sandbox_exe,
9294
conversation_listeners: HashMap::new(),
9395
active_login: Arc::new(Mutex::new(None)),
96+
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
9497
}
9598
}
9699

@@ -399,13 +402,14 @@ impl CodexMessageProcessor {
399402
return;
400403
};
401404

402-
let _ = conversation.submit(Op::Interrupt).await;
405+
// Record the pending interrupt so we can reply when TurnAborted arrives.
406+
{
407+
let mut map = self.pending_interrupts.lock().await;
408+
map.entry(conversation_id.0).or_default().push(request_id);
409+
}
403410

404-
// Apparently CodexConversation does not send an ack for Op::Interrupt,
405-
// so we can reply to the request right away.
406-
self.outgoing
407-
.send_response(request_id, InterruptConversationResponse {})
408-
.await;
411+
// Submit the interrupt; we'll respond upon TurnAborted.
412+
let _ = conversation.submit(Op::Interrupt).await;
409413
}
410414

411415
async fn add_conversation_listener(
@@ -433,6 +437,7 @@ impl CodexMessageProcessor {
433437
self.conversation_listeners
434438
.insert(subscription_id, cancel_tx);
435439
let outgoing_for_task = self.outgoing.clone();
440+
let pending_interrupts = self.pending_interrupts.clone();
436441
tokio::spawn(async move {
437442
loop {
438443
tokio::select! {
@@ -473,7 +478,7 @@ impl CodexMessageProcessor {
473478
})
474479
.await;
475480

476-
apply_bespoke_event_handling(event, conversation_id, conversation.clone(), outgoing_for_task.clone()).await;
481+
apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await;
477482
}
478483
}
479484
}
@@ -512,6 +517,7 @@ async fn apply_bespoke_event_handling(
512517
conversation_id: ConversationId,
513518
conversation: Arc<CodexConversation>,
514519
outgoing: Arc<OutgoingMessageSender>,
520+
pending_interrupts: Arc<Mutex<HashMap<Uuid, Vec<RequestId>>>>,
515521
) {
516522
let Event { id: event_id, msg } = event;
517523
match msg {
@@ -560,6 +566,22 @@ async fn apply_bespoke_event_handling(
560566
on_exec_approval_response(event_id, rx, conversation).await;
561567
});
562568
}
569+
// If this is a TurnAborted, reply to any pending interrupt requests.
570+
EventMsg::TurnAborted(turn_aborted_event) => {
571+
let pending = {
572+
let mut map = pending_interrupts.lock().await;
573+
map.remove(&conversation_id.0).unwrap_or_default()
574+
};
575+
if !pending.is_empty() {
576+
let response = InterruptConversationResponse {
577+
abort_reason: turn_aborted_event.reason,
578+
};
579+
for rid in pending {
580+
outgoing.send_response(rid, response.clone()).await;
581+
}
582+
}
583+
}
584+
563585
_ => {}
564586
}
565587
}

codex-rs/mcp-server/src/codex_tool_runner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ async fn run_codex_tool_session_inner(
272272
| EventMsg::TurnDiff(_)
273273
| EventMsg::GetHistoryEntryResponse(_)
274274
| EventMsg::PlanUpdate(_)
275+
| EventMsg::TurnAborted(_)
275276
| EventMsg::ShutdownComplete => {
276277
// For now, we do not do anything extra for these
277278
// events. Note that

codex-rs/mcp-server/src/conversation_loop.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ pub async fn run_conversation_loop(
5252
call_id,
5353
)
5454
.await;
55-
continue;
5655
}
5756
EventMsg::Error(_) => {
5857
error!("Codex runtime error");
@@ -75,7 +74,6 @@ pub async fn run_conversation_loop(
7574
event.id.clone(),
7675
)
7776
.await;
78-
continue;
7977
}
8078
EventMsg::TaskComplete(_) => {}
8179
EventMsg::SessionConfigured(_) => {
@@ -107,6 +105,7 @@ pub async fn run_conversation_loop(
107105
| EventMsg::PatchApplyEnd(_)
108106
| EventMsg::GetHistoryEntryResponse(_)
109107
| EventMsg::PlanUpdate(_)
108+
| EventMsg::TurnAborted(_)
110109
| EventMsg::ShutdownComplete => {
111110
// For now, we do not do anything extra for these
112111
// events. Note that

codex-rs/mcp-server/src/wire_format.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use codex_core::protocol::AskForApproval;
66
use codex_core::protocol::FileChange;
77
use codex_core::protocol::ReviewDecision;
88
use codex_core::protocol::SandboxPolicy;
9+
use codex_core::protocol::TurnAbortReason;
910
use codex_core::protocol_config_types::ReasoningEffort;
1011
use codex_core::protocol_config_types::ReasoningSummary;
1112
use mcp_types::RequestId;
@@ -191,9 +192,11 @@ pub struct InterruptConversationParams {
191192
pub conversation_id: ConversationId,
192193
}
193194

194-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
195+
#[derive(Serialize, Deserialize, Debug, Clone)]
195196
#[serde(rename_all = "camelCase")]
196-
pub struct InterruptConversationResponse {}
197+
pub struct InterruptConversationResponse {
198+
pub abort_reason: TurnAbortReason,
199+
}
197200

198201
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
199202
#[serde(rename_all = "camelCase")]

codex-rs/mcp-server/tests/interrupt.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use std::path::Path;
55

66
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
77
use codex_mcp_server::CodexToolCallParam;
8-
use mcp_types::JSONRPCResponse;
9-
use mcp_types::RequestId;
108
use serde_json::json;
119
use tempfile::TempDir;
1210
use tokio::time::timeout;
@@ -100,22 +98,13 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
10098
)
10199
.await?;
102100

103-
// Expect Codex to return an error or interruption response
104-
let codex_response: JSONRPCResponse = timeout(
101+
// Expect Codex to emit a TurnAborted event notification
102+
let _turn_aborted = timeout(
105103
DEFAULT_READ_TIMEOUT,
106-
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
104+
mcp_process.read_stream_until_notification_message("turn_aborted"),
107105
)
108106
.await??;
109107

110-
assert!(
111-
codex_response
112-
.result
113-
.as_object()
114-
.map(|o| o.contains_key("error"))
115-
.unwrap_or(false),
116-
"Expected an interruption or error result, got: {codex_response:?}"
117-
);
118-
119108
let codex_reply_request_id = mcp_process
120109
.send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`")
121110
.await?;
@@ -131,21 +120,12 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
131120
)
132121
.await?;
133122

134-
// Expect Codex to return an error or interruption response
135-
let codex_response: JSONRPCResponse = timeout(
123+
// Expect Codex to emit a TurnAborted event notification
124+
let _turn_aborted = timeout(
136125
DEFAULT_READ_TIMEOUT,
137-
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_reply_request_id)),
126+
mcp_process.read_stream_until_notification_message("turn_aborted"),
138127
)
139128
.await??;
140-
141-
assert!(
142-
codex_response
143-
.result
144-
.as_object()
145-
.map(|o| o.contains_key("error"))
146-
.unwrap_or(false),
147-
"Expected an interruption or error result, got: {codex_response:?}"
148-
);
149129
Ok(())
150130
}
151131

codex-rs/protocol/src/protocol.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub struct Submission {
3939
#[non_exhaustive]
4040
pub enum Op {
4141
/// Abort current task.
42-
/// This server sends no corresponding Event
42+
/// This server sends [`EventMsg::TurnAborted`] in response.
4343
Interrupt,
4444

4545
/// Input from the user
@@ -422,6 +422,8 @@ pub enum EventMsg {
422422

423423
PlanUpdate(UpdatePlanArgs),
424424

425+
TurnAborted(TurnAbortedEvent),
426+
425427
/// Notification that the agent is shutting down.
426428
ShutdownComplete,
427429
}
@@ -745,6 +747,18 @@ pub struct Chunk {
745747
pub inserted_lines: Vec<String>,
746748
}
747749

750+
#[derive(Debug, Clone, Deserialize, Serialize)]
751+
pub struct TurnAbortedEvent {
752+
pub reason: TurnAbortReason,
753+
}
754+
755+
#[derive(Debug, Clone, Deserialize, Serialize)]
756+
#[serde(rename_all = "snake_case")]
757+
pub enum TurnAbortReason {
758+
Interrupted,
759+
Replaced,
760+
}
761+
748762
#[cfg(test)]
749763
mod tests {
750764
use super::*;

codex-rs/tui/src/chatwidget.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ impl ChatWidget<'_> {
635635
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => self.on_task_complete(),
636636
EventMsg::TokenCount(token_usage) => self.on_token_count(token_usage),
637637
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
638+
EventMsg::TurnAborted(_) => self.on_error("Turn interrupted".to_owned()),
638639
EventMsg::PlanUpdate(update) => self.on_plan_update(update),
639640
EventMsg::ExecApprovalRequest(ev) => self.on_exec_approval_request(id, ev),
640641
EventMsg::ApplyPatchApprovalRequest(ev) => self.on_apply_patch_approval_request(id, ev),

0 commit comments

Comments
 (0)