Skip to content

Commit 28ff364

Browse files
authored
feat: update process ID for event handling (openai#7261)
1 parent 981e2f7 commit 28ff364

File tree

17 files changed

+473
-163
lines changed

17 files changed

+473
-163
lines changed

codex-rs/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/app-server-protocol/src/protocol/v2.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,8 @@ pub enum ThreadItem {
10201020
command: String,
10211021
/// The command's working directory.
10221022
cwd: PathBuf,
1023+
/// Identifier for the underlying PTY process (when available).
1024+
process_id: Option<String>,
10231025
status: CommandExecutionStatus,
10241026
/// A best-effort parsing of the command to understand the action(s) it will perform.
10251027
/// This returns a list of CommandAction objects because a single shell command may

codex-rs/app-server/src/bespoke_event_handling.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,11 +449,13 @@ pub(crate) async fn apply_bespoke_event_handling(
449449
.collect::<Vec<_>>();
450450
let command = shlex_join(&exec_command_begin_event.command);
451451
let cwd = exec_command_begin_event.cwd;
452+
let process_id = exec_command_begin_event.process_id;
452453

453454
let item = ThreadItem::CommandExecution {
454455
id: item_id,
455456
command,
456457
cwd,
458+
process_id,
457459
status: CommandExecutionStatus::InProgress,
458460
command_actions,
459461
aggregated_output: None,
@@ -486,6 +488,7 @@ pub(crate) async fn apply_bespoke_event_handling(
486488
command,
487489
cwd,
488490
parsed_cmd,
491+
process_id,
489492
aggregated_output,
490493
exit_code,
491494
duration,
@@ -514,6 +517,7 @@ pub(crate) async fn apply_bespoke_event_handling(
514517
id: call_id,
515518
command: shlex_join(&command),
516519
cwd,
520+
process_id,
517521
status,
518522
command_actions,
519523
aggregated_output,
@@ -649,6 +653,7 @@ async fn complete_command_execution_item(
649653
item_id: String,
650654
command: String,
651655
cwd: PathBuf,
656+
process_id: Option<String>,
652657
command_actions: Vec<V2ParsedCommand>,
653658
status: CommandExecutionStatus,
654659
outgoing: &OutgoingMessageSender,
@@ -657,6 +662,7 @@ async fn complete_command_execution_item(
657662
id: item_id,
658663
command,
659664
cwd,
665+
process_id,
660666
status,
661667
command_actions,
662668
aggregated_output: None,
@@ -1015,6 +1021,7 @@ async fn on_command_execution_request_approval_response(
10151021
item_id.clone(),
10161022
command.clone(),
10171023
cwd.clone(),
1024+
None,
10181025
command_actions.clone(),
10191026
status,
10201027
outgoing.as_ref(),

codex-rs/app-server/tests/common/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub use mcp_process::McpProcess;
1515
pub use mock_model_server::create_mock_chat_completions_server;
1616
pub use mock_model_server::create_mock_chat_completions_server_unchecked;
1717
pub use responses::create_apply_patch_sse_response;
18+
pub use responses::create_exec_command_sse_response;
1819
pub use responses::create_final_assistant_message_sse_response;
1920
pub use responses::create_shell_command_sse_response;
2021
pub use rollout::create_fake_rollout;

codex-rs/app-server/tests/common/responses.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,42 @@ pub fn create_apply_patch_sse_response(
9494
);
9595
Ok(sse)
9696
}
97+
98+
pub fn create_exec_command_sse_response(call_id: &str) -> anyhow::Result<String> {
99+
let (cmd, args) = if cfg!(windows) {
100+
("cmd.exe", vec!["/d", "/c", "echo hi"])
101+
} else {
102+
("/bin/sh", vec!["-c", "echo hi"])
103+
};
104+
let command = std::iter::once(cmd.to_string())
105+
.chain(args.into_iter().map(str::to_string))
106+
.collect::<Vec<_>>();
107+
let tool_call_arguments = serde_json::to_string(&json!({
108+
"cmd": command.join(" "),
109+
"yield_time_ms": 500
110+
}))?;
111+
let tool_call = json!({
112+
"choices": [
113+
{
114+
"delta": {
115+
"tool_calls": [
116+
{
117+
"id": call_id,
118+
"function": {
119+
"name": "exec_command",
120+
"arguments": tool_call_arguments
121+
}
122+
}
123+
]
124+
},
125+
"finish_reason": "tool_calls"
126+
}
127+
]
128+
});
129+
130+
let sse = format!(
131+
"data: {}\n\ndata: DONE\n\n",
132+
serde_json::to_string(&tool_call)?
133+
);
134+
Ok(sse)
135+
}

codex-rs/app-server/tests/suite/v2/turn_start.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Result;
22
use app_test_support::McpProcess;
33
use app_test_support::create_apply_patch_sse_response;
4+
use app_test_support::create_exec_command_sse_response;
45
use app_test_support::create_final_assistant_message_sse_response;
56
use app_test_support::create_mock_chat_completions_server;
67
use app_test_support::create_mock_chat_completions_server_unchecked;
@@ -907,6 +908,134 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
907908
Ok(())
908909
}
909910

911+
#[tokio::test]
912+
#[cfg_attr(windows, ignore = "process id reporting differs on Windows")]
913+
async fn command_execution_notifications_include_process_id() -> Result<()> {
914+
skip_if_no_network!(Ok(()));
915+
916+
let responses = vec![
917+
create_exec_command_sse_response("uexec-1")?,
918+
create_final_assistant_message_sse_response("done")?,
919+
];
920+
let server = create_mock_chat_completions_server(responses).await;
921+
let codex_home = TempDir::new()?;
922+
create_config_toml(codex_home.path(), &server.uri(), "never")?;
923+
let config_toml = codex_home.path().join("config.toml");
924+
let mut config_contents = std::fs::read_to_string(&config_toml)?;
925+
config_contents.push_str(
926+
r#"
927+
[features]
928+
unified_exec = true
929+
"#,
930+
);
931+
std::fs::write(&config_toml, config_contents)?;
932+
933+
let mut mcp = McpProcess::new(codex_home.path()).await?;
934+
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
935+
936+
let start_id = mcp
937+
.send_thread_start_request(ThreadStartParams {
938+
model: Some("mock-model".to_string()),
939+
..Default::default()
940+
})
941+
.await?;
942+
let start_resp: JSONRPCResponse = timeout(
943+
DEFAULT_READ_TIMEOUT,
944+
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
945+
)
946+
.await??;
947+
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
948+
949+
let turn_id = mcp
950+
.send_turn_start_request(TurnStartParams {
951+
thread_id: thread.id.clone(),
952+
input: vec![V2UserInput::Text {
953+
text: "run a command".to_string(),
954+
}],
955+
..Default::default()
956+
})
957+
.await?;
958+
let turn_resp: JSONRPCResponse = timeout(
959+
DEFAULT_READ_TIMEOUT,
960+
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
961+
)
962+
.await??;
963+
let TurnStartResponse { turn: _turn } = to_response::<TurnStartResponse>(turn_resp)?;
964+
965+
let started_command = timeout(DEFAULT_READ_TIMEOUT, async {
966+
loop {
967+
let notif = mcp
968+
.read_stream_until_notification_message("item/started")
969+
.await?;
970+
let started: ItemStartedNotification = serde_json::from_value(
971+
notif
972+
.params
973+
.clone()
974+
.expect("item/started should include params"),
975+
)?;
976+
if let ThreadItem::CommandExecution { .. } = started.item {
977+
return Ok::<ThreadItem, anyhow::Error>(started.item);
978+
}
979+
}
980+
})
981+
.await??;
982+
let ThreadItem::CommandExecution {
983+
id,
984+
process_id: started_process_id,
985+
status,
986+
..
987+
} = started_command
988+
else {
989+
unreachable!("loop ensures we break on command execution items");
990+
};
991+
assert_eq!(id, "uexec-1");
992+
assert_eq!(status, CommandExecutionStatus::InProgress);
993+
let started_process_id = started_process_id.expect("process id should be present");
994+
995+
let completed_command = timeout(DEFAULT_READ_TIMEOUT, async {
996+
loop {
997+
let notif = mcp
998+
.read_stream_until_notification_message("item/completed")
999+
.await?;
1000+
let completed: ItemCompletedNotification = serde_json::from_value(
1001+
notif
1002+
.params
1003+
.clone()
1004+
.expect("item/completed should include params"),
1005+
)?;
1006+
if let ThreadItem::CommandExecution { .. } = completed.item {
1007+
return Ok::<ThreadItem, anyhow::Error>(completed.item);
1008+
}
1009+
}
1010+
})
1011+
.await??;
1012+
let ThreadItem::CommandExecution {
1013+
id: completed_id,
1014+
process_id: completed_process_id,
1015+
status: completed_status,
1016+
exit_code,
1017+
..
1018+
} = completed_command
1019+
else {
1020+
unreachable!("loop ensures we break on command execution items");
1021+
};
1022+
assert_eq!(completed_id, "uexec-1");
1023+
assert_eq!(completed_status, CommandExecutionStatus::Completed);
1024+
assert_eq!(exit_code, Some(0));
1025+
assert_eq!(
1026+
completed_process_id.as_deref(),
1027+
Some(started_process_id.as_str())
1028+
);
1029+
1030+
timeout(
1031+
DEFAULT_READ_TIMEOUT,
1032+
mcp.read_stream_until_notification_message("turn/completed"),
1033+
)
1034+
.await??;
1035+
1036+
Ok(())
1037+
}
1038+
9101039
// Helper to create a config.toml pointing at the mock model server.
9111040
fn create_config_toml(
9121041
codex_home: &Path,

codex-rs/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ uuid = { workspace = true, features = ["serde", "v4", "v5"] }
8787
which = { workspace = true }
8888
wildmatch = { workspace = true }
8989

90+
[features]
91+
deterministic_process_ids = []
92+
9093

9194
[target.'cfg(target_os = "linux")'.dependencies]
9295
landlock = { workspace = true }
@@ -115,6 +118,7 @@ keyring = { workspace = true, features = ["sync-secret-service"] }
115118
assert_cmd = { workspace = true }
116119
assert_matches = { workspace = true }
117120
codex-arg0 = { workspace = true }
121+
codex-core = { path = ".", features = ["deterministic_process_ids"] }
118122
core_test_support = { workspace = true }
119123
ctor = { workspace = true }
120124
escargot = { workspace = true }

codex-rs/core/src/tasks/user_shell.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl SessionTask for UserShellCommandTask {
8181
turn_context.as_ref(),
8282
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
8383
call_id: call_id.clone(),
84+
process_id: None,
8485
turn_id: turn_context.sub_id.clone(),
8586
command: command.clone(),
8687
cwd: cwd.clone(),
@@ -139,6 +140,7 @@ impl SessionTask for UserShellCommandTask {
139140
turn_context.as_ref(),
140141
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
141142
call_id,
143+
process_id: None,
142144
turn_id: turn_context.sub_id.clone(),
143145
command: command.clone(),
144146
cwd: cwd.clone(),
@@ -161,6 +163,7 @@ impl SessionTask for UserShellCommandTask {
161163
turn_context.as_ref(),
162164
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
163165
call_id: call_id.clone(),
166+
process_id: None,
164167
turn_id: turn_context.sub_id.clone(),
165168
command: command.clone(),
166169
cwd: cwd.clone(),
@@ -205,6 +208,7 @@ impl SessionTask for UserShellCommandTask {
205208
turn_context.as_ref(),
206209
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
207210
call_id,
211+
process_id: None,
208212
turn_id: turn_context.sub_id.clone(),
209213
command,
210214
cwd,

0 commit comments

Comments
 (0)