Skip to content

Commit 1dd1355

Browse files
authored
feat: agent controller (#8783)
Added an agent control plane that lets sessions spawn or message other conversations via `AgentControl`. `AgentBus` (core/src/agent/bus.rs) keeps track of the last known status of a conversation. ConversationManager now holds shared state behind an Arc so AgentControl keeps only a weak back-reference, the goal is just to avoid explicit cycle reference. Follow-ups: * Build a small tool in the TUI to be able to see every agent and send manual message to each of them * Handle approval requests in this TUI * Add tools to spawn/communicate between agents (see related design) * Define agent types
1 parent 915352b commit 1dd1355

File tree

10 files changed

+507
-122
lines changed

10 files changed

+507
-122
lines changed

codex-rs/core/src/agent/bus.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use codex_protocol::ConversationId;
2+
use codex_protocol::protocol::EventMsg;
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
use tokio::sync::RwLock;
6+
7+
/// Status store for globally-tracked agents.
8+
#[derive(Clone, Default)]
9+
pub(crate) struct AgentBus {
10+
/// In-memory map of conversation id to the latest derived status.
11+
statuses: Arc<RwLock<HashMap<ConversationId, AgentStatus>>>,
12+
}
13+
14+
#[derive(Clone, Debug, PartialEq, Eq)]
15+
pub(crate) enum AgentStatus {
16+
PendingInit,
17+
Running,
18+
Completed(Option<String>),
19+
Errored(String),
20+
Shutdown,
21+
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
22+
NotFound,
23+
}
24+
25+
impl AgentBus {
26+
/// Fetch the last known status for `agent_id`, returning `NotFound` if unseen.
27+
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
28+
pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus {
29+
let statuses = self.statuses.read().await;
30+
statuses
31+
.get(&agent_id)
32+
.cloned()
33+
.unwrap_or(AgentStatus::NotFound)
34+
}
35+
36+
/// Derive and record agent status from a single emitted event.
37+
pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) {
38+
let next_status = match msg {
39+
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
40+
EventMsg::TaskComplete(ev) => {
41+
Some(AgentStatus::Completed(ev.last_agent_message.clone()))
42+
}
43+
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
44+
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
45+
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
46+
_ => None,
47+
};
48+
if let Some(status) = next_status {
49+
self.record_status(&conversation_id, status).await;
50+
}
51+
}
52+
53+
/// Force-set the tracked status for an agent conversation.
54+
pub(crate) async fn record_status(
55+
&self,
56+
conversation_id: &ConversationId,
57+
status: AgentStatus,
58+
) {
59+
self.statuses.write().await.insert(*conversation_id, status);
60+
}
61+
}

