diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 2e33c8754b..2fc3565c8c 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -22,6 +22,8 @@ use mcp_types::Tool; use serde_json::json; use sha1::Digest; use sha1::Sha1; +use std::sync::Arc; +use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::info; use tracing::warn; @@ -38,6 +40,10 @@ const MAX_TOOL_NAME_LENGTH: usize = 64; /// Timeout for the `tools/list` request. const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10); +/// Maximum number of concurrent `tools/list` requests. +const MAX_CONCURRENT_LIST_TOOLS: usize = 4; +/// Maximum number of MCP servers to start concurrently. +const MAX_CONCURRENT_SERVER_STARTS: usize = 3; /// Map that holds a startup error for every MCP server that could **not** be /// spawned successfully. @@ -111,9 +117,10 @@ impl McpConnectionManager { return Ok((Self::default(), ClientStartErrors::default())); } - // Launch all configured servers concurrently. + // Launch configured servers with limited concurrency and quick backoff retries. let mut join_set = JoinSet::new(); let mut errors = ClientStartErrors::new(); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_SERVER_STARTS)); for (server_name, cfg) in mcp_servers { // Validate server name before spawning @@ -126,44 +133,72 @@ impl McpConnectionManager { continue; } + let permit = semaphore.clone().acquire_owned(); join_set.spawn(async move { - let McpServerConfig { command, args, env } = cfg; - let client_res = McpClient::new_stdio_client( - command.into(), - args.into_iter().map(OsString::from).collect(), - env, - ) - .await; - match client_res { - Ok(client) => { - // Initialize the client. - let params = mcp_types::InitializeRequestParams { - capabilities: ClientCapabilities { - experimental: None, - roots: None, - sampling: None, - // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities - // indicates this should be an empty object. - elicitation: Some(json!({})), - }, - client_info: Implementation { - name: "codex-mcp-client".to_owned(), - version: env!("CARGO_PKG_VERSION").to_owned(), - title: Some("Codex".into()), - }, - protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(), - }; - let initialize_notification_params = None; - let timeout = Some(Duration::from_secs(10)); - match client - .initialize(params, initialize_notification_params, timeout) - .await - { - Ok(_response) => (server_name, Ok(client)), - Err(e) => (server_name, Err(e)), + // Ensure limited concurrency + let _permit = permit.await; + + // Small helper to spawn and initialize a client once + async fn try_start( + server_name: &str, + cfg: &McpServerConfig, + ) -> (String, Result) { + let McpServerConfig { command, args, env } = cfg.clone(); + let client_res = McpClient::new_stdio_client( + command.into(), + args.into_iter().map(OsString::from).collect(), + env, + ) + .await; + match client_res { + Ok(client) => { + // Initialize the client. + let params = mcp_types::InitializeRequestParams { + capabilities: ClientCapabilities { + experimental: None, + roots: None, + sampling: None, + // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities + // indicates this should be an empty object. + elicitation: Some(json!({})), + }, + client_info: Implementation { + name: "codex-mcp-client".to_owned(), + version: env!("CARGO_PKG_VERSION").to_owned(), + title: Some("Codex".into()), + }, + protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(), + }; + let initialize_notification_params = None; + let timeout = Some(Duration::from_secs(10)); + match client + .initialize(params, initialize_notification_params, timeout) + .await + { + Ok(_response) => (server_name.to_string(), Ok(client)), + Err(e) => (server_name.to_string(), Err(e)), + } + } + Err(e) => (server_name.to_string(), Err(e.into())), + } + } + + // Try start once, then quick backoff retry + let (name1, res1) = try_start(&server_name, &cfg).await; + match res1 { + Ok(client) => (name1, Ok(client)), + Err(e1) => { + // Quick backoff to avoid thundering herd; do not block overall startup for long + tokio::time::sleep(Duration::from_millis(750)).await; + let (name2, res2) = try_start(&server_name, &cfg).await; + match res2 { + Ok(client) => (name2, Ok(client)), + Err(e2) => ( + name2, + Err(anyhow!("initialization failed twice: {e1}; retry: {e2}")), + ), } } - Err(e) => (server_name, Err(e.into())), } }); } @@ -233,6 +268,7 @@ async fn list_all_tools( clients: &HashMap>, ) -> Result> { let mut join_set = JoinSet::new(); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_LIST_TOOLS)); // Spawn one task per server so we can query them concurrently. This // keeps the overall latency roughly at the slowest server instead of @@ -240,7 +276,9 @@ async fn list_all_tools( for (server_name, client) in clients { let server_name_cloned = server_name.clone(); let client_clone = client.clone(); + let permit = semaphore.clone().acquire_owned(); join_set.spawn(async move { + let _permit = permit.await; let res = client_clone .list_tools(None, Some(LIST_TOOLS_TIMEOUT)) .await; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 3d312ffce0..6bcc4e413b 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -527,8 +527,22 @@ impl ChatWidget<'_> { )); } } - EventMsg::ExecCommandOutputDelta(_) => { - // TODO + EventMsg::ExecCommandOutputDelta(delta) => { + // Show a live tail of exec output in the status line while running. + use codex_core::protocol::ExecOutputStream; + let mut text = String::from_utf8_lossy(&delta.chunk).to_string(); + // Use only the last non-empty line for status. + if let Some(last) = text.lines().rev().find(|l| !l.trim().is_empty()) { + text = last.trim().to_string(); + } else { + text.truncate(120); + } + let prefix = match delta.stream { + ExecOutputStream::Stdout => "stdout", + ExecOutputStream::Stderr => "stderr", + }; + let status = format!("{prefix}: {text}"); + self.update_latest_log(status); } EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: _, @@ -585,7 +599,10 @@ impl ChatWidget<'_> { self.app_event_tx.send(AppEvent::ExitRequest); } EventMsg::TurnDiff(TurnDiffEvent { unified_diff }) => { - info!("TurnDiffEvent: {unified_diff}"); + // Avoid spamming logs with full unified diffs; log a concise summary instead. + let bytes = unified_diff.len(); + let lines = unified_diff.lines().count(); + info!("TurnDiffEvent received ({} bytes, {} lines)", bytes, lines); } EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { info!("BackgroundEvent: {message}"); diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 8f64f3247d..3c97de9b98 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -193,6 +193,7 @@ pub async fn run_main( // Build layered subscriber: let file_layer = tracing_subscriber::fmt::layer() .with_writer(non_blocking) + .with_ansi(false) .with_target(false) .with_filter(env_filter());