Skip to content

Commit eda50d8

Browse files
authored
feat: introduce ClientRequest::SendUserTurn (#2345)
This adds a new request type, `SendUserTurn`, that makes it possible to submit a `Op::UserTurn` operation (introduced in #2329) to a conversation. This PR also adds a new integration test that verifies that changing from `AskForApproval::UnlessTrusted` to `AskForApproval::Never` mid-conversation ensures that an elicitation is no longer sent for running `python3 -c print(42)`. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2345). * __->__ #2345 * #2329 * #2343 * #2340 * #2338
1 parent 17aa394 commit eda50d8

File tree

4 files changed

+279
-1
lines changed

4 files changed

+279
-1
lines changed

codex-rs/mcp-server/src/codex_message_processor.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use crate::wire_format::RemoveConversationListenerParams;
4242
use crate::wire_format::RemoveConversationSubscriptionResponse;
4343
use crate::wire_format::SendUserMessageParams;
4444
use crate::wire_format::SendUserMessageResponse;
45+
use crate::wire_format::SendUserTurnParams;
46+
use crate::wire_format::SendUserTurnResponse;
4547
use codex_core::protocol::InputItem as CoreInputItem;
4648
use codex_core::protocol::Op;
4749

@@ -78,6 +80,9 @@ impl CodexMessageProcessor {
7880
ClientRequest::SendUserMessage { request_id, params } => {
7981
self.send_user_message(request_id, params).await;
8082
}
83+
ClientRequest::SendUserTurn { request_id, params } => {
84+
self.send_user_turn(request_id, params).await;
85+
}
8186
ClientRequest::InterruptConversation { request_id, params } => {
8287
self.interrupt_conversation(request_id, params).await;
8388
}
@@ -169,6 +174,58 @@ impl CodexMessageProcessor {
169174
.await;
170175
}
171176

177+
async fn send_user_turn(&self, request_id: RequestId, params: SendUserTurnParams) {
178+
let SendUserTurnParams {
179+
conversation_id,
180+
items,
181+
cwd,
182+
approval_policy,
183+
sandbox_policy,
184+
model,
185+
effort,
186+
summary,
187+
} = params;
188+
189+
let Ok(conversation) = self
190+
.conversation_manager
191+
.get_conversation(conversation_id.0)
192+
.await
193+
else {
194+
let error = JSONRPCErrorError {
195+
code: INVALID_REQUEST_ERROR_CODE,
196+
message: format!("conversation not found: {conversation_id}"),
197+
data: None,
198+
};
199+
self.outgoing.send_error(request_id, error).await;
200+
return;
201+
};
202+
203+
let mapped_items: Vec<CoreInputItem> = items
204+
.into_iter()
205+
.map(|item| match item {
206+
WireInputItem::Text { text } => CoreInputItem::Text { text },
207+
WireInputItem::Image { image_url } => CoreInputItem::Image { image_url },
208+
WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path },
209+
})
210+
.collect();
211+
212+
let _ = conversation
213+
.submit(Op::UserTurn {
214+
items: mapped_items,
215+
cwd,
216+
approval_policy,
217+
sandbox_policy,
218+
model,
219+
effort,
220+
summary,
221+
})
222+
.await;
223+
224+
self.outgoing
225+
.send_response(request_id, SendUserTurnResponse {})
226+
.await;
227+
}
228+
172229
async fn interrupt_conversation(
173230
&mut self,
174231
request_id: RequestId,

codex-rs/mcp-server/src/wire_format.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ use std::collections::HashMap;
22
use std::fmt::Display;
33
use std::path::PathBuf;
44

5+
use codex_core::config_types::ReasoningEffort;
6+
use codex_core::config_types::ReasoningSummary;
7+
use codex_core::protocol::AskForApproval;
58
use codex_core::protocol::FileChange;
69
use codex_core::protocol::ReviewDecision;
10+
use codex_core::protocol::SandboxPolicy;
711
use mcp_types::RequestId;
812
use serde::Deserialize;
913
use serde::Serialize;
@@ -36,6 +40,11 @@ pub enum ClientRequest {
3640
request_id: RequestId,
3741
params: SendUserMessageParams,
3842
},
43+
SendUserTurn {
44+
#[serde(rename = "id")]
45+
request_id: RequestId,
46+
params: SendUserTurnParams,
47+
},
3948
InterruptConversation {
4049
#[serde(rename = "id")]
4150
request_id: RequestId,
@@ -120,6 +129,23 @@ pub struct SendUserMessageParams {
120129
pub items: Vec<InputItem>,
121130
}
122131

132+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
133+
#[serde(rename_all = "camelCase")]
134+
pub struct SendUserTurnParams {
135+
pub conversation_id: ConversationId,
136+
pub items: Vec<InputItem>,
137+
pub cwd: PathBuf,
138+
pub approval_policy: AskForApproval,
139+
pub sandbox_policy: SandboxPolicy,
140+
pub model: String,
141+
pub effort: ReasoningEffort,
142+
pub summary: ReasoningSummary,
143+
}
144+
145+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
146+
#[serde(rename_all = "camelCase")]
147+
pub struct SendUserTurnResponse {}
148+
123149
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
124150
#[serde(rename_all = "camelCase")]
125151
pub struct InterruptConversationParams {

codex-rs/mcp-server/tests/codex_message_processor_flow.rs

Lines changed: 186 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
use std::path::Path;
22

3+
use codex_core::config_types::ReasoningEffort;
4+
use codex_core::config_types::ReasoningSummary;
5+
use codex_core::protocol::AskForApproval;
6+
use codex_core::protocol::SandboxPolicy;
37
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
48
use codex_mcp_server::wire_format::AddConversationListenerParams;
59
use codex_mcp_server::wire_format::AddConversationSubscriptionResponse;
10+
use codex_mcp_server::wire_format::EXEC_COMMAND_APPROVAL_METHOD;
611
use codex_mcp_server::wire_format::NewConversationParams;
712
use codex_mcp_server::wire_format::NewConversationResponse;
813
use codex_mcp_server::wire_format::RemoveConversationListenerParams;
914
use codex_mcp_server::wire_format::RemoveConversationSubscriptionResponse;
1015
use codex_mcp_server::wire_format::SendUserMessageParams;
1116
use codex_mcp_server::wire_format::SendUserMessageResponse;
17+
use codex_mcp_server::wire_format::SendUserTurnParams;
18+
use codex_mcp_server::wire_format::SendUserTurnResponse;
1219
use mcp_test_support::McpProcess;
1320
use mcp_test_support::create_final_assistant_message_sse_response;
1421
use mcp_test_support::create_mock_chat_completions_server;
@@ -167,6 +174,184 @@ fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result
167174
Ok(codex_response)
168175
}
169176

177+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
178+
async fn test_send_user_turn_changes_approval_policy_behavior() {
179+
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
180+
println!(
181+
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
182+
);
183+
return;
184+
}
185+
186+
let tmp = TempDir::new().expect("tmp dir");
187+
let codex_home = tmp.path().join("codex_home");
188+
std::fs::create_dir(&codex_home).expect("create codex home dir");
189+
let working_directory = tmp.path().join("workdir");
190+
std::fs::create_dir(&working_directory).expect("create working directory");
191+
192+
// Mock server will request a python shell call for the first and second turn, then finish.
193+
let responses = vec![
194+
create_shell_sse_response(
195+
vec![
196+
"python3".to_string(),
197+
"-c".to_string(),
198+
"print(42)".to_string(),
199+
],
200+
Some(&working_directory),
201+
Some(5000),
202+
"call1",
203+
)
204+
.expect("create first shell sse response"),
205+
create_final_assistant_message_sse_response("done 1")
206+
.expect("create final assistant message 1"),
207+
create_shell_sse_response(
208+
vec![
209+
"python3".to_string(),
210+
"-c".to_string(),
211+
"print(42)".to_string(),
212+
],
213+
Some(&working_directory),
214+
Some(5000),
215+
"call2",
216+
)
217+
.expect("create second shell sse response"),
218+
create_final_assistant_message_sse_response("done 2")
219+
.expect("create final assistant message 2"),
220+
];
221+
let server = create_mock_chat_completions_server(responses).await;
222+
create_config_toml(&codex_home, &server.uri()).expect("write config");
223+
224+
// Start MCP server and initialize.
225+
let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp");
226+
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
227+
.await
228+
.expect("init timeout")
229+
.expect("init error");
230+
231+
// 1) Start conversation with approval_policy=untrusted
232+
let new_conv_id = mcp
233+
.send_new_conversation_request(NewConversationParams {
234+
cwd: Some(working_directory.to_string_lossy().into_owned()),
235+
..Default::default()
236+
})
237+
.await
238+
.expect("send newConversation");
239+
let new_conv_resp: JSONRPCResponse = timeout(
240+
DEFAULT_READ_TIMEOUT,
241+
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
242+
)
243+
.await
244+
.expect("newConversation timeout")
245+
.expect("newConversation resp");
246+
let NewConversationResponse {
247+
conversation_id, ..
248+
} = to_response::<NewConversationResponse>(new_conv_resp)
249+
.expect("deserialize newConversation response");
250+
251+
// 2) addConversationListener
252+
let add_listener_id = mcp
253+
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
254+
.await
255+
.expect("send addConversationListener");
256+
let _: AddConversationSubscriptionResponse =
257+
to_response::<AddConversationSubscriptionResponse>(
258+
timeout(
259+
DEFAULT_READ_TIMEOUT,
260+
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
261+
)
262+
.await
263+
.expect("addConversationListener timeout")
264+
.expect("addConversationListener resp"),
265+
)
266+
.expect("deserialize addConversationListener response");
267+
268+
// 3) sendUserMessage triggers a shell call; approval policy is Untrusted so we should get an elicitation
269+
let send_user_id = mcp
270+
.send_send_user_message_request(SendUserMessageParams {
271+
conversation_id,
272+
items: vec![codex_mcp_server::wire_format::InputItem::Text {
273+
text: "run python".to_string(),
274+
}],
275+
})
276+
.await
277+
.expect("send sendUserMessage");
278+
let _send_user_resp: SendUserMessageResponse = to_response::<SendUserMessageResponse>(
279+
timeout(
280+
DEFAULT_READ_TIMEOUT,
281+
mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)),
282+
)
283+
.await
284+
.expect("sendUserMessage timeout")
285+
.expect("sendUserMessage resp"),
286+
)
287+
.expect("deserialize sendUserMessage response");
288+
289+
// Expect an ExecCommandApproval request (elicitation)
290+
let request = timeout(
291+
DEFAULT_READ_TIMEOUT,
292+
mcp.read_stream_until_request_message(),
293+
)
294+
.await
295+
.expect("waiting for exec approval request timeout")
296+
.expect("exec approval request");
297+
assert_eq!(request.method, EXEC_COMMAND_APPROVAL_METHOD);
298+
299+
// Approve so the first turn can complete
300+
mcp.send_response(
301+
request.id,
302+
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
303+
)
304+
.await
305+
.expect("send approval response");
306+
307+
// Wait for first TaskComplete
308+
let _ = timeout(
309+
DEFAULT_READ_TIMEOUT,
310+
mcp.read_stream_until_notification_message("codex/event/task_complete"),
311+
)
312+
.await
313+
.expect("task_complete 1 timeout")
314+
.expect("task_complete 1 notification");
315+
316+
// 4) sendUserTurn with approval_policy=never should run without elicitation
317+
let send_turn_id = mcp
318+
.send_send_user_turn_request(SendUserTurnParams {
319+
conversation_id,
320+
items: vec![codex_mcp_server::wire_format::InputItem::Text {
321+
text: "run python again".to_string(),
322+
}],
323+
cwd: working_directory.clone(),
324+
approval_policy: AskForApproval::Never,
325+
sandbox_policy: SandboxPolicy::new_read_only_policy(),
326+
model: "mock-model".to_string(),
327+
effort: ReasoningEffort::Medium,
328+
summary: ReasoningSummary::Auto,
329+
})
330+
.await
331+
.expect("send sendUserTurn");
332+
// Acknowledge sendUserTurn
333+
let _send_turn_resp: SendUserTurnResponse = to_response::<SendUserTurnResponse>(
334+
timeout(
335+
DEFAULT_READ_TIMEOUT,
336+
mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)),
337+
)
338+
.await
339+
.expect("sendUserTurn timeout")
340+
.expect("sendUserTurn resp"),
341+
)
342+
.expect("deserialize sendUserTurn response");
343+
344+
// Ensure we do NOT receive an ExecCommandApproval request before the task completes.
345+
// If any Request is seen while waiting for task_complete, the helper will error and the test fails.
346+
let _ = timeout(
347+
DEFAULT_READ_TIMEOUT,
348+
mcp.read_stream_until_notification_message("codex/event/task_complete"),
349+
)
350+
.await
351+
.expect("task_complete 2 timeout")
352+
.expect("task_complete 2 notification");
353+
}
354+
170355
// Helper: minimal config.toml pointing at mock provider.
171356
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
172357
let config_toml = codex_home.join("config.toml");
@@ -175,7 +360,7 @@ fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()
175360
format!(
176361
r#"
177362
model = "mock-model"
178-
approval_policy = "never"
363+
approval_policy = "untrusted"
179364
180365
model_provider = "mock_provider"
181366

codex-rs/mcp-server/tests/common/mcp_process.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use codex_mcp_server::wire_format::AddConversationListenerParams;
2222
use codex_mcp_server::wire_format::NewConversationParams;
2323
use codex_mcp_server::wire_format::RemoveConversationListenerParams;
2424
use codex_mcp_server::wire_format::SendUserMessageParams;
25+
use codex_mcp_server::wire_format::SendUserTurnParams;
2526

2627
use mcp_types::CallToolRequestParams;
2728
use mcp_types::ClientCapabilities;
@@ -281,6 +282,15 @@ impl McpProcess {
281282
.await
282283
}
283284

285+
/// Send a `sendUserTurn` JSON-RPC request.
286+
pub async fn send_send_user_turn_request(
287+
&mut self,
288+
params: SendUserTurnParams,
289+
) -> anyhow::Result<i64> {
290+
let params = Some(serde_json::to_value(params)?);
291+
self.send_request("sendUserTurn", params).await
292+
}
293+
284294
async fn send_request(
285295
&mut self,
286296
method: &str,

0 commit comments

Comments
 (0)