codex-rs/core/src/agent/control.rs

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
use crate::CodexConversation;
2+
use crate::agent::AgentBus;
3+
use crate::agent::AgentStatus;
4+
use crate::conversation_manager::ConversationManagerState;
5+
use crate::error::CodexErr;
6+
use crate::error::Result as CodexResult;
7+
use codex_protocol::ConversationId;
8+
use codex_protocol::protocol::EventMsg;
9+
use codex_protocol::protocol::Op;
10+
use codex_protocol::user_input::UserInput;
11+
use std::sync::Arc;
12+
use std::sync::Weak;
13+
14+
/// Control-plane handle for multi-agent operations.
15+
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
16+
/// spawn new agents and the inter-agent communication layer.
17+
#[derive(Clone, Default)]
18+
pub(crate) struct AgentControl {
19+
/// Weak handle back to the global conversation registry/state.
20+
/// This is `Weak` to avoid reference cycles and shadow persistence of the form
21+
/// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`.
22+
manager: Weak<ConversationManagerState>,
23+
/// Shared agent status store updated from emitted events.
24+
pub(crate) bus: AgentBus,
25+
}
26+
27+
impl AgentControl {
28+
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
29+
pub(crate) fn new(manager: Weak<ConversationManagerState>, bus: AgentBus) -> Self {
30+
Self { manager, bus }
31+
}
32+
33+
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
34+
/// Spawn a new agent conversation and submit the initial prompt.
35+
///
36+
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
37+
/// of the channel queue when there is no client actively reading the conversation events.
38+
pub(crate) async fn spawn_agent(
39+
&self,
40+
config: crate::config::Config,
41+
prompt: String,
42+
headless: bool,
43+
) -> CodexResult<ConversationId> {
44+
let state = self.upgrade()?;
45+
let new_conversation = state.spawn_new_conversation(config, self.clone()).await?;
46+
47+
self.bus
48+
.record_status(&new_conversation.conversation_id, AgentStatus::PendingInit)
49+
.await;
50+
51+
if headless {
52+
spawn_headless_drain(
53+
Arc::clone(&new_conversation.conversation),
54+
new_conversation.conversation_id,
55+
self.clone(),
56+
);
57+
}
58+
59+
self.send_prompt(new_conversation.conversation_id, prompt)
60+
.await?;
61+
62+
self.bus
63+
.record_status(&new_conversation.conversation_id, AgentStatus::Running)
64+
.await;
65+
66+
Ok(new_conversation.conversation_id)
67+
}
68+
69+
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
70+
/// Send a `user` prompt to an existing agent conversation.
71+
pub(crate) async fn send_prompt(
72+
&self,
73+
agent_id: ConversationId,
74+
prompt: String,
75+
) -> CodexResult<String> {
76+
let state = self.upgrade()?;
77+
state
78+
.send_op(
79+
agent_id,
80+
Op::UserInput {
81+
items: vec![UserInput::Text { text: prompt }],
82+
final_output_json_schema: None,
83+
},
84+
)
85+
.await
86+
}
87+
88+
fn upgrade(&self) -> CodexResult<Arc<ConversationManagerState>> {
89+
self.manager.upgrade().ok_or_else(|| {
90+
CodexErr::UnsupportedOperation("conversation manager dropped".to_string())
91+
})
92+
}
93+
}
94+
95+
/// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling
96+
/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can
97+
/// accumulate events indefinitely. This drain task prevents that memory growth by polling and
98+
/// discarding events until shutdown.
99+
fn spawn_headless_drain(
100+
conversation: Arc<CodexConversation>,
101+
conversation_id: ConversationId,
102+
agent_control: AgentControl,
103+
) {
104+
tokio::spawn(async move {
105+
loop {
106+
match conversation.next_event().await {
107+
Ok(event) => {
108+
if matches!(event.msg, EventMsg::ShutdownComplete) {
109+
break;
110+
}
111+
}
112+
Err(err) => {
113+
agent_control
114+
.bus
115+
.record_status(&conversation_id, AgentStatus::Errored(err.to_string()))
116+
.await;
117+
break;
118+
}
119+
}
120+
}
121+
});
122+
}
123+
124+
#[cfg(test)]
125+
mod tests {
126+
use super::*;
127+
use codex_protocol::protocol::ErrorEvent;
128+
use codex_protocol::protocol::TaskCompleteEvent;
129+
use codex_protocol::protocol::TaskStartedEvent;
130+
use codex_protocol::protocol::TurnAbortReason;
131+
use codex_protocol::protocol::TurnAbortedEvent;
132+
use pretty_assertions::assert_eq;
133+
134+
#[tokio::test]
135+
async fn send_prompt_errors_when_manager_dropped() {
136+
let control = AgentControl::default();
137+
let err = control
138+
.send_prompt(ConversationId::new(), "hello".to_string())
139+
.await
140+
.expect_err("send_prompt should fail without a manager");
141+
assert_eq!(
142+
err.to_string(),
143+
"unsupported operation: conversation manager dropped"
144+
);
145+
}
146+
147+
#[tokio::test]
148+
async fn record_status_persists_to_bus() {
149+
let control = AgentControl::default();
150+
let conversation_id = ConversationId::new();
151+
152+
control
153+
.bus
154+
.record_status(&conversation_id, AgentStatus::PendingInit)
155+
.await;
156+
157+
let got = control.bus.status(conversation_id).await;
158+
assert_eq!(got, AgentStatus::PendingInit);
159+
}
160+
161+
#[tokio::test]
162+
async fn on_event_updates_status_from_task_started() {
163+
let control = AgentControl::default();
164+
let conversation_id = ConversationId::new();
165+
166+
control
167+
.bus
168+
.on_event(
169+
conversation_id,
170+
&EventMsg::TaskStarted(TaskStartedEvent {
171+
model_context_window: None,
172+
}),
173+
)
174+
.await;
175+
176+
let got = control.bus.status(conversation_id).await;
177+
assert_eq!(got, AgentStatus::Running);
178+
}
179+
180+
#[tokio::test]
181+
async fn on_event_updates_status_from_task_complete() {
182+
let control = AgentControl::default();
183+
let conversation_id = ConversationId::new();
184+
185+
control
186+
.bus
187+
.on_event(
188+
conversation_id,
189+
&EventMsg::TaskComplete(TaskCompleteEvent {
190+
last_agent_message: Some("done".to_string()),
191+
}),
192+
)
193+
.await;
194+
195+
let expected = AgentStatus::Completed(Some("done".to_string()));
196+
let got = control.bus.status(conversation_id).await;
197+
assert_eq!(got, expected);
198+
}
199+
200+
#[tokio::test]
201+
async fn on_event_updates_status_from_error() {
202+
let control = AgentControl::default();
203+
let conversation_id = ConversationId::new();
204+
205+
control
206+
.bus
207+
.on_event(
208+
conversation_id,
209+
&EventMsg::Error(ErrorEvent {
210+
message: "boom".to_string(),
211+
codex_error_info: None,
212+
}),
213+
)
214+
.await;
215+
216+
let expected = AgentStatus::Errored("boom".to_string());
217+
let got = control.bus.status(conversation_id).await;
218+
assert_eq!(got, expected);
219+
}
220+
221+
#[tokio::test]
222+
async fn on_event_updates_status_from_turn_aborted() {
223+
let control = AgentControl::default();
224+
let conversation_id = ConversationId::new();
225+
226+
control
227+
.bus
228+
.on_event(
229+
conversation_id,
230+
&EventMsg::TurnAborted(TurnAbortedEvent {
231+
reason: TurnAbortReason::Interrupted,
232+
}),
233+
)
234+
.await;
235+
236+
let expected = AgentStatus::Errored("Interrupted".to_string());
237+
let got = control.bus.status(conversation_id).await;
238+
assert_eq!(got, expected);
239+
}
240+
241+
#[tokio::test]
242+
async fn on_event_updates_status_from_shutdown_complete() {
243+
let control = AgentControl::default();
244+
let conversation_id = ConversationId::new();
245+
246+
control
247+
.bus
248+
.on_event(conversation_id, &EventMsg::ShutdownComplete)
249+
.await;
250+
251+
let got = control.bus.status(conversation_id).await;
252+
assert_eq!(got, AgentStatus::Shutdown);
253+
}
254+
}

codex-rs/core/src/agent/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub(crate) mod bus;
2+
pub(crate) mod control;
3+
4+
pub(crate) use bus::AgentBus;
5+
pub(crate) use bus::AgentStatus;
6+
pub(crate) use control::AgentControl;

0 commit comments

Comments
 (0)