Skip to content

Commit 3cff882

Browse files
jif-oaiHolovkat
authored andcommitted
feat: end events on unified exec (#5551)
1 parent 752ad14 commit 3cff882

File tree

5 files changed

+602
-53
lines changed

5 files changed

+602
-53
lines changed

codex-rs/core/src/tools/events.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,51 @@ impl ToolEmitter {
200200
) => {
201201
emit_patch_end(ctx, String::new(), (*message).to_string(), false).await;
202202
}
203-
(Self::UnifiedExec { command, cwd, .. }, _) => {
204-
// TODO(jif) add end and failures.
203+
(Self::UnifiedExec { command, cwd, .. }, ToolEventStage::Begin) => {
205204
emit_exec_command_begin(ctx, &[command.to_string()], cwd.as_path()).await;
206205
}
206+
(Self::UnifiedExec { .. }, ToolEventStage::Success(output)) => {
207+
emit_exec_end(
208+
ctx,
209+
output.stdout.text.clone(),
210+
output.stderr.text.clone(),
211+
output.aggregated_output.text.clone(),
212+
output.exit_code,
213+
output.duration,
214+
format_exec_output_str(&output),
215+
)
216+
.await;
217+
}
218+
(
219+
Self::UnifiedExec { .. },
220+
ToolEventStage::Failure(ToolEventFailure::Output(output)),
221+
) => {
222+
emit_exec_end(
223+
ctx,
224+
output.stdout.text.clone(),
225+
output.stderr.text.clone(),
226+
output.aggregated_output.text.clone(),
227+
output.exit_code,
228+
output.duration,
229+
format_exec_output_str(&output),
230+
)
231+
.await;
232+
}
233+
(
234+
Self::UnifiedExec { .. },
235+
ToolEventStage::Failure(ToolEventFailure::Message(message)),
236+
) => {
237+
emit_exec_end(
238+
ctx,
239+
String::new(),
240+
(*message).to_string(),
241+
(*message).to_string(),
242+
-1,
243+
Duration::ZERO,
244+
format_exec_output(&message),
245+
)
246+
.await;
247+
}
207248
}
208249
}
209250

codex-rs/core/src/tools/handlers/unified_exec.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use serde::Deserialize;
55
use serde::Serialize;
66

77
use crate::function_tool::FunctionCallError;
8+
use crate::protocol::EventMsg;
9+
use crate::protocol::ExecCommandOutputDeltaEvent;
10+
use crate::protocol::ExecOutputStream;
811
use crate::tools::context::ToolInvocation;
912
use crate::tools::context::ToolOutput;
1013
use crate::tools::context::ToolPayload;
@@ -87,11 +90,7 @@ impl ToolHandler for UnifiedExecHandler {
8790
};
8891

8992
let manager: &UnifiedExecSessionManager = &session.services.unified_exec_manager;
90-
let context = UnifiedExecContext {
91-
session: &session,
92-
turn: turn.as_ref(),
93-
call_id: &call_id,
94-
};
93+
let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone());
9594

9695
let response = match tool_name.as_str() {
9796
"exec_command" => {
@@ -101,8 +100,12 @@ impl ToolHandler for UnifiedExecHandler {
101100
))
102101
})?;
103102

104-
let event_ctx =
105-
ToolEventCtx::new(context.session, context.turn, context.call_id, None);
103+
let event_ctx = ToolEventCtx::new(
104+
context.session.as_ref(),
105+
context.turn.as_ref(),
106+
&context.call_id,
107+
None,
108+
);
106109
let emitter =
107110
ToolEmitter::unified_exec(args.cmd.clone(), context.turn.cwd.clone(), true);
108111
emitter.emit(event_ctx, ToolEventStage::Begin).await;
@@ -148,6 +151,18 @@ impl ToolHandler for UnifiedExecHandler {
148151
}
149152
};
150153

154+
// Emit a delta event with the chunk of output we just produced, if any.
155+
if !response.output.is_empty() {
156+
let delta = ExecCommandOutputDeltaEvent {
157+
call_id: response.event_call_id.clone(),
158+
stream: ExecOutputStream::Stdout,
159+
chunk: response.output.as_bytes().to_vec(),
160+
};
161+
session
162+
.send_event(turn.as_ref(), EventMsg::ExecCommandOutputDelta(delta))
163+
.await;
164+
}
165+
151166
let content = serialize_response(&response).map_err(|err| {
152167
FunctionCallError::RespondToModel(format!(
153168
"failed to serialize unified exec output: {err:?}"

codex-rs/core/src/unified_exec/mod.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
//! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
2323
2424
use std::collections::HashMap;
25+
use std::path::PathBuf;
26+
use std::sync::Arc;
2527
use std::sync::atomic::AtomicI32;
2628
use std::time::Duration;
2729

@@ -45,10 +47,20 @@ pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000;
4547
pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
4648
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
4749

48-
pub(crate) struct UnifiedExecContext<'a> {
49-
pub session: &'a Session,
50-
pub turn: &'a TurnContext,
51-
pub call_id: &'a str,
50+
pub(crate) struct UnifiedExecContext {
51+
pub session: Arc<Session>,
52+
pub turn: Arc<TurnContext>,
53+
pub call_id: String,
54+
}
55+
56+
impl UnifiedExecContext {
57+
pub fn new(session: Arc<Session>, turn: Arc<TurnContext>, call_id: String) -> Self {
58+
Self {
59+
session,
60+
turn,
61+
call_id,
62+
}
63+
}
5264
}
5365

5466
#[derive(Debug)]
@@ -70,6 +82,7 @@ pub(crate) struct WriteStdinRequest<'a> {
7082

7183
#[derive(Debug, Clone, PartialEq)]
7284
pub(crate) struct UnifiedExecResponse {
85+
pub event_call_id: String,
7386
pub chunk_id: String,
7487
pub wall_time: Duration,
7588
pub output: String,
@@ -78,10 +91,20 @@ pub(crate) struct UnifiedExecResponse {
7891
pub original_token_count: Option<usize>,
7992
}
8093

81-
#[derive(Debug, Default)]
94+
#[derive(Default)]
8295
pub(crate) struct UnifiedExecSessionManager {
8396
next_session_id: AtomicI32,
84-
sessions: Mutex<HashMap<i32, session::UnifiedExecSession>>,
97+
sessions: Mutex<HashMap<i32, SessionEntry>>,
98+
}
99+
100+
struct SessionEntry {
101+
session: session::UnifiedExecSession,
102+
session_ref: Arc<Session>,
103+
turn_ref: Arc<TurnContext>,
104+
call_id: String,
105+
command: String,
106+
cwd: PathBuf,
107+
started_at: tokio::time::Instant,
85108
}
86109

87110
pub(crate) fn clamp_yield_time(yield_time_ms: Option<u64>) -> u64 {
@@ -163,11 +186,8 @@ mod tests {
163186
cmd: &str,
164187
yield_time_ms: Option<u64>,
165188
) -> Result<UnifiedExecResponse, UnifiedExecError> {
166-
let context = UnifiedExecContext {
167-
session,
168-
turn: turn.as_ref(),
169-
call_id: "call",
170-
};
189+
let context =
190+
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
171191

172192
session
173193
.services

0 commit comments

Comments
 (0)