diff --git a/codex-rs/core/src/agent/bus.rs b/codex-rs/core/src/agent/bus.rs new file mode 100644 index 00000000000..0b15f415f1a --- /dev/null +++ b/codex-rs/core/src/agent/bus.rs @@ -0,0 +1,61 @@ +use codex_protocol::ConversationId; +use codex_protocol::protocol::EventMsg; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Status store for globally-tracked agents. +#[derive(Clone, Default)] +pub(crate) struct AgentBus { + /// In-memory map of conversation id to the latest derived status. + statuses: Arc>>, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum AgentStatus { + PendingInit, + Running, + Completed(Option), + Errored(String), + Shutdown, + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + NotFound, +} + +impl AgentBus { + /// Fetch the last known status for `agent_id`, returning `NotFound` if unseen. + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus { + let statuses = self.statuses.read().await; + statuses + .get(&agent_id) + .cloned() + .unwrap_or(AgentStatus::NotFound) + } + + /// Derive and record agent status from a single emitted event. + pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) { + let next_status = match msg { + EventMsg::TaskStarted(_) => Some(AgentStatus::Running), + EventMsg::TaskComplete(ev) => { + Some(AgentStatus::Completed(ev.last_agent_message.clone())) + } + EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))), + EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())), + EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown), + _ => None, + }; + if let Some(status) = next_status { + self.record_status(&conversation_id, status).await; + } + } + + /// Force-set the tracked status for an agent conversation. + pub(crate) async fn record_status( + &self, + conversation_id: &ConversationId, + status: AgentStatus, + ) { + self.statuses.write().await.insert(*conversation_id, status); + } +} diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs new file mode 100644 index 00000000000..181051b46c4 --- /dev/null +++ b/codex-rs/core/src/agent/control.rs @@ -0,0 +1,254 @@ +use crate::CodexConversation; +use crate::agent::AgentBus; +use crate::agent::AgentStatus; +use crate::conversation_manager::ConversationManagerState; +use crate::error::CodexErr; +use crate::error::Result as CodexResult; +use codex_protocol::ConversationId; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use codex_protocol::user_input::UserInput; +use std::sync::Arc; +use std::sync::Weak; + +/// Control-plane handle for multi-agent operations. +/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to +/// spawn new agents and the inter-agent communication layer. +#[derive(Clone, Default)] +pub(crate) struct AgentControl { + /// Weak handle back to the global conversation registry/state. + /// This is `Weak` to avoid reference cycles and shadow persistence of the form + /// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`. + manager: Weak, + /// Shared agent status store updated from emitted events. + pub(crate) bus: AgentBus, +} + +impl AgentControl { + /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. + pub(crate) fn new(manager: Weak, bus: AgentBus) -> Self { + Self { manager, bus } + } + + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Spawn a new agent conversation and submit the initial prompt. + /// + /// If `headless` is true, a background drain task is spawned to prevent unbounded event growth + /// of the channel queue when there is no client actively reading the conversation events. + pub(crate) async fn spawn_agent( + &self, + config: crate::config::Config, + prompt: String, + headless: bool, + ) -> CodexResult { + let state = self.upgrade()?; + let new_conversation = state.spawn_new_conversation(config, self.clone()).await?; + + self.bus + .record_status(&new_conversation.conversation_id, AgentStatus::PendingInit) + .await; + + if headless { + spawn_headless_drain( + Arc::clone(&new_conversation.conversation), + new_conversation.conversation_id, + self.clone(), + ); + } + + self.send_prompt(new_conversation.conversation_id, prompt) + .await?; + + self.bus + .record_status(&new_conversation.conversation_id, AgentStatus::Running) + .await; + + Ok(new_conversation.conversation_id) + } + + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Send a `user` prompt to an existing agent conversation. + pub(crate) async fn send_prompt( + &self, + agent_id: ConversationId, + prompt: String, + ) -> CodexResult { + let state = self.upgrade()?; + state + .send_op( + agent_id, + Op::UserInput { + items: vec![UserInput::Text { text: prompt }], + final_output_json_schema: None, + }, + ) + .await + } + + fn upgrade(&self) -> CodexResult> { + self.manager.upgrade().ok_or_else(|| { + CodexErr::UnsupportedOperation("conversation manager dropped".to_string()) + }) + } +} + +/// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling +/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can +/// accumulate events indefinitely. This drain task prevents that memory growth by polling and +/// discarding events until shutdown. +fn spawn_headless_drain( + conversation: Arc, + conversation_id: ConversationId, + agent_control: AgentControl, +) { + tokio::spawn(async move { + loop { + match conversation.next_event().await { + Ok(event) => { + if matches!(event.msg, EventMsg::ShutdownComplete) { + break; + } + } + Err(err) => { + agent_control + .bus + .record_status(&conversation_id, AgentStatus::Errored(err.to_string())) + .await; + break; + } + } + } + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::protocol::ErrorEvent; + use codex_protocol::protocol::TaskCompleteEvent; + use codex_protocol::protocol::TaskStartedEvent; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::protocol::TurnAbortedEvent; + use pretty_assertions::assert_eq; + + #[tokio::test] + async fn send_prompt_errors_when_manager_dropped() { + let control = AgentControl::default(); + let err = control + .send_prompt(ConversationId::new(), "hello".to_string()) + .await + .expect_err("send_prompt should fail without a manager"); + assert_eq!( + err.to_string(), + "unsupported operation: conversation manager dropped" + ); + } + + #[tokio::test] + async fn record_status_persists_to_bus() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .record_status(&conversation_id, AgentStatus::PendingInit) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::PendingInit); + } + + #[tokio::test] + async fn on_event_updates_status_from_task_started() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TaskStarted(TaskStartedEvent { + model_context_window: None, + }), + ) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::Running); + } + + #[tokio::test] + async fn on_event_updates_status_from_task_complete() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: Some("done".to_string()), + }), + ) + .await; + + let expected = AgentStatus::Completed(Some("done".to_string())); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_error() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::Error(ErrorEvent { + message: "boom".to_string(), + codex_error_info: None, + }), + ) + .await; + + let expected = AgentStatus::Errored("boom".to_string()); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_turn_aborted() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event( + conversation_id, + &EventMsg::TurnAborted(TurnAbortedEvent { + reason: TurnAbortReason::Interrupted, + }), + ) + .await; + + let expected = AgentStatus::Errored("Interrupted".to_string()); + let got = control.bus.status(conversation_id).await; + assert_eq!(got, expected); + } + + #[tokio::test] + async fn on_event_updates_status_from_shutdown_complete() { + let control = AgentControl::default(); + let conversation_id = ConversationId::new(); + + control + .bus + .on_event(conversation_id, &EventMsg::ShutdownComplete) + .await; + + let got = control.bus.status(conversation_id).await; + assert_eq!(got, AgentStatus::Shutdown); + } +} diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs new file mode 100644 index 00000000000..29eb9577a29 --- /dev/null +++ b/codex-rs/core/src/agent/mod.rs @@ -0,0 +1,6 @@ +pub(crate) mod bus; +pub(crate) mod control; + +pub(crate) use bus::AgentBus; +pub(crate) use bus::AgentStatus; +pub(crate) use control::AgentControl; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index bb4a45a75df..2d73b47f5a7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -8,6 +8,7 @@ use std::sync::atomic::Ordering; use crate::AuthManager; use crate::SandboxState; +use crate::agent::AgentControl; use crate::client_common::REVIEW_PROMPT; use crate::compact; use crate::compact::run_inline_auto_compact_task; @@ -207,13 +208,14 @@ fn maybe_push_chat_wire_api_deprecation( impl Codex { /// Spawn a new [`Codex`] and initialize the session. - pub async fn spawn( + pub(crate) async fn spawn( config: Config, auth_manager: Arc, models_manager: Arc, skills_manager: Arc, conversation_history: InitialHistory, session_source: SessionSource, + agent_control: AgentControl, ) -> CodexResult { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); @@ -284,6 +286,7 @@ impl Codex { conversation_history, session_source_clone, skills_manager, + agent_control, ) .await .map_err(|e| { @@ -549,6 +552,7 @@ impl Session { initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, + agent_control: AgentControl, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -670,6 +674,7 @@ impl Session { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let sess = Arc::new(Session { @@ -995,6 +1000,11 @@ impl Session { } pub(crate) async fn send_event_raw(&self, event: Event) { + self.services + .agent_control + .bus + .on_event(self.conversation_id, &event.msg) + .await; // Persist the event into rollout (recorder filters as needed) let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; @@ -3225,6 +3235,7 @@ mod tests { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); + let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { @@ -3271,6 +3282,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let turn_context = Session::make_turn_context( @@ -3312,6 +3324,7 @@ mod tests { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); + let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { @@ -3358,6 +3371,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + agent_control, }; let turn_context = Arc::new(Session::make_turn_context( diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index e767abf4ad2..10d45190d29 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -52,6 +52,7 @@ pub(crate) async fn run_codex_conversation_interactive( Arc::clone(&parent_session.services.skills_manager), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), + parent_session.services.agent_control.clone(), ) .await?; let codex = Arc::new(codex); diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 5093e03c60f..1092a488b42 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,6 +3,8 @@ use crate::AuthManager; use crate::CodexAuth; #[cfg(any(test, feature = "test-support"))] use crate::ModelProviderInfo; +use crate::agent::AgentBus; +use crate::agent::AgentControl; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::INITIAL_SUBMIT_ID; @@ -21,6 +23,7 @@ use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelPreset; use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use std::collections::HashMap; @@ -41,24 +44,36 @@ pub struct NewConversation { /// [`ConversationManager`] is responsible for creating conversations and /// maintaining them in memory. pub struct ConversationManager { + state: Arc, + #[cfg(any(test, feature = "test-support"))] + _test_codex_home_guard: Option, +} + +/// Shared, `Arc`-owned state for [`ConversationManager`]. This `Arc` is required to have a single +/// `Arc` reference that can be downgraded to by `AgentControl` while preventing every single +/// function to require an `Arc<&Self>`. +pub(crate) struct ConversationManagerState { conversations: Arc>>>, auth_manager: Arc, models_manager: Arc, skills_manager: Arc, session_source: SessionSource, - #[cfg(any(test, feature = "test-support"))] - _test_codex_home_guard: Option, + agent_bus: AgentBus, } impl ConversationManager { pub fn new(auth_manager: Arc, session_source: SessionSource) -> Self { - let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf())); Self { - conversations: Arc::new(RwLock::new(HashMap::new())), - auth_manager: auth_manager.clone(), - session_source, - models_manager: Arc::new(ModelsManager::new(auth_manager)), - skills_manager, + state: Arc::new(ConversationManagerState { + conversations: Arc::new(RwLock::new(HashMap::new())), + models_manager: Arc::new(ModelsManager::new(auth_manager.clone())), + skills_manager: Arc::new(SkillsManager::new( + auth_manager.codex_home().to_path_buf(), + )), + auth_manager, + session_source, + agent_bus: AgentBus::default(), + }), #[cfg(any(test, feature = "test-support"))] _test_codex_home_guard: None, } @@ -83,100 +98,57 @@ impl ConversationManager { provider: ModelProviderInfo, codex_home: PathBuf, ) -> Self { - let auth_manager = crate::AuthManager::from_auth_for_testing_with_home(auth, codex_home); - let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf())); + let auth_manager = AuthManager::from_auth_for_testing_with_home(auth, codex_home); Self { - conversations: Arc::new(RwLock::new(HashMap::new())), - auth_manager: auth_manager.clone(), - session_source: SessionSource::Exec, - models_manager: Arc::new(ModelsManager::with_provider(auth_manager, provider)), - skills_manager, + state: Arc::new(ConversationManagerState { + conversations: Arc::new(RwLock::new(HashMap::new())), + models_manager: Arc::new(ModelsManager::with_provider( + auth_manager.clone(), + provider, + )), + skills_manager: Arc::new(SkillsManager::new( + auth_manager.codex_home().to_path_buf(), + )), + auth_manager, + session_source: SessionSource::Exec, + agent_bus: AgentBus::default(), + }), _test_codex_home_guard: None, } } pub fn session_source(&self) -> SessionSource { - self.session_source.clone() + self.state.session_source.clone() } pub fn skills_manager(&self) -> Arc { - self.skills_manager.clone() - } - - pub async fn new_conversation(&self, config: Config) -> CodexResult { - self.spawn_conversation( - config, - self.auth_manager.clone(), - self.models_manager.clone(), - ) - .await + self.state.skills_manager.clone() } - async fn spawn_conversation( - &self, - config: Config, - auth_manager: Arc, - models_manager: Arc, - ) -> CodexResult { - let CodexSpawnOk { - codex, - conversation_id, - } = Codex::spawn( - config, - auth_manager, - models_manager, - self.skills_manager.clone(), - InitialHistory::New, - self.session_source.clone(), - ) - .await?; - self.finalize_spawn(codex, conversation_id).await + pub fn get_models_manager(&self) -> Arc { + self.state.models_manager.clone() } - async fn finalize_spawn( - &self, - codex: Codex, - conversation_id: ConversationId, - ) -> CodexResult { - // The first event must be `SessionInitialized`. Validate and forward it - // to the caller so that they can display it in the conversation - // history. - let event = codex.next_event().await?; - let session_configured = match event { - Event { - id, - msg: EventMsg::SessionConfigured(session_configured), - } if id == INITIAL_SUBMIT_ID => session_configured, - _ => { - return Err(CodexErr::SessionConfiguredNotFirstEvent); - } - }; - - let conversation = Arc::new(CodexConversation::new( - codex, - session_configured.rollout_path.clone(), - )); - self.conversations - .write() - .await - .insert(conversation_id, conversation.clone()); - - Ok(NewConversation { - conversation_id, - conversation, - session_configured, - }) + pub async fn list_models(&self, config: &Config) -> Vec { + self.state.models_manager.list_models(config).await } pub async fn get_conversation( &self, conversation_id: ConversationId, ) -> CodexResult> { - let conversations = self.conversations.read().await; - conversations - .get(&conversation_id) - .cloned() - .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) + self.state.get_conversation(conversation_id).await + } + + pub async fn new_conversation(&self, config: Config) -> CodexResult { + self.state + .spawn_conversation( + config, + InitialHistory::New, + Arc::clone(&self.state.auth_manager), + self.agent_control(), + ) + .await } pub async fn resume_conversation_from_rollout( @@ -196,70 +168,143 @@ impl ConversationManager { initial_history: InitialHistory, auth_manager: Arc, ) -> CodexResult { - let CodexSpawnOk { - codex, - conversation_id, - } = Codex::spawn( - config, - auth_manager, - self.models_manager.clone(), - self.skills_manager.clone(), - initial_history, - self.session_source.clone(), - ) - .await?; - self.finalize_spawn(codex, conversation_id).await + self.state + .spawn_conversation(config, initial_history, auth_manager, self.agent_control()) + .await } - /// Removes the conversation from the manager's internal map, though the - /// conversation is stored as `Arc`, it is possible that - /// other references to it exist elsewhere. Returns the conversation if the - /// conversation was found and removed. + /// Removes the conversation from the manager's internal map, though the conversation is stored + /// as `Arc`, it is possible that other references to it exist elsewhere. + /// Returns the conversation if the conversation was found and removed. pub async fn remove_conversation( &self, conversation_id: &ConversationId, ) -> Option> { - self.conversations.write().await.remove(conversation_id) + self.state + .conversations + .write() + .await + .remove(conversation_id) } - /// Fork an existing conversation by taking messages up to the given position - /// (not including the message at the given position) and starting a new - /// conversation with identical configuration (unless overridden by the - /// caller's `config`). The new conversation will have a fresh id. + /// Fork an existing conversation by taking messages up to the given position (not including + /// the message at the given position) and starting a new conversation with identical + /// configuration (unless overridden by the caller's `config`). The new conversation will have + /// a fresh id. pub async fn fork_conversation( &self, nth_user_message: usize, config: Config, path: PathBuf, ) -> CodexResult { - // Compute the prefix up to the cut point. let history = RolloutRecorder::get_rollout_history(&path).await?; let history = truncate_before_nth_user_message(history, nth_user_message); + self.state + .spawn_conversation( + config, + history, + Arc::clone(&self.state.auth_manager), + self.agent_control(), + ) + .await + } + + fn agent_control(&self) -> AgentControl { + AgentControl::new(Arc::downgrade(&self.state), self.state.agent_bus.clone()) + } +} - // Spawn a new conversation with the computed initial history. - let auth_manager = self.auth_manager.clone(); +impl ConversationManagerState { + pub(crate) async fn get_conversation( + &self, + conversation_id: ConversationId, + ) -> CodexResult> { + let conversations = self.conversations.read().await; + conversations + .get(&conversation_id) + .cloned() + .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) + } + + pub(crate) async fn send_op( + &self, + conversation_id: ConversationId, + op: Op, + ) -> CodexResult { + self.get_conversation(conversation_id) + .await? + .submit(op) + .await + } + + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + pub(crate) async fn spawn_new_conversation( + &self, + config: Config, + agent_control: AgentControl, + ) -> CodexResult { + self.spawn_conversation( + config, + InitialHistory::New, + Arc::clone(&self.auth_manager), + agent_control, + ) + .await + } + + pub(crate) async fn spawn_conversation( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + agent_control: AgentControl, + ) -> CodexResult { let CodexSpawnOk { codex, conversation_id, } = Codex::spawn( config, auth_manager, - self.models_manager.clone(), - self.skills_manager.clone(), - history, + Arc::clone(&self.models_manager), + Arc::clone(&self.skills_manager), + initial_history, self.session_source.clone(), + agent_control, ) .await?; - self.finalize_spawn(codex, conversation_id).await } - pub async fn list_models(&self, config: &Config) -> Vec { - self.models_manager.list_models(config).await - } + async fn finalize_spawn( + &self, + codex: Codex, + conversation_id: ConversationId, + ) -> CodexResult { + let event = codex.next_event().await?; + let session_configured = match event { + Event { + id, + msg: EventMsg::SessionConfigured(session_configured), + } if id == INITIAL_SUBMIT_ID => session_configured, + _ => { + return Err(CodexErr::SessionConfiguredNotFirstEvent); + } + }; - pub fn get_models_manager(&self) -> Arc { - self.models_manager.clone() + let conversation = Arc::new(CodexConversation::new( + codex, + session_configured.rollout_path.clone(), + )); + self.conversations + .write() + .await + .insert(conversation_id, conversation.clone()); + + Ok(NewConversation { + conversation_id, + conversation, + session_configured, + }) } } @@ -345,14 +390,13 @@ mod tests { }, ResponseItem::FunctionCall { id: None, + call_id: "c1".to_string(), name: "tool".to_string(), arguments: "{}".to_string(), - call_id: "c1".to_string(), }, assistant_msg("a4"), ]; - // Wrap as InitialHistory::Forked with response items only. let initial: Vec = items .iter() .cloned() diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 87944840835..b9f2645b13e 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -15,6 +15,7 @@ pub mod codex; mod codex_conversation; mod compact_remote; pub use codex_conversation::CodexConversation; +mod agent; mod codex_delegate; mod command_safety; pub mod config; diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 63b66647e5b..8520dbb2526 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::AuthManager; use crate::RolloutRecorder; +use crate::agent::AgentControl; use crate::exec_policy::ExecPolicyManager; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; @@ -28,4 +29,5 @@ pub(crate) struct SessionServices { pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills_manager: Arc, + pub(crate) agent_control: AgentControl, } diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index a081a6bf12d..7ae3ea159f9 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -143,6 +143,7 @@ impl TestCodexBuilder { config.model_provider.clone(), config.codex_home.clone(), ); + let conversation_manager = Arc::new(conversation_manager); let new_conversation = match resume_from { Some(path) => { @@ -164,7 +165,7 @@ impl TestCodexBuilder { config, codex: new_conversation.conversation, session_configured: new_conversation.session_configured, - conversation_manager: Arc::new(conversation_manager), + conversation_manager, }) } diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index 201d61c9e0d..e4db774ddbc 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -442,7 +442,8 @@ where mutate_config(&mut config); - let conversation_manager = Arc::new(ConversationManager::with_models_provider(auth, provider)); + let conversation_manager = ConversationManager::with_models_provider(auth, provider); + let conversation_manager = Arc::new(conversation_manager); let new_conversation = conversation_manager .new_conversation(config.clone())