Skip to content

Commit 188f79a

Browse files
authored
feat: drop agent bus and store the agent status in codex directly (openai#8788)
1 parent a0b2d03 commit 188f79a

File tree

9 files changed

+113
-180
lines changed

9 files changed

+113
-180
lines changed

codex-rs/core/src/agent/bus.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

codex-rs/core/src/agent/control.rs

Lines changed: 41 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::CodexConversation;
2-
use crate::agent::AgentBus;
32
use crate::agent::AgentStatus;
43
use crate::conversation_manager::ConversationManagerState;
54
use crate::error::CodexErr;
@@ -20,14 +19,12 @@ pub(crate) struct AgentControl {
2019
/// This is `Weak` to avoid reference cycles and shadow persistence of the form
2120
/// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`.
2221
manager: Weak<ConversationManagerState>,
23-
/// Shared agent status store updated from emitted events.
24-
pub(crate) bus: AgentBus,
2522
}
2623

2724
impl AgentControl {
2825
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
29-
pub(crate) fn new(manager: Weak<ConversationManagerState>, bus: AgentBus) -> Self {
30-
Self { manager, bus }
26+
pub(crate) fn new(manager: Weak<ConversationManagerState>) -> Self {
27+
Self { manager }
3128
}
3229

3330
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
@@ -44,25 +41,13 @@ impl AgentControl {
4441
let state = self.upgrade()?;
4542
let new_conversation = state.spawn_new_conversation(config, self.clone()).await?;
4643

47-
self.bus
48-
.record_status(&new_conversation.conversation_id, AgentStatus::PendingInit)
49-
.await;
50-
5144
if headless {
52-
spawn_headless_drain(
53-
Arc::clone(&new_conversation.conversation),
54-
new_conversation.conversation_id,
55-
self.clone(),
56-
);
45+
spawn_headless_drain(Arc::clone(&new_conversation.conversation));
5746
}
5847

5948
self.send_prompt(new_conversation.conversation_id, prompt)
6049
.await?;
6150

62-
self.bus
63-
.record_status(&new_conversation.conversation_id, AgentStatus::Running)
64-
.await;
65-
6651
Ok(new_conversation.conversation_id)
6752
}
6853

@@ -85,6 +70,19 @@ impl AgentControl {
8570
.await
8671
}
8772

73+
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
74+
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
75+
pub(crate) async fn get_status(&self, agent_id: ConversationId) -> AgentStatus {
76+
let Ok(state) = self.upgrade() else {
77+
// No agent available if upgrade fails.
78+
return AgentStatus::NotFound;
79+
};
80+
let Ok(conversation) = state.get_conversation(agent_id).await else {
81+
return AgentStatus::NotFound;
82+
};
83+
conversation.agent_status().await
84+
}
85+
8886
fn upgrade(&self) -> CodexResult<Arc<ConversationManagerState>> {
8987
self.manager.upgrade().ok_or_else(|| {
9088
CodexErr::UnsupportedOperation("conversation manager dropped".to_string())
@@ -96,11 +94,7 @@ impl AgentControl {
9694
/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can
9795
/// accumulate events indefinitely. This drain task prevents that memory growth by polling and
9896
/// discarding events until shutdown.
99-
fn spawn_headless_drain(
100-
conversation: Arc<CodexConversation>,
101-
conversation_id: ConversationId,
102-
agent_control: AgentControl,
103-
) {
97+
fn spawn_headless_drain(conversation: Arc<CodexConversation>) {
10498
tokio::spawn(async move {
10599
loop {
106100
match conversation.next_event().await {
@@ -110,10 +104,7 @@ fn spawn_headless_drain(
110104
}
111105
}
112106
Err(err) => {
113-
agent_control
114-
.bus
115-
.record_status(&conversation_id, AgentStatus::Errored(err.to_string()))
116-
.await;
107+
tracing::warn!("failed to receive event from agent: {err:?}");
117108
break;
118109
}
119110
}
@@ -124,6 +115,7 @@ fn spawn_headless_drain(
124115
#[cfg(test)]
125116
mod tests {
126117
use super::*;
118+
use crate::agent::agent_status_from_event;
127119
use codex_protocol::protocol::ErrorEvent;
128120
use codex_protocol::protocol::TaskCompleteEvent;
129121
use codex_protocol::protocol::TaskStartedEvent;
@@ -145,110 +137,53 @@ mod tests {
145137
}
146138

147139
#[tokio::test]
148-
async fn record_status_persists_to_bus() {
140+
async fn get_status_returns_not_found_without_manager() {
149141
let control = AgentControl::default();
150-
let conversation_id = ConversationId::new();
151-
152-
control
153-
.bus
154-
.record_status(&conversation_id, AgentStatus::PendingInit)
155-
.await;
156-
157-
let got = control.bus.status(conversation_id).await;
158-
assert_eq!(got, AgentStatus::PendingInit);
142+
let got = control.get_status(ConversationId::new()).await;
143+
assert_eq!(got, AgentStatus::NotFound);
159144
}
160145

161146
#[tokio::test]
162147
async fn on_event_updates_status_from_task_started() {
163-
let control = AgentControl::default();
164-
let conversation_id = ConversationId::new();
165-
166-
control
167-
.bus
168-
.on_event(
169-
conversation_id,
170-
&EventMsg::TaskStarted(TaskStartedEvent {
171-
model_context_window: None,
172-
}),
173-
)
174-
.await;
175-
176-
let got = control.bus.status(conversation_id).await;
177-
assert_eq!(got, AgentStatus::Running);
148+
let status = agent_status_from_event(&EventMsg::TaskStarted(TaskStartedEvent {
149+
model_context_window: None,
150+
}));
151+
assert_eq!(status, Some(AgentStatus::Running));
178152
}
179153

180154
#[tokio::test]
181155
async fn on_event_updates_status_from_task_complete() {
182-
let control = AgentControl::default();
183-
let conversation_id = ConversationId::new();
184-
185-
control
186-
.bus
187-
.on_event(
188-
conversation_id,
189-
&EventMsg::TaskComplete(TaskCompleteEvent {
190-
last_agent_message: Some("done".to_string()),
191-
}),
192-
)
193-
.await;
194-
156+
let status = agent_status_from_event(&EventMsg::TaskComplete(TaskCompleteEvent {
157+
last_agent_message: Some("done".to_string()),
158+
}));
195159
let expected = AgentStatus::Completed(Some("done".to_string()));
196-
let got = control.bus.status(conversation_id).await;
197-
assert_eq!(got, expected);
160+
assert_eq!(status, Some(expected));
198161
}
199162

200163
#[tokio::test]
201164
async fn on_event_updates_status_from_error() {
202-
let control = AgentControl::default();
203-
let conversation_id = ConversationId::new();
204-
205-
control
206-
.bus
207-
.on_event(
208-
conversation_id,
209-
&EventMsg::Error(ErrorEvent {
210-
message: "boom".to_string(),
211-
codex_error_info: None,
212-
}),
213-
)
214-
.await;
165+
let status = agent_status_from_event(&EventMsg::Error(ErrorEvent {
166+
message: "boom".to_string(),
167+
codex_error_info: None,
168+
}));
215169

216170
let expected = AgentStatus::Errored("boom".to_string());
217-
let got = control.bus.status(conversation_id).await;
218-
assert_eq!(got, expected);
171+
assert_eq!(status, Some(expected));
219172
}
220173

221174
#[tokio::test]
222175
async fn on_event_updates_status_from_turn_aborted() {
223-
let control = AgentControl::default();
224-
let conversation_id = ConversationId::new();
225-
226-
control
227-
.bus
228-
.on_event(
229-
conversation_id,
230-
&EventMsg::TurnAborted(TurnAbortedEvent {
231-
reason: TurnAbortReason::Interrupted,
232-
}),
233-
)
234-
.await;
176+
let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent {
177+
reason: TurnAbortReason::Interrupted,
178+
}));
235179

236180
let expected = AgentStatus::Errored("Interrupted".to_string());
237-
let got = control.bus.status(conversation_id).await;
238-
assert_eq!(got, expected);
181+
assert_eq!(status, Some(expected));
239182
}
240183

241184
#[tokio::test]
242185
async fn on_event_updates_status_from_shutdown_complete() {
243-
let control = AgentControl::default();
244-
let conversation_id = ConversationId::new();
245-
246-
control
247-
.bus
248-
.on_event(conversation_id, &EventMsg::ShutdownComplete)
249-
.await;
250-
251-
let got = control.bus.status(conversation_id).await;
252-
assert_eq!(got, AgentStatus::Shutdown);
186+
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
187+
assert_eq!(status, Some(AgentStatus::Shutdown));
253188
}
254189
}

codex-rs/core/src/agent/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
pub(crate) mod bus;
21
pub(crate) mod control;
2+
pub(crate) mod status;
33

4-
pub(crate) use bus::AgentBus;
5-
pub(crate) use bus::AgentStatus;
4+
pub(crate) use codex_protocol::protocol::AgentStatus;
65
pub(crate) use control::AgentControl;
6+
pub(crate) use status::agent_status_from_event;

codex-rs/core/src/agent/status.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use codex_protocol::protocol::AgentStatus;
2+
use codex_protocol::protocol::EventMsg;
3+
4+
/// Derive the next agent status from a single emitted event.
5+
/// Returns `None` when the event does not affect status tracking.
6+
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
7+
match msg {
8+
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
9+
EventMsg::TaskComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
10+
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
11+
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
12+
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
13+
_ => None,
14+
}
15+
}

0 commit comments

Comments
 (0)