diff --git a/codex-rs/cli/src/proto.rs b/codex-rs/cli/src/proto.rs index 6c1de7eaa9..3bc4d81618 100644 --- a/codex-rs/cli/src/proto.rs +++ b/codex-rs/cli/src/proto.rs @@ -1,15 +1,14 @@ use std::io::IsTerminal; -use std::sync::Arc; use clap::Parser; use codex_common::CliConfigOverrides; -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; +use codex_core::NewConversation; use codex_core::config::Config; use codex_core::config::ConfigOverrides; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; use codex_core::protocol::Submission; -use codex_core::util::notify_on_sigint; -use codex_login::CodexAuth; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tracing::error; @@ -36,22 +35,38 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> { .map_err(anyhow::Error::msg)?; let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?; - let auth = CodexAuth::from_codex_home(&config.codex_home)?; - let ctrl_c = notify_on_sigint(); - let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?; - let codex = Arc::new(codex); + // Use conversation_manager API to start a conversation + let conversation_manager = ConversationManager::default(); + let NewConversation { + conversation_id: _, + conversation, + session_configured, + } = conversation_manager.new_conversation(config).await?; + + // Simulate streaming the session_configured event. + let synthetic_event = Event { + // Fake id value. + id: "".to_string(), + msg: EventMsg::SessionConfigured(session_configured), + }; + let session_configured_event = match serde_json::to_string(&synthetic_event) { + Ok(s) => s, + Err(e) => { + error!("Failed to serialize session_configured: {e}"); + return Err(anyhow::Error::from(e)); + } + }; + println!("{session_configured_event}"); // Task that reads JSON lines from stdin and forwards to Submission Queue let sq_fut = { - let codex = codex.clone(); - let ctrl_c = ctrl_c.clone(); + let conversation = conversation.clone(); async move { let stdin = BufReader::new(tokio::io::stdin()); let mut lines = stdin.lines(); loop { let result = tokio::select! { - _ = ctrl_c.notified() => { - info!("Interrupted, exiting"); + _ = tokio::signal::ctrl_c() => { break }, res = lines.next_line() => res, @@ -65,7 +80,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> { } match serde_json::from_str::(line) { Ok(sub) => { - if let Err(e) = codex.submit_with_id(sub).await { + if let Err(e) = conversation.submit_with_id(sub).await { error!("{e:#}"); break; } @@ -88,8 +103,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> { let eq_fut = async move { loop { let event = tokio::select! { - _ = ctrl_c.notified() => break, - event = codex.next_event() => event, + _ = tokio::signal::ctrl_c() => break, + event = conversation.next_event() => event, }; match event { Ok(event) => { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b0905e34ab..5ac1392bee 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -20,7 +20,6 @@ use futures::prelude::*; use mcp_types::CallToolResult; use serde::Serialize; use serde_json; -use tokio::sync::Notify; use tokio::sync::oneshot; use tokio::task::AbortHandle; use tracing::debug; @@ -124,11 +123,7 @@ pub struct CodexSpawnOk { impl Codex { /// Spawn a new [`Codex`] and initialize the session. - pub async fn spawn( - config: Config, - auth: Option, - ctrl_c: Arc, - ) -> CodexResult { + pub async fn spawn(config: Config, auth: Option) -> CodexResult { // experimental resume path (undocumented) let resume_path = config.experimental_resume.clone(); info!("resume_path: {resume_path:?}"); @@ -156,9 +151,9 @@ impl Codex { // Generate a unique ID for the lifetime of this Codex session. let session_id = Uuid::new_v4(); - tokio::spawn(submission_loop( - session_id, config, auth, rx_sub, tx_event, ctrl_c, - )); + + // This task will run until Op::Shutdown is received. + tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, @@ -210,7 +205,6 @@ impl Codex { pub(crate) struct Session { client: ModelClient, pub(crate) tx_event: Sender, - ctrl_c: Arc, /// The session's current working directory. All relative paths provided by /// the model as well as sandbox policies are resolved against this path @@ -493,7 +487,6 @@ impl Session { let result = process_exec_tool_call( exec_args.params, exec_args.sandbox_type, - exec_args.ctrl_c, exec_args.sandbox_policy, exec_args.codex_linux_sandbox_exe, exec_args.stdout_stream, @@ -578,7 +571,7 @@ impl Session { .await } - pub fn abort(&self) { + fn abort(&self) { info!("Aborting existing session"); let mut state = self.state.lock().unwrap(); state.pending_approvals.clear(); @@ -709,7 +702,6 @@ async fn submission_loop( auth: Option, rx_sub: Receiver, tx_event: Sender, - ctrl_c: Arc, ) { let mut sess: Option> = None; // shorthand - send an event when there is no active session @@ -724,21 +716,8 @@ async fn submission_loop( tx_event.send(event).await.ok(); }; - loop { - let interrupted = ctrl_c.notified(); - let sub = tokio::select! { - res = rx_sub.recv() => match res { - Ok(sub) => sub, - Err(_) => break, - }, - _ = interrupted => { - if let Some(sess) = sess.as_ref(){ - sess.abort(); - } - continue; - }, - }; - + // To break out of this loop, send Op::Shutdown. + while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { @@ -877,7 +856,6 @@ async fn submission_loop( config.include_plan_tool, ), tx_event: tx_event.clone(), - ctrl_c: Arc::clone(&ctrl_c), user_instructions, base_instructions, approval_policy, @@ -1787,7 +1765,6 @@ fn parse_container_exec_arguments( pub struct ExecInvokeArgs<'a> { pub params: ExecParams, pub sandbox_type: SandboxType, - pub ctrl_c: Arc, pub sandbox_policy: &'a SandboxPolicy, pub codex_linux_sandbox_exe: &'a Option, pub stdout_stream: Option, @@ -1972,7 +1949,6 @@ async fn handle_container_exec_with_params( ExecInvokeArgs { params: params.clone(), sandbox_type, - ctrl_c: sess.ctrl_c.clone(), sandbox_policy: &sess.sandbox_policy, codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe, stdout_stream: Some(StdoutStream { @@ -2104,7 +2080,6 @@ async fn handle_sandbox_error( ExecInvokeArgs { params, sandbox_type: SandboxType::None, - ctrl_c: sess.ctrl_c.clone(), sandbox_policy: &sess.sandbox_policy, codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe, stdout_stream: Some(StdoutStream { diff --git a/codex-rs/core/src/codex_conversation.rs b/codex-rs/core/src/codex_conversation.rs new file mode 100644 index 0000000000..d3b00046fc --- /dev/null +++ b/codex-rs/core/src/codex_conversation.rs @@ -0,0 +1,30 @@ +use crate::codex::Codex; +use crate::error::Result as CodexResult; +use crate::protocol::Event; +use crate::protocol::Op; +use crate::protocol::Submission; + +pub struct CodexConversation { + codex: Codex, +} + +/// Conduit for the bidirectional stream of messages that compose a conversation +/// in Codex. +impl CodexConversation { + pub(crate) fn new(codex: Codex) -> Self { + Self { codex } + } + + pub async fn submit(&self, op: Op) -> CodexResult { + self.codex.submit(op).await + } + + /// Use sparingly: this is intended to be removed soon. + pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { + self.codex.submit_with_id(sub).await + } + + pub async fn next_event(&self) -> CodexResult { + self.codex.next_event().await + } +} diff --git a/codex-rs/core/src/codex_wrapper.rs b/codex-rs/core/src/codex_wrapper.rs deleted file mode 100644 index dc10ec8d84..0000000000 --- a/codex-rs/core/src/codex_wrapper.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::Arc; - -use crate::Codex; -use crate::CodexSpawnOk; -use crate::config::Config; -use crate::protocol::Event; -use crate::protocol::EventMsg; -use crate::util::notify_on_sigint; -use codex_login::CodexAuth; -use tokio::sync::Notify; -use uuid::Uuid; - -/// Represents an active Codex conversation, including the first event -/// (which is [`EventMsg::SessionConfigured`]). -pub struct CodexConversation { - pub codex: Codex, - pub session_id: Uuid, - pub session_configured: Event, - pub ctrl_c: Arc, -} - -/// Spawn a new [`Codex`] and initialize the session. -/// -/// Returns the wrapped [`Codex`] **and** the `SessionInitialized` event that -/// is received as a response to the initial `ConfigureSession` submission so -/// that callers can surface the information to the UI. -pub async fn init_codex(config: Config) -> anyhow::Result { - let ctrl_c = notify_on_sigint(); - let auth = CodexAuth::from_codex_home(&config.codex_home)?; - let CodexSpawnOk { - codex, - init_id, - session_id, - } = Codex::spawn(config, auth, ctrl_c.clone()).await?; - - // The first event must be `SessionInitialized`. Validate and forward it to - // the caller so that they can display it in the conversation history. - let event = codex.next_event().await?; - if event.id != init_id - || !matches!( - &event, - Event { - id: _id, - msg: EventMsg::SessionConfigured(_), - } - ) - { - return Err(anyhow::anyhow!( - "expected SessionInitialized but got {event:?}" - )); - } - - Ok(CodexConversation { - codex, - session_id, - session_configured: event, - ctrl_c, - }) -} diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs new file mode 100644 index 0000000000..4a6cdc1bf1 --- /dev/null +++ b/codex-rs/core/src/conversation_manager.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use codex_login::CodexAuth; +use tokio::sync::RwLock; +use uuid::Uuid; + +use crate::codex::Codex; +use crate::codex::CodexSpawnOk; +use crate::codex_conversation::CodexConversation; +use crate::config::Config; +use crate::error::CodexErr; +use crate::error::Result as CodexResult; +use crate::protocol::Event; +use crate::protocol::EventMsg; +use crate::protocol::SessionConfiguredEvent; + +/// Represents a newly created Codex conversation, including the first event +/// (which is [`EventMsg::SessionConfigured`]). +pub struct NewConversation { + pub conversation_id: Uuid, + pub conversation: Arc, + pub session_configured: SessionConfiguredEvent, +} + +/// [`ConversationManager`] is responsible for creating conversations and +/// maintaining them in memory. +pub struct ConversationManager { + conversations: Arc>>>, +} + +impl Default for ConversationManager { + fn default() -> Self { + Self { + conversations: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl ConversationManager { + pub async fn new_conversation(&self, config: Config) -> CodexResult { + let auth = CodexAuth::from_codex_home(&config.codex_home)?; + self.new_conversation_with_auth(config, auth).await + } + + /// Used for integration tests: should not be used by ordinary business + /// logic. + pub async fn new_conversation_with_auth( + &self, + config: Config, + auth: Option, + ) -> CodexResult { + let CodexSpawnOk { + codex, + init_id, + session_id: conversation_id, + } = Codex::spawn(config, auth).await?; + + // The first event must be `SessionInitialized`. Validate and forward it + // to the caller so that they can display it in the conversation + // history. + let event = codex.next_event().await?; + let session_configured = match event { + Event { + id, + msg: EventMsg::SessionConfigured(session_configured), + } if id == init_id => session_configured, + _ => { + return Err(CodexErr::SessionConfiguredNotFirstEvent); + } + }; + + let conversation = Arc::new(CodexConversation::new(codex)); + self.conversations + .write() + .await + .insert(conversation_id, conversation.clone()); + + Ok(NewConversation { + conversation_id, + conversation, + session_configured, + }) + } + + pub async fn get_conversation( + &self, + conversation_id: Uuid, + ) -> CodexResult> { + let conversations = self.conversations.read().await; + conversations + .get(&conversation_id) + .cloned() + .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) + } +} diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index 2931d30636..4df6663394 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -3,6 +3,7 @@ use serde_json; use std::io; use thiserror::Error; use tokio::task::JoinError; +use uuid::Uuid; pub type Result = std::result::Result; @@ -44,6 +45,12 @@ pub enum CodexErr { #[error("stream disconnected before completion: {0}")] Stream(String), + #[error("no conversation with id: {0}")] + ConversationNotFound(Uuid), + + #[error("session configured event was not the first event in the stream")] + SessionConfiguredNotFirstEvent, + /// Returned by run_command_stream when the spawned child process timed out (10s). #[error("timeout waiting for child process to exit")] Timeout, diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index c964466f78..50bba1b4b7 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -6,7 +6,6 @@ use std::io; use std::path::Path; use std::path::PathBuf; use std::process::ExitStatus; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -15,7 +14,6 @@ use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::BufReader; use tokio::process::Child; -use tokio::sync::Notify; use crate::error::CodexErr; use crate::error::Result; @@ -80,7 +78,6 @@ pub struct StdoutStream { pub async fn process_exec_tool_call( params: ExecParams, sandbox_type: SandboxType, - ctrl_c: Arc, sandbox_policy: &SandboxPolicy, codex_linux_sandbox_exe: &Option, stdout_stream: Option, @@ -89,7 +86,7 @@ pub async fn process_exec_tool_call( let raw_output_result: std::result::Result = match sandbox_type { - SandboxType::None => exec(params, sandbox_policy, ctrl_c, stdout_stream.clone()).await, + SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await, SandboxType::MacosSeatbelt => { let timeout = params.timeout_duration(); let ExecParams { @@ -103,7 +100,7 @@ pub async fn process_exec_tool_call( env, ) .await?; - consume_truncated_output(child, ctrl_c, timeout, stdout_stream.clone()).await + consume_truncated_output(child, timeout, stdout_stream.clone()).await } SandboxType::LinuxSeccomp => { let timeout = params.timeout_duration(); @@ -124,7 +121,7 @@ pub async fn process_exec_tool_call( ) .await?; - consume_truncated_output(child, ctrl_c, timeout, stdout_stream).await + consume_truncated_output(child, timeout, stdout_stream).await } }; let duration = start.elapsed(); @@ -286,7 +283,6 @@ pub struct ExecToolCallOutput { async fn exec( params: ExecParams, sandbox_policy: &SandboxPolicy, - ctrl_c: Arc, stdout_stream: Option, ) -> Result { let timeout = params.timeout_duration(); @@ -311,14 +307,13 @@ async fn exec( env, ) .await?; - consume_truncated_output(child, ctrl_c, timeout, stdout_stream).await + consume_truncated_output(child, timeout, stdout_stream).await } /// Consumes the output of a child process, truncating it so it is suitable for /// use as the output of a `shell` tool call. Also enforces specified timeout. pub(crate) async fn consume_truncated_output( mut child: Child, - ctrl_c: Arc, timeout: Duration, stdout_stream: Option, ) -> Result { @@ -352,7 +347,6 @@ pub(crate) async fn consume_truncated_output( true, )); - let interrupted = ctrl_c.notified(); let exit_status = tokio::select! { result = tokio::time::timeout(timeout, child.wait()) => { match result { @@ -366,7 +360,7 @@ pub(crate) async fn consume_truncated_output( } } } - _ = interrupted => { + _ = tokio::signal::ctrl_c() => { child.start_kill()?; synthetic_exit_status(128 + SIGKILL_CODE) } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index f3247c3887..aeab49702c 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -11,9 +11,8 @@ mod chat_completions; mod client; mod client_common; pub mod codex; -pub use codex::Codex; -pub use codex::CodexSpawnOk; -pub mod codex_wrapper; +mod codex_conversation; +pub use codex_conversation::CodexConversation; pub mod config; pub mod config_profile; pub mod config_types; @@ -34,6 +33,9 @@ pub use model_provider_info::ModelProviderInfo; pub use model_provider_info::WireApi; pub use model_provider_info::built_in_model_providers; pub use model_provider_info::create_oss_provider_with_base_url; +mod conversation_manager; +pub use conversation_manager::ConversationManager; +pub use conversation_manager::NewConversation; pub mod model_family; mod models; mod openai_model_info; diff --git a/codex-rs/core/src/shell.rs b/codex-rs/core/src/shell.rs index cc58eb7460..4a4e0146da 100644 --- a/codex-rs/core/src/shell.rs +++ b/codex-rs/core/src/shell.rs @@ -167,9 +167,6 @@ mod tests { for (input, expected_cmd, expected_output) in cases { use std::collections::HashMap; use std::path::PathBuf; - use std::sync::Arc; - - use tokio::sync::Notify; use crate::exec::ExecParams; use crate::exec::SandboxType; @@ -219,7 +216,6 @@ mod tests { justification: None, }, SandboxType::None, - Arc::new(Notify::new()), &SandboxPolicy::DangerFullAccess, &None, None, diff --git a/codex-rs/core/src/util.rs b/codex-rs/core/src/util.rs index fb5f45de6f..e1248a4867 100644 --- a/codex-rs/core/src/util.rs +++ b/codex-rs/core/src/util.rs @@ -1,32 +1,11 @@ use std::path::Path; -use std::sync::Arc; use std::time::Duration; use rand::Rng; -use tokio::sync::Notify; -use tracing::debug; const INITIAL_DELAY_MS: u64 = 200; const BACKOFF_FACTOR: f64 = 2.0; -/// Make a CancellationToken that is fulfilled when SIGINT occurs. -pub fn notify_on_sigint() -> Arc { - let notify = Arc::new(Notify::new()); - - tokio::spawn({ - let notify = Arc::clone(¬ify); - async move { - loop { - tokio::signal::ctrl_c().await.ok(); - debug!("Keyboard interrupt"); - notify.notify_waiters(); - } - } - }); - - notify -} - pub(crate) fn backoff(attempt: u64) -> Duration { let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32); let base = (INITIAL_DELAY_MS as f64 * exp) as u64; diff --git a/codex-rs/core/tests/client.rs b/codex-rs/core/tests/client.rs index bd7ad2feef..1bcddf0796 100644 --- a/codex-rs/core/tests/client.rs +++ b/codex-rs/core/tests/client.rs @@ -1,14 +1,13 @@ #![allow(clippy::expect_used, clippy::unwrap_used)] -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; use codex_core::ModelProviderInfo; +use codex_core::NewConversation; use codex_core::WireApi; use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; -use codex_core::protocol::SessionConfiguredEvent; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_login::CodexAuth; use core_test_support::load_default_config_for_test; @@ -90,14 +89,15 @@ async fn includes_session_id_and_model_headers_in_request() { let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let NewConversation { + conversation: codex, + conversation_id, + session_configured: _, + } = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .expect("create new conversation"); codex .submit(Op::UserInput { @@ -108,13 +108,6 @@ async fn includes_session_id_and_model_headers_in_request() { .await .unwrap(); - let EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, .. }) = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::SessionConfigured(_))).await - else { - unreachable!() - }; - - let current_session_id = Some(session_id.to_string()); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // get request from the server @@ -123,10 +116,9 @@ async fn includes_session_id_and_model_headers_in_request() { let request_authorization = request.headers.get("authorization").unwrap(); let request_originator = request.headers.get("originator").unwrap(); - assert!(current_session_id.is_some()); assert_eq!( request_session_id.to_str().unwrap(), - current_session_id.as_ref().unwrap() + conversation_id.to_string() ); assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); assert_eq!( @@ -164,14 +156,12 @@ async fn includes_base_instructions_override_in_request() { config.base_instructions = Some("test instructions".to_string()); config.model_provider = model_provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { @@ -223,14 +213,12 @@ async fn originator_config_override_is_used() { config.model_provider = model_provider; config.internal_originator = Some("my_override".to_string()); - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { @@ -283,11 +271,15 @@ async fn chatgpt_auth_sends_correct_request() { let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = - Codex::spawn(config, Some(create_dummy_codex_auth()), ctrl_c.clone()) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let NewConversation { + conversation: codex, + conversation_id, + session_configured: _, + } = conversation_manager + .new_conversation_with_auth(config, Some(create_dummy_codex_auth())) + .await + .expect("create new conversation"); codex .submit(Op::UserInput { @@ -298,13 +290,6 @@ async fn chatgpt_auth_sends_correct_request() { .await .unwrap(); - let EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, .. }) = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::SessionConfigured(_))).await - else { - unreachable!() - }; - - let current_session_id = Some(session_id.to_string()); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // get request from the server @@ -315,10 +300,9 @@ async fn chatgpt_auth_sends_correct_request() { let request_chatgpt_account_id = request.headers.get("chatgpt-account-id").unwrap(); let request_body = request.body_json::().unwrap(); - assert!(current_session_id.is_some()); assert_eq!( request_session_id.to_str().unwrap(), - current_session_id.as_ref().unwrap() + conversation_id.to_string() ); assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); assert_eq!( @@ -361,14 +345,12 @@ async fn includes_user_instructions_message_in_request() { config.model_provider = model_provider; config.user_instructions = Some("be nice".to_string()); - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { @@ -457,8 +439,12 @@ async fn azure_overrides_assign_properties_used_for_responses_url() { let mut config = load_default_config_for_test(&codex_home); config.model_provider = provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn(config, None, ctrl_c.clone()).await.unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, None) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { @@ -531,11 +517,12 @@ async fn env_var_overrides_loaded_auth() { let mut config = load_default_config_for_test(&codex_home); config.model_provider = provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = - Codex::spawn(config, Some(create_dummy_codex_auth()), ctrl_c.clone()) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(create_dummy_codex_auth())) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 18bae310be..04b6dc4b62 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -2,6 +2,7 @@ use tempfile::TempDir; +use codex_core::CodexConversation; use codex_core::config::Config; use codex_core::config::ConfigOverrides; use codex_core::config::ConfigToml; @@ -72,7 +73,7 @@ pub fn load_sse_fixture_with_id(path: impl AsRef, id: &str) -> } pub async fn wait_for_event( - codex: &codex_core::Codex, + codex: &CodexConversation, predicate: F, ) -> codex_core::protocol::EventMsg where @@ -83,7 +84,7 @@ where } pub async fn wait_for_event_with_timeout( - codex: &codex_core::Codex, + codex: &CodexConversation, mut predicate: F, wait_time: tokio::time::Duration, ) -> codex_core::protocol::EventMsg diff --git a/codex-rs/core/tests/compact.rs b/codex-rs/core/tests/compact.rs index fa5c81d883..28b1ca8d74 100644 --- a/codex-rs/core/tests/compact.rs +++ b/codex-rs/core/tests/compact.rs @@ -1,7 +1,6 @@ #![expect(clippy::unwrap_used)] -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; @@ -142,14 +141,12 @@ async fn summarize_context_three_requests_and_instructions() { let home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&home); config.model_provider = model_provider; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("dummy")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("dummy"))) + .await + .unwrap() + .conversation; // 1) Normal user input – should hit server once. codex diff --git a/codex-rs/core/tests/exec.rs b/codex-rs/core/tests/exec.rs index 9bead7ef60..ff5d2b7d54 100644 --- a/codex-rs/core/tests/exec.rs +++ b/codex-rs/core/tests/exec.rs @@ -2,7 +2,6 @@ #![expect(clippy::unwrap_used, clippy::expect_used)] use std::collections::HashMap; -use std::sync::Arc; use codex_core::exec::ExecParams; use codex_core::exec::ExecToolCallOutput; @@ -11,7 +10,6 @@ use codex_core::exec::process_exec_tool_call; use codex_core::protocol::SandboxPolicy; use codex_core::spawn::CODEX_SANDBOX_ENV_VAR; use tempfile::TempDir; -use tokio::sync::Notify; use codex_core::error::Result; @@ -39,10 +37,9 @@ async fn run_test_cmd(tmp: TempDir, cmd: Vec<&str>) -> Result) -> Vec { let mut out = Vec::new(); @@ -57,13 +55,11 @@ async fn test_exec_stdout_stream_events_echo() { justification: None, }; - let ctrl_c = Arc::new(Notify::new()); let policy = SandboxPolicy::new_read_only_policy(); let result = process_exec_tool_call( params, SandboxType::None, - ctrl_c, &policy, &None, Some(stdout_stream), @@ -109,13 +105,11 @@ async fn test_exec_stderr_stream_events_echo() { justification: None, }; - let ctrl_c = Arc::new(Notify::new()); let policy = SandboxPolicy::new_read_only_policy(); let result = process_exec_tool_call( params, SandboxType::None, - ctrl_c, &policy, &None, Some(stdout_stream), diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs deleted file mode 100644 index 81b3bb2a12..0000000000 --- a/codex-rs/core/tests/live_agent.rs +++ /dev/null @@ -1,209 +0,0 @@ -#![expect(clippy::unwrap_used, clippy::expect_used)] - -//! Live integration tests that exercise the full [`Agent`] stack **against the real -//! OpenAI `/v1/responses` API**. These tests complement the lightweight mock‑based -//! unit tests by verifying that the agent can drive an end‑to‑end conversation, -//! stream incremental events, execute function‑call tool invocations and safely -//! chain multiple turns inside a single session – the exact scenarios that have -//! historically been brittle. -//! -//! The live tests are **ignored by default** so CI remains deterministic and free -//! of external dependencies. Developers can opt‑in locally with e.g. -//! -//! ```bash -//! OPENAI_API_KEY=sk‑... cargo test --test live_agent -- --ignored --nocapture -//! ``` -//! -//! Make sure your key has access to the experimental *Responses* API and that -//! any billable usage is acceptable. - -use std::time::Duration; - -use codex_core::Codex; -use codex_core::CodexSpawnOk; -use codex_core::error::CodexErr; -use codex_core::protocol::AgentMessageEvent; -use codex_core::protocol::ErrorEvent; -use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; -use codex_core::protocol::Op; -use core_test_support::load_default_config_for_test; -use tempfile::TempDir; -use tokio::sync::Notify; -use tokio::time::timeout; - -fn api_key_available() -> bool { - std::env::var("OPENAI_API_KEY").is_ok() -} - -/// Helper that spawns a fresh Agent and sends the mandatory *ConfigureSession* -/// submission. The caller receives the constructed [`Agent`] plus the unique -/// submission id used for the initialization message. -async fn spawn_codex() -> Result { - assert!( - api_key_available(), - "OPENAI_API_KEY must be set for live tests" - ); - - let codex_home = TempDir::new().unwrap(); - let mut config = load_default_config_for_test(&codex_home); - config.model_provider.request_max_retries = Some(2); - config.model_provider.stream_max_retries = Some(2); - let CodexSpawnOk { codex: agent, .. } = - Codex::spawn(config, None, std::sync::Arc::new(Notify::new())).await?; - - Ok(agent) -} - -/// Verifies that the agent streams incremental *AgentMessage* events **before** -/// emitting `TaskComplete` and that a second task inside the same session does -/// not get tripped up by a stale `previous_response_id`. -#[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn live_streaming_and_prev_id_reset() { - if !api_key_available() { - eprintln!("skipping live_streaming_and_prev_id_reset – OPENAI_API_KEY not set"); - return; - } - - let codex = spawn_codex().await.unwrap(); - - // ---------- Task 1 ---------- - codex - .submit(Op::UserInput { - items: vec![InputItem::Text { - text: "Say the words 'stream test'".into(), - }], - }) - .await - .unwrap(); - - let mut saw_message_before_complete = false; - loop { - let ev = timeout(Duration::from_secs(60), codex.next_event()) - .await - .expect("timeout waiting for task1 events") - .expect("agent closed"); - - match ev.msg { - EventMsg::AgentMessage(_) => saw_message_before_complete = true, - EventMsg::TaskComplete(_) => break, - EventMsg::Error(ErrorEvent { message }) => { - panic!("agent reported error in task1: {message}") - } - _ => { - // Ignore other events. - } - } - } - - assert!( - saw_message_before_complete, - "Agent did not stream any AgentMessage before TaskComplete" - ); - - // ---------- Task 2 (same session) ---------- - codex - .submit(Op::UserInput { - items: vec![InputItem::Text { - text: "Respond with exactly: second turn succeeded".into(), - }], - }) - .await - .unwrap(); - - let mut got_expected = false; - loop { - let ev = timeout(Duration::from_secs(60), codex.next_event()) - .await - .expect("timeout waiting for task2 events") - .expect("agent closed"); - - match &ev.msg { - EventMsg::AgentMessage(AgentMessageEvent { message }) - if message.contains("second turn succeeded") => - { - got_expected = true; - } - EventMsg::TaskComplete(_) => break, - EventMsg::Error(ErrorEvent { message }) => { - panic!("agent reported error in task2: {message}") - } - _ => { - // Ignore other events. - } - } - } - - assert!(got_expected, "second task did not receive expected answer"); -} - -/// Exercises a *function‑call → shell execution* round‑trip by instructing the -/// model to run a harmless `echo` command. The test asserts that: -/// 1. the function call is executed (we see `ExecCommandBegin`/`End` events) -/// 2. the captured stdout reaches the client unchanged. -#[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn live_shell_function_call() { - if !api_key_available() { - eprintln!("skipping live_shell_function_call – OPENAI_API_KEY not set"); - return; - } - - let codex = spawn_codex().await.unwrap(); - - const MARKER: &str = "codex_live_echo_ok"; - - codex - .submit(Op::UserInput { - items: vec![InputItem::Text { - text: format!( - "Use the shell function to run the command `echo {MARKER}` and no other commands." - ), - }], - }) - .await - .unwrap(); - - let mut saw_begin = false; - let mut saw_end_with_output = false; - - loop { - let ev = timeout(Duration::from_secs(60), codex.next_event()) - .await - .expect("timeout waiting for function‑call events") - .expect("agent closed"); - - match ev.msg { - EventMsg::ExecCommandBegin(codex_core::protocol::ExecCommandBeginEvent { - command, - .. - }) => { - assert_eq!(command, vec!["echo", MARKER]); - saw_begin = true; - } - EventMsg::ExecCommandEnd(codex_core::protocol::ExecCommandEndEvent { - stdout, - exit_code, - .. - }) => { - assert_eq!(exit_code, 0, "echo returned non‑zero exit code"); - assert!(stdout.contains(MARKER)); - saw_end_with_output = true; - } - EventMsg::TaskComplete(_) => break, - EventMsg::Error(codex_core::protocol::ErrorEvent { message }) => { - panic!("agent error during shell test: {message}") - } - _ => { - // Ignore other events. - } - } - } - - assert!(saw_begin, "ExecCommandBegin event missing"); - assert!( - saw_end_with_output, - "ExecCommandEnd with expected output missing" - ); -} diff --git a/codex-rs/core/tests/prompt_caching.rs b/codex-rs/core/tests/prompt_caching.rs index f460fc3004..8df7ea353d 100644 --- a/codex-rs/core/tests/prompt_caching.rs +++ b/codex-rs/core/tests/prompt_caching.rs @@ -1,7 +1,6 @@ #![allow(clippy::expect_used, clippy::unwrap_used)] -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; @@ -55,14 +54,12 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests config.model_provider = model_provider; config.user_instructions = Some("be consistent and helpful".to_string()); - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c.clone(), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .expect("create new conversation") + .conversation; codex .submit(Op::UserInput { diff --git a/codex-rs/core/tests/stream_error_allows_next_turn.rs b/codex-rs/core/tests/stream_error_allows_next_turn.rs index 1500c789e9..d590c43312 100644 --- a/codex-rs/core/tests/stream_error_allows_next_turn.rs +++ b/codex-rs/core/tests/stream_error_allows_next_turn.rs @@ -1,7 +1,6 @@ use std::time::Duration; -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::WireApi; use codex_core::protocol::EventMsg; @@ -90,13 +89,12 @@ async fn continue_after_stream_error() { config.base_instructions = Some("You are a helpful assistant".to_string()); config.model_provider = provider; - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - std::sync::Arc::new(tokio::sync::Notify::new()), - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .unwrap() + .conversation; codex .submit(Op::UserInput { diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index 0ded3337ab..5fa8ab8ba1 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -3,8 +3,7 @@ use std::time::Duration; -use codex_core::Codex; -use codex_core::CodexSpawnOk; +use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; @@ -93,17 +92,15 @@ async fn retries_on_early_close() { requires_openai_auth: false, }; - let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; - let CodexSpawnOk { codex, .. } = Codex::spawn( - config, - Some(CodexAuth::from_api_key("Test API Key")), - ctrl_c, - ) - .await - .unwrap(); + let conversation_manager = ConversationManager::default(); + let codex = conversation_manager + .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) + .await + .unwrap() + .conversation; codex .submit(Op::UserInput { diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 6ed57898b2..ff6123d74b 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -6,12 +6,11 @@ mod event_processor_with_json_output; use std::io::IsTerminal; use std::io::Read; use std::path::PathBuf; -use std::sync::Arc; pub use cli::Cli; use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID; -use codex_core::codex_wrapper::CodexConversation; -use codex_core::codex_wrapper::{self}; +use codex_core::ConversationManager; +use codex_core::NewConversation; use codex_core::config::Config; use codex_core::config::ConfigOverrides; use codex_core::config_types::SandboxMode; @@ -185,35 +184,30 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any std::process::exit(1); } - let CodexConversation { - codex: codex_wrapper, + let conversation_manager = ConversationManager::default(); + let NewConversation { + conversation_id: _, + conversation, session_configured, - ctrl_c, - .. - } = codex_wrapper::init_codex(config).await?; - let codex = Arc::new(codex_wrapper); + } = conversation_manager.new_conversation(config).await?; info!("Codex initialized with event: {session_configured:?}"); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); { - let codex = codex.clone(); + let conversation = conversation.clone(); tokio::spawn(async move { loop { - let interrupted = ctrl_c.notified(); tokio::select! { - _ = interrupted => { - // Forward an interrupt to the codex so it can abort any in‑flight task. - let _ = codex - .submit( - Op::Interrupt, - ) - .await; + _ = tokio::signal::ctrl_c() => { + tracing::debug!("Keyboard interrupt"); + // Immediately notify Codex to abort any in‑flight task. + conversation.submit(Op::Interrupt).await.ok(); - // Exit the inner loop and return to the main input prompt. The codex + // Exit the inner loop and return to the main input prompt. The codex // will emit a `TurnInterrupted` (Error) event which is drained later. break; } - res = codex.next_event() => match res { + res = conversation.next_event() => match res { Ok(event) => { debug!("Received event: {event:?}"); @@ -243,9 +237,9 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any .into_iter() .map(|path| InputItem::LocalImage { path }) .collect(); - let initial_images_event_id = codex.submit(Op::UserInput { items }).await?; + let initial_images_event_id = conversation.submit(Op::UserInput { items }).await?; info!("Sent images with event ID: {initial_images_event_id}"); - while let Ok(event) = codex.next_event().await { + while let Ok(event) = conversation.next_event().await { if event.id == initial_images_event_id && matches!( event.msg, @@ -261,7 +255,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any // Send the prompt. let items: Vec = vec![InputItem::Text { text: prompt }]; - let initial_prompt_task_id = codex.submit(Op::UserInput { items }).await?; + let initial_prompt_task_id = conversation.submit(Op::UserInput { items }).await?; info!("Sent prompt with event ID: {initial_prompt_task_id}"); // Run the loop until the task is complete. @@ -270,7 +264,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any match shutdown { CodexStatus::Running => continue, CodexStatus::InitiateShutdown => { - codex.submit(Op::Shutdown).await?; + conversation.submit(Op::Shutdown).await?; } CodexStatus::Shutdown => { break; diff --git a/codex-rs/linux-sandbox/tests/landlock.rs b/codex-rs/linux-sandbox/tests/landlock.rs index c7081dbca2..76de2547a6 100644 --- a/codex-rs/linux-sandbox/tests/landlock.rs +++ b/codex-rs/linux-sandbox/tests/landlock.rs @@ -11,9 +11,7 @@ use codex_core::exec_env::create_env; use codex_core::protocol::SandboxPolicy; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; use tempfile::NamedTempFile; -use tokio::sync::Notify; // At least on GitHub CI, the arm64 tests appear to need longer timeouts. @@ -59,11 +57,9 @@ async fn run_cmd(cmd: &[&str], writable_roots: &[PathBuf], timeout_ms: u64) { }; let sandbox_program = env!("CARGO_BIN_EXE_codex-linux-sandbox"); let codex_linux_sandbox_exe = Some(PathBuf::from(sandbox_program)); - let ctrl_c = Arc::new(Notify::new()); let res = process_exec_tool_call( params, SandboxType::LinuxSeccomp, - ctrl_c, &sandbox_policy, &codex_linux_sandbox_exe, None, @@ -150,13 +146,11 @@ async fn assert_network_blocked(cmd: &[&str]) { }; let sandbox_policy = SandboxPolicy::new_read_only_policy(); - let ctrl_c = Arc::new(Notify::new()); let sandbox_program = env!("CARGO_BIN_EXE_codex-linux-sandbox"); let codex_linux_sandbox_exe: Option = Some(PathBuf::from(sandbox_program)); let result = process_exec_tool_call( params, SandboxType::LinuxSeccomp, - ctrl_c, &sandbox_policy, &codex_linux_sandbox_exe, None, diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index edb5a205e8..ced01539bb 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -5,12 +5,13 @@ use std::collections::HashMap; use std::sync::Arc; -use codex_core::Codex; -use codex_core::codex_wrapper::CodexConversation; -use codex_core::codex_wrapper::init_codex; +use codex_core::CodexConversation; +use codex_core::ConversationManager; +use codex_core::NewConversation; use codex_core::config::Config as CodexConfig; use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent; +use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::InputItem; @@ -41,15 +42,14 @@ pub async fn run_codex_tool_session( initial_prompt: String, config: CodexConfig, outgoing: Arc, - session_map: Arc>>>, + conversation_manager: Arc, running_requests_id_to_codex_uuid: Arc>>, ) { - let CodexConversation { - codex, + let NewConversation { + conversation_id, + conversation, session_configured, - session_id, - .. - } = match init_codex(config).await { + } = match conversation_manager.new_conversation(config).await { Ok(res) => res, Err(e) => { let result = CallToolResult { @@ -65,16 +65,15 @@ pub async fn run_codex_tool_session( return; } }; - let codex = Arc::new(codex); - - // update the session map so we can retrieve the session in a reply, and then drop it, since - // we no longer need it for this function - session_map.lock().await.insert(session_id, codex.clone()); - drop(session_map); + let session_configured_event = Event { + // Use a fake id value for now. + id: "".to_string(), + msg: EventMsg::SessionConfigured(session_configured.clone()), + }; outgoing .send_event_as_notification( - &session_configured, + &session_configured_event, Some(OutgoingNotificationMeta::new(Some(id.clone()))), ) .await; @@ -89,7 +88,7 @@ pub async fn run_codex_tool_session( running_requests_id_to_codex_uuid .lock() .await - .insert(id.clone(), session_id); + .insert(id.clone(), conversation_id); let submission = Submission { id: sub_id.clone(), op: Op::UserInput { @@ -99,18 +98,24 @@ pub async fn run_codex_tool_session( }, }; - if let Err(e) = codex.submit_with_id(submission).await { + if let Err(e) = conversation.submit_with_id(submission).await { tracing::error!("Failed to submit initial prompt: {e}"); // unregister the id so we don't keep it in the map running_requests_id_to_codex_uuid.lock().await.remove(&id); return; } - run_codex_tool_session_inner(codex, outgoing, id, running_requests_id_to_codex_uuid).await; + run_codex_tool_session_inner( + conversation, + outgoing, + id, + running_requests_id_to_codex_uuid, + ) + .await; } pub async fn run_codex_tool_session_reply( - codex: Arc, + conversation: Arc, outgoing: Arc, request_id: RequestId, prompt: String, @@ -121,7 +126,7 @@ pub async fn run_codex_tool_session_reply( .lock() .await .insert(request_id.clone(), session_id); - if let Err(e) = codex + if let Err(e) = conversation .submit(Op::UserInput { items: vec![InputItem::Text { text: prompt }], }) @@ -137,7 +142,7 @@ pub async fn run_codex_tool_session_reply( } run_codex_tool_session_inner( - codex, + conversation, outgoing, request_id, running_requests_id_to_codex_uuid, @@ -146,7 +151,7 @@ pub async fn run_codex_tool_session_reply( } async fn run_codex_tool_session_inner( - codex: Arc, + codex: Arc, outgoing: Arc, request_id: RequestId, running_requests_id_to_codex_uuid: Arc>>, diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index d5c414bf74..61f0c95ad4 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -4,7 +4,7 @@ use crate::exec_approval::handle_exec_approval_request; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotificationMeta; use crate::patch_approval::handle_patch_approval_request; -use codex_core::Codex; +use codex_core::CodexConversation; use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::EventMsg; @@ -13,7 +13,7 @@ use mcp_types::RequestId; use tracing::error; pub async fn run_conversation_loop( - codex: Arc, + codex: Arc, outgoing: Arc, request_id: RequestId, ) { diff --git a/codex-rs/mcp-server/src/exec_approval.rs b/codex-rs/mcp-server/src/exec_approval.rs index 54abfb35bd..1985cc79e6 100644 --- a/codex-rs/mcp-server/src/exec_approval.rs +++ b/codex-rs/mcp-server/src/exec_approval.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use std::sync::Arc; -use codex_core::Codex; +use codex_core::CodexConversation; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; use mcp_types::ElicitRequest; @@ -51,7 +51,7 @@ pub(crate) async fn handle_exec_approval_request( command: Vec, cwd: PathBuf, outgoing: Arc, - codex: Arc, + codex: Arc, request_id: RequestId, tool_call_id: String, event_id: String, @@ -116,7 +116,7 @@ pub(crate) async fn handle_exec_approval_request( async fn on_exec_approval_response( event_id: String, receiver: tokio::sync::oneshot::Receiver, - codex: Arc, + codex: Arc, ) { let response = receiver.await; let value = match response { diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 2f99cda723..a3349aeb35 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -14,7 +14,7 @@ use crate::outgoing_message::OutgoingMessageSender; use crate::tool_handlers::create_conversation::handle_create_conversation; use crate::tool_handlers::send_message::handle_send_message; -use codex_core::Codex; +use codex_core::ConversationManager; use codex_core::config::Config as CodexConfig; use codex_core::protocol::Submission; use mcp_types::CallToolRequest; @@ -42,7 +42,7 @@ pub(crate) struct MessageProcessor { outgoing: Arc, initialized: bool, codex_linux_sandbox_exe: Option, - session_map: Arc>>>, + conversation_manager: Arc, running_requests_id_to_codex_uuid: Arc>>, running_session_ids: Arc>>, } @@ -58,14 +58,14 @@ impl MessageProcessor { outgoing: Arc::new(outgoing), initialized: false, codex_linux_sandbox_exe, - session_map: Arc::new(Mutex::new(HashMap::new())), + conversation_manager: Arc::new(ConversationManager::default()), running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())), running_session_ids: Arc::new(Mutex::new(HashSet::new())), } } - pub(crate) fn session_map(&self) -> Arc>>> { - self.session_map.clone() + pub(crate) fn get_conversation_manager(&self) -> &ConversationManager { + &self.conversation_manager } pub(crate) fn outgoing(&self) -> Arc { @@ -431,9 +431,9 @@ impl MessageProcessor { } }; - // Clone outgoing and session map to move into async task. + // Clone outgoing and server to move into async task. let outgoing = self.outgoing.clone(); - let session_map = self.session_map.clone(); + let conversation_manager = self.conversation_manager.clone(); let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone(); // Spawn an async task to handle the Codex session so that we do not @@ -445,7 +445,7 @@ impl MessageProcessor { initial_prompt, config, outgoing, - session_map, + conversation_manager, running_requests_id_to_codex_uuid, ) .await; @@ -516,33 +516,27 @@ impl MessageProcessor { } }; - // load codex from session map - let session_map_mutex = Arc::clone(&self.session_map); - - // Clone outgoing and session map to move into async task. + // Clone outgoing to move into async task. let outgoing = self.outgoing.clone(); let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone(); - let codex = { - let session_map = session_map_mutex.lock().await; - match session_map.get(&session_id).cloned() { - Some(c) => c, - None => { - tracing::warn!("Session not found for session_id: {session_id}"); - let result = CallToolResult { - content: vec![ContentBlock::TextContent(TextContent { - r#type: "text".to_owned(), - text: format!("Session not found for session_id: {session_id}"), - annotations: None, - })], - is_error: Some(true), - structured_content: None, - }; - outgoing - .send_response(request_id, serde_json::to_value(result).unwrap_or_default()) - .await; - return; - } + let codex = match self.conversation_manager.get_conversation(session_id).await { + Ok(c) => c, + Err(_) => { + tracing::warn!("Session not found for session_id: {session_id}"); + let result = CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_owned(), + text: format!("Session not found for session_id: {session_id}"), + annotations: None, + })], + is_error: Some(true), + structured_content: None, + }; + outgoing + .send_response(request_id, serde_json::to_value(result).unwrap_or_default()) + .await; + return; } }; @@ -609,15 +603,12 @@ impl MessageProcessor { }; tracing::info!("session_id: {session_id}"); - // Obtain the Codex Arc while holding the session_map lock, then release. - let codex_arc = { - let sessions_guard = self.session_map.lock().await; - match sessions_guard.get(&session_id) { - Some(codex) => Arc::clone(codex), - None => { - tracing::warn!("Session not found for session_id: {session_id}"); - return; - } + // Obtain the Codex conversation from the server. + let codex_arc = match self.conversation_manager.get_conversation(session_id).await { + Ok(c) => c, + Err(_) => { + tracing::warn!("Session not found for session_id: {session_id}"); + return; } }; diff --git a/codex-rs/mcp-server/src/patch_approval.rs b/codex-rs/mcp-server/src/patch_approval.rs index db99ee5f27..3c614ab331 100644 --- a/codex-rs/mcp-server/src/patch_approval.rs +++ b/codex-rs/mcp-server/src/patch_approval.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use codex_core::Codex; +use codex_core::CodexConversation; use codex_core::protocol::FileChange; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; @@ -47,7 +47,7 @@ pub(crate) async fn handle_patch_approval_request( grant_root: Option, changes: HashMap, outgoing: Arc, - codex: Arc, + codex: Arc, request_id: RequestId, tool_call_id: String, event_id: String, @@ -111,7 +111,7 @@ pub(crate) async fn handle_patch_approval_request( pub(crate) async fn on_patch_approval_response( event_id: String, receiver: tokio::sync::oneshot::Receiver, - codex: Arc, + codex: Arc, ) { let response = receiver.await; let value = match response { diff --git a/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs b/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs index 559bf72905..77a68f0128 100644 --- a/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs +++ b/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs @@ -1,16 +1,9 @@ -use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; -use codex_core::Codex; -use codex_core::codex_wrapper::init_codex; +use codex_core::NewConversation; use codex_core::config::Config as CodexConfig; use codex_core::config::ConfigOverrides; -use codex_core::protocol::EventMsg; -use codex_core::protocol::SessionConfiguredEvent; use mcp_types::RequestId; -use tokio::sync::Mutex; -use uuid::Uuid; use crate::conversation_loop::run_conversation_loop; use crate::json_to_toml::json_to_toml; @@ -81,8 +74,16 @@ pub(crate) async fn handle_create_conversation( } }; - // Initialize Codex session - let codex_conversation = match init_codex(cfg).await { + // Initialize Codex session via server API + let NewConversation { + conversation_id: session_id, + conversation, + session_configured, + } = match message_processor + .get_conversation_manager() + .new_conversation(cfg) + .await + { Ok(conv) => conv, Err(e) => { message_processor @@ -100,41 +101,13 @@ pub(crate) async fn handle_create_conversation( } }; - // Expect SessionConfigured; if not, return error. - let EventMsg::SessionConfigured(SessionConfiguredEvent { model, .. }) = - &codex_conversation.session_configured.msg - else { - message_processor - .send_response_with_optional_error( - id, - Some(ToolCallResponseResult::ConversationCreate( - ConversationCreateResult::Error { - message: "Expected SessionConfigured event".to_string(), - }, - )), - Some(true), - ) - .await; - return; - }; - - let effective_model = model.clone(); - - let session_id = codex_conversation.session_id; - let codex_arc = Arc::new(codex_conversation.codex); + let effective_model = session_configured.model.clone(); - // Store session for future calls - insert_session( - session_id, - codex_arc.clone(), - message_processor.session_map(), - ) - .await; // Run the conversation loop in the background so this request can return immediately. let outgoing = message_processor.outgoing(); let spawn_id = id.clone(); tokio::spawn(async move { - run_conversation_loop(codex_arc.clone(), outgoing, spawn_id).await; + run_conversation_loop(conversation.clone(), outgoing, spawn_id).await; }); // Reply with the new conversation id and effective model @@ -151,12 +124,3 @@ pub(crate) async fn handle_create_conversation( ) .await; } - -async fn insert_session( - session_id: Uuid, - codex: Arc, - session_map: Arc>>>, -) { - let mut guard = session_map.lock().await; - guard.insert(session_id, codex); -} diff --git a/codex-rs/mcp-server/src/tool_handlers/send_message.rs b/codex-rs/mcp-server/src/tool_handlers/send_message.rs index 894176bef6..985854f852 100644 --- a/codex-rs/mcp-server/src/tool_handlers/send_message.rs +++ b/codex-rs/mcp-server/src/tool_handlers/send_message.rs @@ -1,12 +1,6 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use codex_core::Codex; use codex_core::protocol::Op; use codex_core::protocol::Submission; use mcp_types::RequestId; -use tokio::sync::Mutex; -use uuid::Uuid; use crate::mcp_protocol::ConversationSendMessageArgs; use crate::mcp_protocol::ConversationSendMessageResult; @@ -41,7 +35,11 @@ pub(crate) async fn handle_send_message( } let session_id = conversation_id.0; - let Some(codex) = get_session(session_id, message_processor.session_map()).await else { + let Ok(codex) = message_processor + .get_conversation_manager() + .get_conversation(session_id) + .await + else { message_processor .send_response_with_optional_error( id, @@ -114,11 +112,3 @@ pub(crate) async fn handle_send_message( ) .await; } - -pub(crate) async fn get_session( - session_id: Uuid, - session_map: Arc>>>, -) -> Option> { - let guard = session_map.lock().await; - guard.get(&session_id).cloned() -} diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 69d27022fa..fb45ecfd19 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -9,6 +9,7 @@ use crate::onboarding::onboarding_screen::OnboardingScreenArgs; use crate::should_show_login_screen; use crate::slash_command::SlashCommand; use crate::tui; +use codex_core::ConversationManager; use codex_core::config::Config; use codex_core::protocol::Event; use codex_core::protocol::Op; @@ -48,6 +49,7 @@ enum AppState<'a> { } pub(crate) struct App<'a> { + server: Arc, app_event_tx: AppEventSender, app_event_rx: Receiver, app_state: AppState<'a>, @@ -85,6 +87,8 @@ impl App<'_> { initial_images: Vec, show_trust_screen: bool, ) -> Self { + let conversation_manager = Arc::new(ConversationManager::default()); + let (app_event_tx, app_event_rx) = channel(); let app_event_tx = AppEventSender::new(app_event_tx); let pending_redraw = Arc::new(AtomicBool::new(false)); @@ -153,6 +157,7 @@ impl App<'_> { } else { let chat_widget = ChatWidget::new( config.clone(), + conversation_manager.clone(), app_event_tx.clone(), initial_prompt, initial_images, @@ -165,6 +170,7 @@ impl App<'_> { let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone()); Self { + server: conversation_manager, app_event_tx, pending_history_lines: Vec::new(), app_event_rx, @@ -328,6 +334,7 @@ impl App<'_> { // User accepted – switch to chat view. let new_widget = Box::new(ChatWidget::new( self.config.clone(), + self.server.clone(), self.app_event_tx.clone(), None, Vec::new(), @@ -449,6 +456,7 @@ impl App<'_> { self.app_state = AppState::Chat { widget: Box::new(ChatWidget::new( config, + self.server.clone(), app_event_tx.clone(), initial_prompt, initial_images, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 0d6911d202..05acdff7d1 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::path::PathBuf; +use std::sync::Arc; use codex_core::config::Config; use codex_core::parse_command::ParsedCommand; @@ -54,6 +55,7 @@ mod agent; use self::agent::spawn_agent; use crate::streaming::controller::AppEventHistorySink; use crate::streaming::controller::StreamController; +use codex_core::ConversationManager; use codex_file_search::FileMatch; // Track information about an in-flight exec command. @@ -468,12 +470,13 @@ impl ChatWidget<'_> { pub(crate) fn new( config: Config, + conversation_manager: Arc, app_event_tx: AppEventSender, initial_prompt: Option, initial_images: Vec, enhanced_keys_supported: bool, ) -> Self { - let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone()); + let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager); Self { app_event_tx: app_event_tx.clone(), diff --git a/codex-rs/tui/src/chatwidget/agent.rs b/codex-rs/tui/src/chatwidget/agent.rs index 69e1c19c45..eb96864f0d 100644 --- a/codex-rs/tui/src/chatwidget/agent.rs +++ b/codex-rs/tui/src/chatwidget/agent.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use codex_core::codex_wrapper::CodexConversation; -use codex_core::codex_wrapper::init_codex; +use codex_core::ConversationManager; +use codex_core::NewConversation; use codex_core::config::Config; use codex_core::protocol::Op; use tokio::sync::mpsc::UnboundedSender; @@ -12,17 +12,21 @@ use crate::app_event_sender::AppEventSender; /// Spawn the agent bootstrapper and op forwarding loop, returning the /// `UnboundedSender` used by the UI to submit operations. -pub(crate) fn spawn_agent(config: Config, app_event_tx: AppEventSender) -> UnboundedSender { +pub(crate) fn spawn_agent( + config: Config, + app_event_tx: AppEventSender, + server: Arc, +) -> UnboundedSender { let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); let app_event_tx_clone = app_event_tx.clone(); tokio::spawn(async move { - let CodexConversation { - codex, + let NewConversation { + conversation_id: _, + conversation, session_configured, - .. - } = match init_codex(config).await { - Ok(vals) => vals, + } = match server.new_conversation(config).await { + Ok(v) => v, Err(e) => { // TODO: surface this error to the user. tracing::error!("failed to initialize codex: {e}"); @@ -30,21 +34,25 @@ pub(crate) fn spawn_agent(config: Config, app_event_tx: AppEventSender) -> Unbou } }; - // Forward the captured `SessionInitialized` event that was consumed - // inside `init_codex()` so it can be rendered in the UI. - app_event_tx_clone.send(AppEvent::CodexEvent(session_configured.clone())); - let codex = Arc::new(codex); - let codex_clone = codex.clone(); + // Forward the captured `SessionConfigured` event so it can be rendered in the UI. + let ev = codex_core::protocol::Event { + // The `id` does not matter for rendering, so we can use a fake value. + id: "".to_string(), + msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured), + }; + app_event_tx_clone.send(AppEvent::CodexEvent(ev)); + + let conversation_clone = conversation.clone(); tokio::spawn(async move { while let Some(op) = codex_op_rx.recv().await { - let id = codex_clone.submit(op).await; + let id = conversation_clone.submit(op).await; if let Err(e) = id { tracing::error!("failed to submit op: {e}"); } } }); - while let Ok(event) = codex.next_event().await { + while let Ok(event) = conversation.next_event().await { app_event_tx_clone.send(AppEvent::CodexEvent(event)); } }); diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 7349762039..186462ca30 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -102,7 +102,8 @@ async fn helpers_are_available_and_do_not_panic() { let (tx_raw, _rx) = channel::(); let tx = AppEventSender::new(tx_raw); let cfg = test_config(); - let mut w = ChatWidget::new(cfg, tx, None, Vec::new(), false); + let conversation_manager = Arc::new(ConversationManager::default()); + let mut w = ChatWidget::new(cfg, conversation_manager, tx, None, Vec::new(), false); // Basic construction sanity. let _ = &mut w; }