Skip to content

Commit 01c0896

Browse files
authored
Adding interrupt Support to MCP (openai#1646)
1 parent 4082246 commit 01c0896

File tree

8 files changed

+389
-26
lines changed

8 files changed

+389
-26
lines changed

codex-rs/mcp-server/src/codex_tool_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl CodexToolCallParam {
168168

169169
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
170170
#[serde(rename_all = "camelCase")]
171-
pub(crate) struct CodexToolCallReplyParam {
171+
pub struct CodexToolCallReplyParam {
172172
/// The *session id* for this conversation.
173173
pub session_id: String,
174174

codex-rs/mcp-server/src/codex_tool_runner.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use mcp_types::CallToolResult;
2020
use mcp_types::ContentBlock;
2121
use mcp_types::RequestId;
2222
use mcp_types::TextContent;
23+
use serde_json::json;
2324
use tokio::sync::Mutex;
2425
use uuid::Uuid;
2526

@@ -39,6 +40,7 @@ pub async fn run_codex_tool_session(
3940
config: CodexConfig,
4041
outgoing: Arc<OutgoingMessageSender>,
4142
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
43+
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
4244
) {
4345
let (codex, first_event, _ctrl_c, session_id) = match init_codex(config).await {
4446
Ok(res) => res,
@@ -73,7 +75,10 @@ pub async fn run_codex_tool_session(
7375
RequestId::String(s) => s.clone(),
7476
RequestId::Integer(n) => n.to_string(),
7577
};
76-
78+
running_requests_id_to_codex_uuid
79+
.lock()
80+
.await
81+
.insert(id.clone(), session_id);
7782
let submission = Submission {
7883
id: sub_id.clone(),
7984
op: Op::UserInput {
@@ -85,33 +90,55 @@ pub async fn run_codex_tool_session(
8590

8691
if let Err(e) = codex.submit_with_id(submission).await {
8792
tracing::error!("Failed to submit initial prompt: {e}");
93+
// unregister the id so we don't keep it in the map
94+
running_requests_id_to_codex_uuid.lock().await.remove(&id);
95+
return;
8896
}
8997

90-
run_codex_tool_session_inner(codex, outgoing, id).await;
98+
run_codex_tool_session_inner(codex, outgoing, id, running_requests_id_to_codex_uuid).await;
9199
}
92100

93101
pub async fn run_codex_tool_session_reply(
94102
codex: Arc<Codex>,
95103
outgoing: Arc<OutgoingMessageSender>,
96104
request_id: RequestId,
97105
prompt: String,
106+
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
107+
session_id: Uuid,
98108
) {
109+
running_requests_id_to_codex_uuid
110+
.lock()
111+
.await
112+
.insert(request_id.clone(), session_id);
99113
if let Err(e) = codex
100114
.submit(Op::UserInput {
101115
items: vec![InputItem::Text { text: prompt }],
102116
})
103117
.await
104118
{
105119
tracing::error!("Failed to submit user input: {e}");
120+
// unregister the id so we don't keep it in the map
121+
running_requests_id_to_codex_uuid
122+
.lock()
123+
.await
124+
.remove(&request_id);
125+
return;
106126
}
107127

108-
run_codex_tool_session_inner(codex, outgoing, request_id).await;
128+
run_codex_tool_session_inner(
129+
codex,
130+
outgoing,
131+
request_id,
132+
running_requests_id_to_codex_uuid,
133+
)
134+
.await;
109135
}
110136

111137
async fn run_codex_tool_session_inner(
112138
codex: Arc<Codex>,
113139
outgoing: Arc<OutgoingMessageSender>,
114140
request_id: RequestId,
141+
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
115142
) {
116143
let request_id_str = match &request_id {
117144
RequestId::String(s) => s.clone(),
@@ -143,6 +170,14 @@ async fn run_codex_tool_session_inner(
143170
.await;
144171
continue;
145172
}
173+
EventMsg::Error(err_event) => {
174+
// Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption).
175+
let result = json!({
176+
"error": err_event.message,
177+
});
178+
outgoing.send_response(request_id.clone(), result).await;
179+
break;
180+
}
146181
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
147182
reason,
148183
grant_root,
@@ -178,6 +213,11 @@ async fn run_codex_tool_session_inner(
178213
outgoing
179214
.send_response(request_id.clone(), result.into())
180215
.await;
216+
// unregister the id so we don't keep it in the map
217+
running_requests_id_to_codex_uuid
218+
.lock()
219+
.await
220+
.remove(&request_id);
181221
break;
182222
}
183223
EventMsg::SessionConfigured(_) => {
@@ -192,8 +232,7 @@ async fn run_codex_tool_session_inner(
192232
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
193233
// TODO: think how we want to support this in the MCP
194234
}
195-
EventMsg::Error(_)
196-
| EventMsg::TaskStarted
235+
EventMsg::TaskStarted
197236
| EventMsg::TokenCount(_)
198237
| EventMsg::AgentReasoning(_)
199238
| EventMsg::McpToolCallBegin(_)

codex-rs/mcp-server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::outgoing_message::OutgoingMessage;
2727
use crate::outgoing_message::OutgoingMessageSender;
2828

2929
pub use crate::codex_tool_config::CodexToolCallParam;
30+
pub use crate::codex_tool_config::CodexToolCallReplyParam;
3031
pub use crate::exec_approval::ExecApprovalElicitRequestParams;
3132
pub use crate::exec_approval::ExecApprovalResponse;
3233
pub use crate::patch_approval::PatchApprovalElicitRequestParams;
@@ -81,7 +82,7 @@ pub async fn run_main(codex_linux_sandbox_exe: Option<PathBuf>) -> IoResult<()>
8182
match msg {
8283
JSONRPCMessage::Request(r) => processor.process_request(r).await,
8384
JSONRPCMessage::Response(r) => processor.process_response(r).await,
84-
JSONRPCMessage::Notification(n) => processor.process_notification(n),
85+
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
8586
JSONRPCMessage::Error(e) => processor.process_error(e),
8687
}
8788
}

codex-rs/mcp-server/src/message_processor.rs

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::outgoing_message::OutgoingMessageSender;
1010

1111
use codex_core::Codex;
1212
use codex_core::config::Config as CodexConfig;
13+
use codex_core::protocol::Submission;
1314
use mcp_types::CallToolRequestParams;
1415
use mcp_types::CallToolResult;
1516
use mcp_types::ClientRequest;
@@ -35,6 +36,7 @@ pub(crate) struct MessageProcessor {
3536
initialized: bool,
3637
codex_linux_sandbox_exe: Option<PathBuf>,
3738
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
39+
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
3840
}
3941

4042
impl MessageProcessor {
@@ -49,6 +51,7 @@ impl MessageProcessor {
4951
initialized: false,
5052
codex_linux_sandbox_exe,
5153
session_map: Arc::new(Mutex::new(HashMap::new())),
54+
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
5255
}
5356
}
5457

@@ -116,7 +119,7 @@ impl MessageProcessor {
116119
}
117120

118121
/// Handle a fire-and-forget JSON-RPC notification.
119-
pub(crate) fn process_notification(&mut self, notification: JSONRPCNotification) {
122+
pub(crate) async fn process_notification(&mut self, notification: JSONRPCNotification) {
120123
let server_notification = match ServerNotification::try_from(notification) {
121124
Ok(n) => n,
122125
Err(e) => {
@@ -129,7 +132,7 @@ impl MessageProcessor {
129132
// handler so additional logic can be implemented incrementally.
130133
match server_notification {
131134
ServerNotification::CancelledNotification(params) => {
132-
self.handle_cancelled_notification(params);
135+
self.handle_cancelled_notification(params).await;
133136
}
134137
ServerNotification::ProgressNotification(params) => {
135138
self.handle_progress_notification(params);
@@ -379,6 +382,7 @@ impl MessageProcessor {
379382
// Clone outgoing and session map to move into async task.
380383
let outgoing = self.outgoing.clone();
381384
let session_map = self.session_map.clone();
385+
let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone();
382386

383387
// Spawn an async task to handle the Codex session so that we do not
384388
// block the synchronous message-processing loop.
@@ -390,6 +394,7 @@ impl MessageProcessor {
390394
config,
391395
outgoing,
392396
session_map,
397+
running_requests_id_to_codex_uuid,
393398
)
394399
.await;
395400
});
@@ -464,13 +469,12 @@ impl MessageProcessor {
464469

465470
// Clone outgoing and session map to move into async task.
466471
let outgoing = self.outgoing.clone();
472+
let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone();
467473

468-
// Spawn an async task to handle the Codex session so that we do not
469-
// block the synchronous message-processing loop.
470-
task::spawn(async move {
474+
let codex = {
471475
let session_map = session_map_mutex.lock().await;
472-
let codex = match session_map.get(&session_id) {
473-
Some(codex) => codex,
476+
match session_map.get(&session_id).cloned() {
477+
Some(c) => c,
474478
None => {
475479
tracing::warn!("Session not found for session_id: {session_id}");
476480
let result = CallToolResult {
@@ -482,21 +486,32 @@ impl MessageProcessor {
482486
is_error: Some(true),
483487
structured_content: None,
484488
};
485-
// unwrap_or_default is fine here because we know the result is valid JSON
486489
outgoing
487490
.send_response(request_id, serde_json::to_value(result).unwrap_or_default())
488491
.await;
489492
return;
490493
}
491-
};
494+
}
495+
};
492496

493-
crate::codex_tool_runner::run_codex_tool_session_reply(
494-
codex.clone(),
495-
outgoing,
496-
request_id,
497-
prompt.clone(),
498-
)
499-
.await;
497+
// Spawn the long-running reply handler.
498+
tokio::spawn({
499+
let codex = codex.clone();
500+
let outgoing = outgoing.clone();
501+
let prompt = prompt.clone();
502+
let running_requests_id_to_codex_uuid = running_requests_id_to_codex_uuid.clone();
503+
504+
async move {
505+
crate::codex_tool_runner::run_codex_tool_session_reply(
506+
codex,
507+
outgoing,
508+
request_id,
509+
prompt,
510+
running_requests_id_to_codex_uuid,
511+
session_id,
512+
)
513+
.await;
514+
}
500515
});
501516
}
502517

@@ -518,11 +533,58 @@ impl MessageProcessor {
518533
// Notification handlers
519534
// ---------------------------------------------------------------------
520535

521-
fn handle_cancelled_notification(
536+
async fn handle_cancelled_notification(
522537
&self,
523538
params: <mcp_types::CancelledNotification as mcp_types::ModelContextProtocolNotification>::Params,
524539
) {
525-
tracing::info!("notifications/cancelled -> params: {:?}", params);
540+
let request_id = params.request_id;
541+
// Create a stable string form early for logging and submission id.
542+
let request_id_string = match &request_id {
543+
RequestId::String(s) => s.clone(),
544+
RequestId::Integer(i) => i.to_string(),
545+
};
546+
547+
// Obtain the session_id while holding the first lock, then release.
548+
let session_id = {
549+
let map_guard = self.running_requests_id_to_codex_uuid.lock().await;
550+
match map_guard.get(&request_id) {
551+
Some(id) => *id, // Uuid is Copy
552+
None => {
553+
tracing::warn!("Session not found for request_id: {}", request_id_string);
554+
return;
555+
}
556+
}
557+
};
558+
tracing::info!("session_id: {session_id}");
559+
560+
// Obtain the Codex Arc while holding the session_map lock, then release.
561+
let codex_arc = {
562+
let sessions_guard = self.session_map.lock().await;
563+
match sessions_guard.get(&session_id) {
564+
Some(codex) => Arc::clone(codex),
565+
None => {
566+
tracing::warn!("Session not found for session_id: {session_id}");
567+
return;
568+
}
569+
}
570+
};
571+
572+
// Submit interrupt to Codex.
573+
let err = codex_arc
574+
.submit_with_id(Submission {
575+
id: request_id_string,
576+
op: codex_core::protocol::Op::Interrupt,
577+
})
578+
.await;
579+
if let Err(e) = err {
580+
tracing::error!("Failed to submit interrupt to Codex: {e}");
581+
return;
582+
}
583+
// unregister the id so we don't keep it in the map
584+
self.running_requests_id_to_codex_uuid
585+
.lock()
586+
.await
587+
.remove(&request_id);
526588
}
527589

528590
fn handle_progress_notification(

0 commit comments

Comments
 (0)