From afa4f6ff732e24e47209004a53897a94819fc45b Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 6 Jan 2026 13:30:29 +0000 Subject: [PATCH 1/2] feat: agent controller --- codex-rs/core/src/agent/bus.rs | 61 ++++ codex-rs/core/src/agent/control.rs | 254 +++++++++++++++ codex-rs/core/src/agent/mod.rs | 6 + codex-rs/core/src/codex.rs | 16 +- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/conversation_manager.rs | 346 ++++++++++----------- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/state/service.rs | 2 + codex-rs/core/tests/common/test_codex.rs | 3 +- codex-rs/core/tests/suite/remote_models.rs | 3 +- 10 files changed, 515 insertions(+), 178 deletions(-) create mode 100644 codex-rs/core/src/agent/bus.rs create mode 100644 codex-rs/core/src/agent/control.rs create mode 100644 codex-rs/core/src/agent/mod.rs diff --git a/codex-rs/core/src/agent/bus.rs b/codex-rs/core/src/agent/bus.rs new file mode 100644 index 00000000000..0b15f415f1a --- /dev/null +++ b/codex-rs/core/src/agent/bus.rs @@ -0,0 +1,61 @@ +use codex_protocol::ConversationId; +use codex_protocol::protocol::EventMsg; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Status store for globally-tracked agents. +#[derive(Clone, Default)] +pub(crate) struct AgentBus { + /// In-memory map of conversation id to the latest derived status. + statuses: Arc>>, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum AgentStatus { + PendingInit, + Running, + Completed(Option), + Errored(String), + Shutdown, + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + NotFound, +} + +impl AgentBus { + /// Fetch the last known status for `agent_id`, returning `NotFound` if unseen. + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus { + let statuses = self.statuses.read().await; + statuses + .get(&agent_id) + .cloned() + .unwrap_or(AgentStatus::NotFound) + } + + /// Derive and record agent status from a single emitted event. + pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) { + let next_status = match msg { + EventMsg::TaskStarted(_) => Some(AgentStatus::Running), + EventMsg::TaskComplete(ev) => { + Some(AgentStatus::Completed(ev.last_agent_message.clone())) + } + EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))), + EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())), + EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown), + _ => None, + }; + if let Some(status) = next_status { + self.record_status(&conversation_id, status).await; + } + } + + /// Force-set the tracked status for an agent conversation. + pub(crate) async fn record_status( + &self, + conversation_id: &ConversationId, + status: AgentStatus, + ) { + self.statuses.write().await.insert(*conversation_id, status); + } +} diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs new file mode 100644 index 00000000000..4a2619c9bf4 --- /dev/null +++ b/codex-rs/core/src/agent/control.rs @@ -0,0 +1,254 @@ +use crate::CodexConversation; +use crate::agent::AgentBus; +use crate::agent::AgentStatus; +use crate::conversation_manager::ConversationManagerState; +use crate::error::CodexErr; +use crate::error::Result as CodexResult; +use codex_protocol::ConversationId; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use codex_protocol::user_input::UserInput; +use std::sync::Arc; +use std::sync::Weak; + +/// Control-plane handle for multi-agent operations. +/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to +/// spawn new agents and the inter-agent communication layer. +#[derive(Clone, Default)] +pub(crate) struct AgentControl { + /// Weak handle back to the global conversation registry/state. + /// This is `Weak` to avoid reference cycles and shadow persistence of the form + /// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`. + manager: Weak, + /// Shared agent status store updated from emitted events. + pub(crate) bus: AgentBus, +} + +impl AgentControl { + /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. + pub(crate) fn new(manager: Weak, bus: AgentBus) -> Self { + Self { manager, bus } + } + + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Spawn a new agent conversation and submit the initial prompt. + /// + /// If `headless` is true, a background drain task is spawned to prevent unbounded event growth + /// of the channel queue when there is no client actively reading the conversation events. + pub(crate) async fn spawn_agent( + &self, + config: crate::config::Config, + prompt: String, + headless: bool, + ) -> CodexResult { + let state = self.upgrade()?; + let new_conversation = state.spawn_new_conversation(config, self.clone()).await?; + + self.bus + .record_status(&new_conversation.conversation_id, AgentStatus::PendingInit) + .await; + + if headless { + spawn_headless_drain( + Arc::clone(&new_conversation.conversation), + new_conversation.conversation_id, + self.clone(), + ); + } + + self.send_prompt(new_conversation.conversation_id, prompt) + .await?; + + self.bus + .record_status(&new_conversation.conversation_id, AgentStatus::Running) + .await; + + Ok(new_conversation.conversation_id) + } + + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Send a `user` prompt to an existing agent conversation. + pub(crate) async fn send_prompt( + &self, + agent_id: ConversationId, + prompt: String, + ) -> CodexResult { + let state = self.upgrade()?; + state + .send_op( + agent_id, + Op::UserInput { + items: vec![UserInput::Text { text: prompt }], + final_output_json_schema: None, + }, + ) + .await + } + + fn upgrade(&self) -> CodexResult> { + self.manager.upgrade().ok_or_else(|| { + CodexErr::UnsupportedOperation("conversation manager dropped".to_string()) + }) + } +} + +/// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling +/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can +/// accumulate events indefinitely. This drain task prevents that memory growth while still allowing +/// `AgentBus` to be updated from the received events. +fn spawn_headless_drain( + conversation: Arc, + conversation_id: ConversationId, + agent_control: AgentControl, +) { + tokio::spawn(async move { + loop { + match conversation.next_event().await { + Ok(event) => { + if matches!(event.msg, EventMsg::ShutdownComplete) { + break; + } + } + Err(err) => { + agent_control + .bus + .record_status(&conversation_id, AgentStatus::Errored(err.to_string())) + .await; + break; + } + } + } + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::protocol::ErrorEvent; + use codex_protocol::protocol::TaskCompleteEvent; + use codex_protocol::protocol::TaskStartedEvent; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::protocol::TurnAbortedEvent; + use pretty_assertions::assert_eq; + + #[tokio::test] + async fn send_prompt_errors_when_manager_dropped() { + let control = AgentControl::default(); + let err = control + .send_prompt(ConversationId::new(), "hello".to_string()) + .await + .expect_err("send_prompt should fail without a manager"); + assert_eq!( + err.to_string(), + "unsupported operation: conversation manager dropped" + ); + } + + #[tokio::test] + async fn record_status_persists_to_bus() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .record_status(&conversation_id, AgentStatus::PendingInit) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::PendingInit); + } + + #[tokio::test] + async fn on_event_updates_status_from_task_started() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TaskStarted(TaskStartedEvent { + model_context_window: None, + }), + ) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::Running); + } + + #[tokio::test] + async fn on_event_updates_status_from_task_complete() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: Some("done".to_string()), + }), + ) + .await; + + let expected = AgentStatus::Completed(Some("done".to_string())); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_error() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::Error(ErrorEvent { + message: "boom".to_string(), + codex_error_info: None, + }), + ) + .await; + + let expected = AgentStatus::Errored("boom".to_string()); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_turn_aborted() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TurnAborted(TurnAbortedEvent { + reason: TurnAbortReason::Interrupted, + }), + ) + .await; + + let expected = AgentStatus::Errored("Interrupted".to_string()); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_shutdown_complete() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event(conversation_id, &EventMsg::ShutdownComplete) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::Shutdown); + } +} diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs new file mode 100644 index 00000000000..29eb9577a29 --- /dev/null +++ b/codex-rs/core/src/agent/mod.rs @@ -0,0 +1,6 @@ +pub(crate) mod bus; +pub(crate) mod control; + +pub(crate) use bus::AgentBus; +pub(crate) use bus::AgentStatus; +pub(crate) use control::AgentControl; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index bb4a45a75df..222f60d5f37 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -8,6 +8,7 @@ use std::sync::atomic::Ordering; use crate::AuthManager; use crate::SandboxState; +use crate::agent::AgentControl; use crate::client_common::REVIEW_PROMPT; use crate::compact; use crate::compact::run_inline_auto_compact_task; @@ -207,13 +208,14 @@ fn maybe_push_chat_wire_api_deprecation( impl Codex { /// Spawn a new [`Codex`] and initialize the session. - pub async fn spawn( + pub(crate) async fn spawn( config: Config, auth_manager: Arc, models_manager: Arc, skills_manager: Arc, conversation_history: InitialHistory, session_source: SessionSource, + agent_control: AgentControl, ) -> CodexResult { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); @@ -284,6 +286,7 @@ impl Codex { conversation_history, session_source_clone, skills_manager, + agent_control, ) .await .map_err(|e| { @@ -549,6 +552,7 @@ impl Session { initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, + agent_control: AgentControl, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -670,6 +674,7 @@ impl Session { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let sess = Arc::new(Session { @@ -977,6 +982,11 @@ impl Session { /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) { + self.services + .agent_control + .bus + .on_event(self.conversation_id, &msg) + .await; let legacy_source = msg.clone(); let event = Event { id: turn_context.sub_id.clone(), @@ -3225,6 +3235,7 @@ mod tests { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); + let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { @@ -3271,6 +3282,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let turn_context = Session::make_turn_context( @@ -3312,6 +3324,7 @@ mod tests { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); + let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { @@ -3358,6 +3371,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let turn_context = Arc::new(Session::make_turn_context( diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index e767abf4ad2..10d45190d29 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -52,6 +52,7 @@ pub(crate) async fn run_codex_conversation_interactive( Arc::clone(&parent_session.services.skills_manager), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), + parent_session.services.agent_control.clone(), ) .await?; let codex = Arc::new(codex); diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 5093e03c60f..8c56a136d8a 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,6 +3,8 @@ use crate::AuthManager; use crate::CodexAuth; #[cfg(any(test, feature = "test-support"))] use crate::ModelProviderInfo; +use crate::agent::AgentBus; +use crate::agent::AgentControl; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::INITIAL_SUBMIT_ID; @@ -21,6 +23,7 @@ use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelPreset; use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use std::collections::HashMap; @@ -41,24 +44,36 @@ pub struct NewConversation { /// [`ConversationManager`] is responsible for creating conversations and /// maintaining them in memory. pub struct ConversationManager { + state: Arc, + #[cfg(any(test, feature = "test-support"))] + _test_codex_home_guard: Option, +} + +/// Shared, `Arc`-owned state for [`ConversationManager`]. This `Arc` is required to have a single +/// `Arc` reference that can be downgraded to by `AgentControl` while preventing every single +/// function to require an `Arc<&Self>`. +pub(crate) struct ConversationManagerState { conversations: Arc>>>, auth_manager: Arc, models_manager: Arc, skills_manager: Arc, session_source: SessionSource, - #[cfg(any(test, feature = "test-support"))] - _test_codex_home_guard: Option, + agent_bus: AgentBus, } impl ConversationManager { pub fn new(auth_manager: Arc, session_source: SessionSource) -> Self { - let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf())); Self { - conversations: Arc::new(RwLock::new(HashMap::new())), - auth_manager: auth_manager.clone(), - session_source, - models_manager: Arc::new(ModelsManager::new(auth_manager)), - skills_manager, + state: Arc::new(ConversationManagerState { + conversations: Arc::new(RwLock::new(HashMap::new())), + models_manager: Arc::new(ModelsManager::new(auth_manager.clone())), + skills_manager: Arc::new(SkillsManager::new( + auth_manager.codex_home().to_path_buf(), + )), + auth_manager, + session_source, + agent_bus: AgentBus::default(), + }), #[cfg(any(test, feature = "test-support"))] _test_codex_home_guard: None, } @@ -83,100 +98,62 @@ impl ConversationManager { provider: ModelProviderInfo, codex_home: PathBuf, ) -> Self { - let auth_manager = crate::AuthManager::from_auth_for_testing_with_home(auth, codex_home); - let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf())); + let auth_manager = AuthManager::from_auth_for_testing_with_home(auth, codex_home); Self { - conversations: Arc::new(RwLock::new(HashMap::new())), - auth_manager: auth_manager.clone(), - session_source: SessionSource::Exec, - models_manager: Arc::new(ModelsManager::with_provider(auth_manager, provider)), - skills_manager, + state: Arc::new(ConversationManagerState { + conversations: Arc::new(RwLock::new(HashMap::new())), + models_manager: Arc::new(ModelsManager::with_provider( + auth_manager.clone(), + provider, + )), + skills_manager: Arc::new(SkillsManager::new( + auth_manager.codex_home().to_path_buf(), + )), + auth_manager, + session_source: SessionSource::Exec, + agent_bus: AgentBus::default(), + }), _test_codex_home_guard: None, } } pub fn session_source(&self) -> SessionSource { - self.session_source.clone() + self.state.session_source.clone() } pub fn skills_manager(&self) -> Arc { - self.skills_manager.clone() + self.state.skills_manager.clone() } - pub async fn new_conversation(&self, config: Config) -> CodexResult { - self.spawn_conversation( - config, - self.auth_manager.clone(), - self.models_manager.clone(), - ) - .await + pub fn get_models_manager(&self) -> Arc { + self.state.models_manager.clone() } - async fn spawn_conversation( - &self, - config: Config, - auth_manager: Arc, - models_manager: Arc, - ) -> CodexResult { - let CodexSpawnOk { - codex, - conversation_id, - } = Codex::spawn( - config, - auth_manager, - models_manager, - self.skills_manager.clone(), - InitialHistory::New, - self.session_source.clone(), - ) - .await?; - self.finalize_spawn(codex, conversation_id).await + pub async fn list_models(&self, config: &Config) -> Vec { + self.state.models_manager.list_models(config).await } - async fn finalize_spawn( + pub async fn get_conversation( &self, - codex: Codex, conversation_id: ConversationId, - ) -> CodexResult { - // The first event must be `SessionInitialized`. Validate and forward it - // to the caller so that they can display it in the conversation - // history. - let event = codex.next_event().await?; - let session_configured = match event { - Event { - id, - msg: EventMsg::SessionConfigured(session_configured), - } if id == INITIAL_SUBMIT_ID => session_configured, - _ => { - return Err(CodexErr::SessionConfiguredNotFirstEvent); - } - }; - - let conversation = Arc::new(CodexConversation::new( - codex, - session_configured.rollout_path.clone(), - )); - self.conversations - .write() - .await - .insert(conversation_id, conversation.clone()); + ) -> CodexResult> { + self.state.get_conversation(conversation_id).await + } - Ok(NewConversation { - conversation_id, - conversation, - session_configured, - }) + /// Submit an `Op` to an existing conversation. + pub async fn send_op(&self, conversation_id: ConversationId, op: Op) -> CodexResult { + self.state.send_op(conversation_id, op).await } - pub async fn get_conversation( - &self, - conversation_id: ConversationId, - ) -> CodexResult> { - let conversations = self.conversations.read().await; - conversations - .get(&conversation_id) - .cloned() - .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) + pub async fn new_conversation(&self, config: Config) -> CodexResult { + self.state + .spawn_conversation( + config, + InitialHistory::New, + Arc::clone(&self.state.auth_manager), + self.agent_control(), + ) + .await } pub async fn resume_conversation_from_rollout( @@ -196,70 +173,143 @@ impl ConversationManager { initial_history: InitialHistory, auth_manager: Arc, ) -> CodexResult { - let CodexSpawnOk { - codex, - conversation_id, - } = Codex::spawn( - config, - auth_manager, - self.models_manager.clone(), - self.skills_manager.clone(), - initial_history, - self.session_source.clone(), - ) - .await?; - self.finalize_spawn(codex, conversation_id).await + self.state + .spawn_conversation(config, initial_history, auth_manager, self.agent_control()) + .await } - /// Removes the conversation from the manager's internal map, though the - /// conversation is stored as `Arc`, it is possible that - /// other references to it exist elsewhere. Returns the conversation if the - /// conversation was found and removed. + /// Removes the conversation from the manager's internal map, though the conversation is stored + /// as `Arc`, it is possible that other references to it exist elsewhere. + /// Returns the conversation if the conversation was found and removed. pub async fn remove_conversation( &self, conversation_id: &ConversationId, ) -> Option> { - self.conversations.write().await.remove(conversation_id) + self.state + .conversations + .write() + .await + .remove(conversation_id) } - /// Fork an existing conversation by taking messages up to the given position - /// (not including the message at the given position) and starting a new - /// conversation with identical configuration (unless overridden by the - /// caller's `config`). The new conversation will have a fresh id. + /// Fork an existing conversation by taking messages up to the given position (not including + /// the message at the given position) and starting a new conversation with identical + /// configuration (unless overridden by the caller's `config`). The new conversation will have + /// a fresh id. pub async fn fork_conversation( &self, nth_user_message: usize, config: Config, path: PathBuf, ) -> CodexResult { - // Compute the prefix up to the cut point. let history = RolloutRecorder::get_rollout_history(&path).await?; let history = truncate_before_nth_user_message(history, nth_user_message); + self.state + .spawn_conversation( + config, + history, + Arc::clone(&self.state.auth_manager), + self.agent_control(), + ) + .await + } + + fn agent_control(&self) -> AgentControl { + AgentControl::new(Arc::downgrade(&self.state), self.state.agent_bus.clone()) + } +} + +impl ConversationManagerState { + pub(crate) async fn get_conversation( + &self, + conversation_id: ConversationId, + ) -> CodexResult> { + let conversations = self.conversations.read().await; + conversations + .get(&conversation_id) + .cloned() + .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) + } + + pub(crate) async fn send_op( + &self, + conversation_id: ConversationId, + op: Op, + ) -> CodexResult { + self.get_conversation(conversation_id) + .await? + .submit(op) + .await + } - // Spawn a new conversation with the computed initial history. - let auth_manager = self.auth_manager.clone(); + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + pub(crate) async fn spawn_new_conversation( + &self, + config: Config, + agent_control: AgentControl, + ) -> CodexResult { + self.spawn_conversation( + config, + InitialHistory::New, + Arc::clone(&self.auth_manager), + agent_control, + ) + .await + } + + pub(crate) async fn spawn_conversation( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + agent_control: AgentControl, + ) -> CodexResult { let CodexSpawnOk { codex, conversation_id, } = Codex::spawn( config, auth_manager, - self.models_manager.clone(), - self.skills_manager.clone(), - history, + Arc::clone(&self.models_manager), + Arc::clone(&self.skills_manager), + initial_history, self.session_source.clone(), + agent_control, ) .await?; - self.finalize_spawn(codex, conversation_id).await } - pub async fn list_models(&self, config: &Config) -> Vec { - self.models_manager.list_models(config).await - } + async fn finalize_spawn( + &self, + codex: Codex, + conversation_id: ConversationId, + ) -> CodexResult { + let event = codex.next_event().await?; + let session_configured = match event { + Event { + id, + msg: EventMsg::SessionConfigured(session_configured), + } if id == INITIAL_SUBMIT_ID => session_configured, + _ => { + return Err(CodexErr::SessionConfiguredNotFirstEvent); + } + }; - pub fn get_models_manager(&self) -> Arc { - self.models_manager.clone() + let conversation = Arc::new(CodexConversation::new( + codex, + session_configured.rollout_path.clone(), + )); + self.conversations + .write() + .await + .insert(conversation_id, conversation.clone()); + + Ok(NewConversation { + conversation_id, + conversation, + session_configured, + }) } } @@ -301,12 +351,10 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia #[cfg(test)] mod tests { use super::*; - use crate::codex::make_session_and_context; use assert_matches::assert_matches; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; - use pretty_assertions::assert_eq; fn user_msg(text: &str) -> ResponseItem { ResponseItem::Message { @@ -345,67 +393,15 @@ mod tests { }, ResponseItem::FunctionCall { id: None, + call_id: "c1".to_string(), name: "tool".to_string(), arguments: "{}".to_string(), - call_id: "c1".to_string(), }, - assistant_msg("a4"), - ]; - - // Wrap as InitialHistory::Forked with response items only. - let initial: Vec = items - .iter() - .cloned() - .map(RolloutItem::ResponseItem) - .collect(); - let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1); - let got_items = truncated.get_rollout_items(); - let expected_items = vec![ - RolloutItem::ResponseItem(items[0].clone()), - RolloutItem::ResponseItem(items[1].clone()), - RolloutItem::ResponseItem(items[2].clone()), - ]; - assert_eq!( - serde_json::to_value(&got_items).unwrap(), - serde_json::to_value(&expected_items).unwrap() - ); - - let initial2: Vec = items - .iter() - .cloned() - .map(RolloutItem::ResponseItem) - .collect(); - let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2); - assert_matches!(truncated2, InitialHistory::New); - } - - #[tokio::test] - async fn ignores_session_prefix_messages_when_truncating() { - let (session, turn_context) = make_session_and_context().await; - let mut items = session.build_initial_context(&turn_context); - items.push(user_msg("feature request")); - items.push(assistant_msg("ack")); - items.push(user_msg("second question")); - items.push(assistant_msg("answer")); - - let rollout_items: Vec = items - .iter() - .cloned() - .map(RolloutItem::ResponseItem) - .collect(); - - let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1); - let got_items = truncated.get_rollout_items(); - - let expected: Vec = vec![ - RolloutItem::ResponseItem(items[0].clone()), - RolloutItem::ResponseItem(items[1].clone()), - RolloutItem::ResponseItem(items[2].clone()), ]; - assert_eq!( - serde_json::to_value(&got_items).unwrap(), - serde_json::to_value(&expected).unwrap() - ); + let history = + InitialHistory::Forked(items.into_iter().map(RolloutItem::ResponseItem).collect()); + let forked = truncate_before_nth_user_message(history, 1); + assert_matches!(forked, InitialHistory::Forked(rolled) if rolled.len() == 3); } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 87944840835..b9f2645b13e 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -15,6 +15,7 @@ pub mod codex; mod codex_conversation; mod compact_remote; pub use codex_conversation::CodexConversation; +mod agent; mod codex_delegate; mod command_safety; pub mod config; diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 63b66647e5b..8520dbb2526 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::AuthManager; use crate::RolloutRecorder; +use crate::agent::AgentControl; use crate::exec_policy::ExecPolicyManager; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; @@ -28,4 +29,5 @@ pub(crate) struct SessionServices { pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills_manager: Arc, + pub(crate) agent_control: AgentControl, } diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index a081a6bf12d..7ae3ea159f9 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -143,6 +143,7 @@ impl TestCodexBuilder { config.model_provider.clone(), config.codex_home.clone(), ); + let conversation_manager = Arc::new(conversation_manager); let new_conversation = match resume_from { Some(path) => { @@ -164,7 +165,7 @@ impl TestCodexBuilder { config, codex: new_conversation.conversation, session_configured: new_conversation.session_configured, - conversation_manager: Arc::new(conversation_manager), + conversation_manager, }) } diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index 201d61c9e0d..e4db774ddbc 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -442,7 +442,8 @@ where mutate_config(&mut config); - let conversation_manager = Arc::new(ConversationManager::with_models_provider(auth, provider)); + let conversation_manager = ConversationManager::with_models_provider(auth, provider); + let conversation_manager = Arc::new(conversation_manager); let new_conversation = conversation_manager .new_conversation(config.clone()) From 3e071c460daf2016bb6acbdc65e82ec4c8baa39b Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 6 Jan 2026 13:46:04 +0000 Subject: [PATCH 2/2] NITs --- codex-rs/core/src/agent/control.rs | 4 +- codex-rs/core/src/codex.rs | 10 ++-- codex-rs/core/src/conversation_manager.rs | 66 +++++++++++++++++++---- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 4a2619c9bf4..181051b46c4 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -94,8 +94,8 @@ impl AgentControl { /// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling /// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can -/// accumulate events indefinitely. This drain task prevents that memory growth while still allowing -/// `AgentBus` to be updated from the received events. +/// accumulate events indefinitely. This drain task prevents that memory growth by polling and +/// discarding events until shutdown. fn spawn_headless_drain( conversation: Arc, conversation_id: ConversationId, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 222f60d5f37..2d73b47f5a7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -982,11 +982,6 @@ impl Session { /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) { - self.services - .agent_control - .bus - .on_event(self.conversation_id, &msg) - .await; let legacy_source = msg.clone(); let event = Event { id: turn_context.sub_id.clone(), @@ -1005,6 +1000,11 @@ impl Session { } pub(crate) async fn send_event_raw(&self, event: Event) { + self.services + .agent_control + .bus + .on_event(self.conversation_id, &event.msg) + .await; // Persist the event into rollout (recorder filters as needed) let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 8c56a136d8a..1092a488b42 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -140,11 +140,6 @@ impl ConversationManager { self.state.get_conversation(conversation_id).await } - /// Submit an `Op` to an existing conversation. - pub async fn send_op(&self, conversation_id: ConversationId, op: Op) -> CodexResult { - self.state.send_op(conversation_id, op).await - } - pub async fn new_conversation(&self, config: Config) -> CodexResult { self.state .spawn_conversation( @@ -351,10 +346,12 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia #[cfg(test)] mod tests { use super::*; + use crate::codex::make_session_and_context; use assert_matches::assert_matches; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; + use pretty_assertions::assert_eq; fn user_msg(text: &str) -> ResponseItem { ResponseItem::Message { @@ -397,11 +394,62 @@ mod tests { name: "tool".to_string(), arguments: "{}".to_string(), }, + assistant_msg("a4"), + ]; + + let initial: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1); + let got_items = truncated.get_rollout_items(); + let expected_items = vec![ + RolloutItem::ResponseItem(items[0].clone()), + RolloutItem::ResponseItem(items[1].clone()), + RolloutItem::ResponseItem(items[2].clone()), + ]; + assert_eq!( + serde_json::to_value(&got_items).unwrap(), + serde_json::to_value(&expected_items).unwrap() + ); + + let initial2: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2); + assert_matches!(truncated2, InitialHistory::New); + } + + #[tokio::test] + async fn ignores_session_prefix_messages_when_truncating() { + let (session, turn_context) = make_session_and_context().await; + let mut items = session.build_initial_context(&turn_context); + items.push(user_msg("feature request")); + items.push(assistant_msg("ack")); + items.push(user_msg("second question")); + items.push(assistant_msg("answer")); + + let rollout_items: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + + let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1); + let got_items = truncated.get_rollout_items(); + + let expected: Vec = vec![ + RolloutItem::ResponseItem(items[0].clone()), + RolloutItem::ResponseItem(items[1].clone()), + RolloutItem::ResponseItem(items[2].clone()), ]; - let history = - InitialHistory::Forked(items.into_iter().map(RolloutItem::ResponseItem).collect()); - let forked = truncate_before_nth_user_message(history, 1); - assert_matches!(forked, InitialHistory::Forked(rolled) if rolled.len() == 3); + assert_eq!( + serde_json::to_value(&got_items).unwrap(), + serde_json::to_value(&expected).unwrap() + ); } }