Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 85 additions & 44 deletions codex-rs/core/src/codex_delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::Submission;
use codex_protocol::user_input::UserInput;
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

use crate::AuthManager;
Expand Down Expand Up @@ -60,14 +62,13 @@ pub(crate) async fn run_codex_conversation_interactive(
let parent_ctx_clone = Arc::clone(&parent_ctx);
let codex_for_events = Arc::clone(&codex);
tokio::spawn(async move {
let _ = forward_events(
forward_events(
codex_for_events,
tx_sub,
parent_session_clone,
parent_ctx_clone,
cancel_token_events.clone(),
cancel_token_events,
)
.or_cancel(&cancel_token_events)
.await;
});

Expand Down Expand Up @@ -156,53 +157,93 @@ async fn forward_events(
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) {
while let Ok(event) = codex.next_event().await {
match event {
// ignore all legacy delta events
Event {
id: _,
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
} => continue,
Event {
id: _,
msg: EventMsg::SessionConfigured(_),
} => continue,
Event {
id,
msg: EventMsg::ExecApprovalRequest(event),
} => {
// Initiate approval via parent session; do not surface to consumer.
handle_exec_approval(
&codex,
id,
&parent_session,
&parent_ctx,
event,
&cancel_token,
)
.await;
}
Event {
id,
msg: EventMsg::ApplyPatchApprovalRequest(event),
} => {
handle_patch_approval(
&codex,
id,
&parent_session,
&parent_ctx,
event,
&cancel_token,
)
.await;
let cancelled = cancel_token.cancelled();
tokio::pin!(cancelled);

loop {
tokio::select! {
_ = &mut cancelled => {
shutdown_delegate(&codex).await;
break;
}
other => {
let _ = tx_sub.send(other).await;
event = codex.next_event() => {
let event = match event {
Ok(event) => event,
Err(_) => break,
};
match event {
// ignore all legacy delta events
Event {
id: _,
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
} => {}
Event {
id: _,
msg: EventMsg::SessionConfigured(_),
} => {}
Event {
id,
msg: EventMsg::ExecApprovalRequest(event),
} => {
// Initiate approval via parent session; do not surface to consumer.
handle_exec_approval(
&codex,
id,
&parent_session,
&parent_ctx,
event,
&cancel_token,
)
.await;
}
Event {
id,
msg: EventMsg::ApplyPatchApprovalRequest(event),
} => {
handle_patch_approval(
&codex,
id,
&parent_session,
&parent_ctx,
event,
&cancel_token,
)
.await;
}
other => {
match tx_sub.send(other).or_cancel(&cancel_token).await {
Ok(Ok(())) => {}
Ok(Err(_)) => break,
Err(_) => {
shutdown_delegate(&codex).await;
break;
}
}
}
}
}
}
}
}

/// Ask the delegate to stop and drain its events so background sends do not hit a closed channel.
async fn shutdown_delegate(codex: &Codex) {
let _ = codex.submit(Op::Interrupt).await;
let _ = codex.submit(Op::Shutdown {}).await;

let _ = timeout(Duration::from_millis(500), async {
while let Ok(event) = codex.next_event().await {
if matches!(
event.msg,
EventMsg::TurnAborted(_) | EventMsg::TaskComplete(_)
) {
break;
}
}
})
.await;
}

/// Forward ops from a caller to a sub-agent, respecting cancellation.
async fn forward_ops(
codex: Arc<Codex>,
Expand Down
Loading