diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs new file mode 100644 index 0000000000..c0a6ab287c --- /dev/null +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -0,0 +1,269 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_core::ConversationManager; +use codex_core::NewConversation; +use codex_core::config::Config; +use codex_core::config::ConfigOverrides; +use mcp_types::JSONRPCErrorError; +use mcp_types::RequestId; +use tokio::sync::oneshot; +use uuid::Uuid; + +use crate::error_code::INTERNAL_ERROR_CODE; +use crate::error_code::INVALID_REQUEST_ERROR_CODE; +use crate::json_to_toml::json_to_toml; +use crate::outgoing_message::OutgoingMessageSender; +use crate::outgoing_message::OutgoingNotificationMeta; +use crate::wire_format::AddConversationListenerParams; +use crate::wire_format::AddConversationSubscriptionResponse; +use crate::wire_format::CodexRequest; +use crate::wire_format::ConversationId; +use crate::wire_format::InputItem as WireInputItem; +use crate::wire_format::NewConversationParams; +use crate::wire_format::NewConversationResponse; +use crate::wire_format::RemoveConversationListenerParams; +use crate::wire_format::RemoveConversationSubscriptionResponse; +use crate::wire_format::SendUserMessageParams; +use crate::wire_format::SendUserMessageResponse; +use codex_core::protocol::InputItem as CoreInputItem; +use codex_core::protocol::Op; + +/// Handles JSON-RPC messages for Codex conversations. +pub(crate) struct CodexMessageProcessor { + conversation_manager: Arc, + outgoing: Arc, + codex_linux_sandbox_exe: Option, + conversation_listeners: HashMap>, +} + +impl CodexMessageProcessor { + pub fn new( + conversation_manager: Arc, + outgoing: Arc, + codex_linux_sandbox_exe: Option, + ) -> Self { + Self { + conversation_manager, + outgoing, + codex_linux_sandbox_exe, + conversation_listeners: HashMap::new(), + } + } + + pub async fn process_request(&mut self, request: CodexRequest) { + match request { + CodexRequest::NewConversation { request_id, params } => { + // Do not tokio::spawn() to process new_conversation() + // asynchronously because we need to ensure the conversation is + // created before processing any subsequent messages. + self.process_new_conversation(request_id, params).await; + } + CodexRequest::SendUserMessage { request_id, params } => { + self.send_user_message(request_id, params).await; + } + CodexRequest::AddConversationListener { request_id, params } => { + self.add_conversation_listener(request_id, params).await; + } + CodexRequest::RemoveConversationListener { request_id, params } => { + self.remove_conversation_listener(request_id, params).await; + } + } + } + + async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) { + let config = match derive_config_from_params(params, self.codex_linux_sandbox_exe.clone()) { + Ok(config) => config, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self.conversation_manager.new_conversation(config).await { + Ok(conversation_id) => { + let NewConversation { + conversation_id, + session_configured, + .. + } = conversation_id; + let response = NewConversationResponse { + conversation_id: ConversationId(conversation_id), + model: session_configured.model, + }; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error creating conversation: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { + let SendUserMessageParams { + conversation_id, + items, + } = params; + let Ok(conversation) = self + .conversation_manager + .get_conversation(conversation_id.0) + .await + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + let mapped_items: Vec = items + .into_iter() + .map(|item| match item { + WireInputItem::Text { text } => CoreInputItem::Text { text }, + WireInputItem::Image { image_url } => CoreInputItem::Image { image_url }, + WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path }, + }) + .collect(); + + // Submit user input to the conversation. + let _ = conversation + .submit(Op::UserInput { + items: mapped_items, + }) + .await; + + // Acknowledge with an empty result. + self.outgoing + .send_response(request_id, SendUserMessageResponse {}) + .await; + } + + async fn add_conversation_listener( + &mut self, + request_id: RequestId, + params: AddConversationListenerParams, + ) { + let AddConversationListenerParams { conversation_id } = params; + let Ok(conversation) = self + .conversation_manager + .get_conversation(conversation_id.0) + .await + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {}", conversation_id.0), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + let subscription_id = Uuid::new_v4(); + let (cancel_tx, mut cancel_rx) = oneshot::channel(); + self.conversation_listeners + .insert(subscription_id, cancel_tx); + let outgoing_for_task = self.outgoing.clone(); + let add_listener_request_id = request_id.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut cancel_rx => { + // User has unsubscribed, so exit this task. + break; + } + event = conversation.next_event() => { + let event = match event { + Ok(event) => event, + Err(err) => { + tracing::warn!("conversation.next_event() failed with: {err}"); + break; + } + }; + + outgoing_for_task.send_event_as_notification( + &event, + Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))), + ) + .await; + } + } + } + }); + let response = AddConversationSubscriptionResponse { subscription_id }; + self.outgoing.send_response(request_id, response).await; + } + + async fn remove_conversation_listener( + &mut self, + request_id: RequestId, + params: RemoveConversationListenerParams, + ) { + let RemoveConversationListenerParams { subscription_id } = params; + match self.conversation_listeners.remove(&subscription_id) { + Some(sender) => { + // Signal the spawned task to exit and acknowledge. + let _ = sender.send(()); + let response = RemoveConversationSubscriptionResponse {}; + self.outgoing.send_response(request_id, response).await; + } + None => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("subscription not found: {subscription_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } +} + +fn derive_config_from_params( + params: NewConversationParams, + codex_linux_sandbox_exe: Option, +) -> std::io::Result { + let NewConversationParams { + model, + profile, + cwd, + approval_policy, + sandbox, + config: cli_overrides, + base_instructions, + include_plan_tool, + } = params; + let overrides = ConfigOverrides { + model, + config_profile: profile, + cwd: cwd.map(PathBuf::from), + approval_policy: approval_policy.map(Into::into), + sandbox_mode: sandbox.map(Into::into), + model_provider: None, + codex_linux_sandbox_exe, + base_instructions, + include_plan_tool, + disable_response_storage: None, + show_raw_agent_reasoning: None, + }; + + let cli_overrides = cli_overrides + .unwrap_or_default() + .into_iter() + .map(|(k, v)| (k, json_to_toml(v))) + .collect(); + + Config::load_with_cli_overrides(cli_overrides, overrides) +} diff --git a/codex-rs/mcp-server/src/codex_tool_config.rs b/codex-rs/mcp-server/src/codex_tool_config.rs index 4af3e29c48..1d77eb4b82 100644 --- a/codex-rs/mcp-server/src/codex_tool_config.rs +++ b/codex-rs/mcp-server/src/codex_tool_config.rs @@ -58,7 +58,7 @@ pub struct CodexToolCallParam { /// Custom enum mirroring [`AskForApproval`], but has an extra dependency on /// [`JsonSchema`]. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum CodexToolCallApprovalPolicy { Untrusted, @@ -80,7 +80,7 @@ impl From for AskForApproval { /// Custom enum mirroring [`SandboxMode`] from config_types.rs, but with /// `JsonSchema` support. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum CodexToolCallSandboxMode { ReadOnly, diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index abbe3b94a0..b6dcf8246d 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -15,6 +15,7 @@ use tracing::error; use tracing::info; use tracing_subscriber::EnvFilter; +mod codex_message_processor; mod codex_tool_config; mod codex_tool_runner; mod conversation_loop; @@ -26,6 +27,7 @@ pub(crate) mod message_processor; mod outgoing_message; mod patch_approval; pub(crate) mod tool_handlers; +pub mod wire_format; use crate::message_processor::MessageProcessor; use crate::outgoing_message::OutgoingMessage; diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 98beb15460..d043493c7b 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; +use crate::codex_message_processor::CodexMessageProcessor; use crate::codex_tool_config::CodexToolCallParam; use crate::codex_tool_config::CodexToolCallReplyParam; use crate::codex_tool_config::create_tool_for_codex_tool_call_param; @@ -14,6 +15,7 @@ use crate::mcp_protocol::ToolCallResponseResult; use crate::outgoing_message::OutgoingMessageSender; use crate::tool_handlers::create_conversation::handle_create_conversation; use crate::tool_handlers::send_message::handle_send_message; +use crate::wire_format::CodexRequest; use codex_core::ConversationManager; use codex_core::config::Config as CodexConfig; @@ -40,6 +42,7 @@ use tokio::task; use uuid::Uuid; pub(crate) struct MessageProcessor { + codex_message_processor: CodexMessageProcessor, outgoing: Arc, initialized: bool, codex_linux_sandbox_exe: Option, @@ -55,11 +58,19 @@ impl MessageProcessor { outgoing: OutgoingMessageSender, codex_linux_sandbox_exe: Option, ) -> Self { + let outgoing = Arc::new(outgoing); + let conversation_manager = Arc::new(ConversationManager::default()); + let codex_message_processor = CodexMessageProcessor::new( + conversation_manager.clone(), + outgoing.clone(), + codex_linux_sandbox_exe.clone(), + ); Self { - outgoing: Arc::new(outgoing), + codex_message_processor, + outgoing, initialized: false, codex_linux_sandbox_exe, - conversation_manager: Arc::new(ConversationManager::default()), + conversation_manager, running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())), running_session_ids: Arc::new(Mutex::new(HashSet::new())), } @@ -78,6 +89,17 @@ impl MessageProcessor { } pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) { + if let Ok(request_json) = serde_json::to_value(request.clone()) + && let Ok(codex_request) = serde_json::from_value::(request_json) + { + // If the request is a Codex request, handle it with the Codex + // message processor. + self.codex_message_processor + .process_request(codex_request) + .await; + return; + } + // Hold on to the ID so we can respond. let request_id = request.id.clone(); diff --git a/codex-rs/mcp-server/src/wire_format.rs b/codex-rs/mcp-server/src/wire_format.rs new file mode 100644 index 0000000000..61cd881b36 --- /dev/null +++ b/codex-rs/mcp-server/src/wire_format.rs @@ -0,0 +1,180 @@ +use std::collections::HashMap; +use std::fmt::Display; + +use mcp_types::RequestId; +use serde::Deserialize; +use serde::Serialize; + +use crate::codex_tool_config::CodexToolCallApprovalPolicy; +use crate::codex_tool_config::CodexToolCallSandboxMode; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ConversationId(pub Uuid); + +impl Display for ConversationId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Request from the client to the server. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(tag = "method", rename_all = "camelCase")] +pub enum CodexRequest { + NewConversation { + #[serde(rename = "id")] + request_id: RequestId, + params: NewConversationParams, + }, + SendUserMessage { + #[serde(rename = "id")] + request_id: RequestId, + params: SendUserMessageParams, + }, + + AddConversationListener { + #[serde(rename = "id")] + request_id: RequestId, + params: AddConversationListenerParams, + }, + RemoveConversationListener { + #[serde(rename = "id")] + request_id: RequestId, + params: RemoveConversationListenerParams, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)] +#[serde(rename_all = "camelCase")] +pub struct NewConversationParams { + /// Optional override for the model name (e.g. "o3", "o4-mini"). + #[serde(skip_serializing_if = "Option::is_none")] + pub model: Option, + + /// Configuration profile from config.toml to specify default options. + #[serde(skip_serializing_if = "Option::is_none")] + pub profile: Option, + + /// Working directory for the session. If relative, it is resolved against + /// the server process's current working directory. + #[serde(skip_serializing_if = "Option::is_none")] + pub cwd: Option, + + /// Approval policy for shell commands generated by the model: + /// `untrusted`, `on-failure`, `on-request`, `never`. + #[serde(skip_serializing_if = "Option::is_none")] + pub approval_policy: Option, + + /// Sandbox mode: `read-only`, `workspace-write`, or `danger-full-access`. + #[serde(skip_serializing_if = "Option::is_none")] + pub sandbox: Option, + + /// Individual config settings that will override what is in + /// CODEX_HOME/config.toml. + #[serde(skip_serializing_if = "Option::is_none")] + pub config: Option>, + + /// The set of instructions to use instead of the default ones. + #[serde(skip_serializing_if = "Option::is_none")] + pub base_instructions: Option, + + /// Whether to include the plan tool in the conversation. + #[serde(skip_serializing_if = "Option::is_none")] + pub include_plan_tool: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct NewConversationResponse { + pub conversation_id: ConversationId, + pub model: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AddConversationSubscriptionResponse { + pub subscription_id: Uuid, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct RemoveConversationSubscriptionResponse {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SendUserMessageParams { + pub conversation_id: ConversationId, + pub items: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SendUserMessageResponse {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AddConversationListenerParams { + pub conversation_id: ConversationId, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct RemoveConversationListenerParams { + pub subscription_id: Uuid, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum InputItem { + Text { + text: String, + }, + /// Pre‑encoded data: URI image. + Image { + image_url: String, + }, + + /// Local image path provided by the user. This will be converted to an + /// `Image` variant (base64 data URL) during request serialization. + LocalImage { + path: std::path::PathBuf, + }, +} + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use serde_json::json; + + #[test] + fn serialize_new_conversation() { + let request = CodexRequest::NewConversation { + request_id: RequestId::Integer(42), + params: NewConversationParams { + model: Some("gpt-5".to_string()), + profile: None, + cwd: None, + approval_policy: Some(CodexToolCallApprovalPolicy::OnRequest), + sandbox: None, + config: None, + base_instructions: None, + include_plan_tool: None, + }, + }; + assert_eq!( + json!({ + "method": "newConversation", + "id": 42, + "params": { + "model": "gpt-5", + "approvalPolicy": "on-request" + } + }), + serde_json::to_value(&request).unwrap(), + ); + } +} diff --git a/codex-rs/mcp-server/tests/codex_message_processor_flow.rs b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs new file mode 100644 index 0000000000..c81a04a5f4 --- /dev/null +++ b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs @@ -0,0 +1,176 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::path::Path; + +use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_mcp_server::wire_format::AddConversationListenerParams; +use codex_mcp_server::wire_format::AddConversationSubscriptionResponse; +use codex_mcp_server::wire_format::NewConversationParams; +use codex_mcp_server::wire_format::NewConversationResponse; +use codex_mcp_server::wire_format::RemoveConversationListenerParams; +use codex_mcp_server::wire_format::RemoveConversationSubscriptionResponse; +use codex_mcp_server::wire_format::SendUserMessageParams; +use codex_mcp_server::wire_format::SendUserMessageResponse; +use mcp_test_support::McpProcess; +use mcp_test_support::create_final_assistant_message_sse_response; +use mcp_test_support::create_mock_chat_completions_server; +use mcp_test_support::create_shell_sse_response; +use mcp_types::JSONRPCResponse; +use mcp_types::RequestId; +use pretty_assertions::assert_eq; +use serde::de::DeserializeOwned; +use std::env; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_codex_jsonrpc_conversation_flow() { + if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let tmp = TempDir::new().expect("tmp dir"); + // Temporary Codex home with config pointing at the mock server. + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home).expect("create codex home dir"); + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory).expect("create working directory"); + + // Create a mock model server that immediately ends each turn. + // Two turns are expected: initial session configure + one user message. + let responses = vec![ + create_shell_sse_response( + vec!["ls".to_string()], + Some(&working_directory), + Some(5000), + "call1234", + ) + .expect("create shell sse response"), + create_final_assistant_message_sse_response("Enjoy your new git repo!") + .expect("create final assistant message"), + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri()).expect("write config"); + + // Start MCP server and initialize. + let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init error"); + + // 1) newConversation + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams { + cwd: Some(working_directory.to_string_lossy().into_owned()), + ..Default::default() + }) + .await + .expect("send newConversation"); + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await + .expect("newConversation timeout") + .expect("newConversation resp"); + let new_conv_resp = to_response::(new_conv_resp) + .expect("deserialize newConversation response"); + let NewConversationResponse { + conversation_id, + model, + } = new_conv_resp; + assert_eq!(model, "mock-model"); + + // 2) addConversationListener + let add_listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .await + .expect("send addConversationListener"); + let add_listener_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), + ) + .await + .expect("addConversationListener timeout") + .expect("addConversationListener resp"); + let AddConversationSubscriptionResponse { subscription_id } = + to_response::(add_listener_resp) + .expect("deserialize addConversationListener response"); + + // 3) sendUserMessage (should trigger notifications; we only validate an OK response) + let send_user_id = mcp + .send_send_user_message_request(SendUserMessageParams { + conversation_id, + items: vec![codex_mcp_server::wire_format::InputItem::Text { + text: "text".to_string(), + }], + }) + .await + .expect("send sendUserMessage"); + let send_user_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)), + ) + .await + .expect("sendUserMessage timeout") + .expect("sendUserMessage resp"); + let SendUserMessageResponse {} = to_response::(send_user_resp) + .expect("deserialize sendUserMessage response"); + + // Give the server time to process the user's request. + tokio::time::sleep(std::time::Duration::from_millis(5_000)).await; + + // Could verify that some notifications were received? + + // 4) removeConversationListener + let remove_listener_id = mcp + .send_remove_conversation_listener_request(RemoveConversationListenerParams { + subscription_id, + }) + .await + .expect("send removeConversationListener"); + let remove_listener_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(remove_listener_id)), + ) + .await + .expect("removeConversationListener timeout") + .expect("removeConversationListener resp"); + let RemoveConversationSubscriptionResponse {} = + to_response(remove_listener_resp).expect("deserialize removeConversationListener response"); +} + +fn to_response(response: JSONRPCResponse) -> anyhow::Result { + let value = serde_json::to_value(response.result)?; + let codex_response = serde_json::from_value(value)?; + Ok(codex_response) +} + +// Helper: minimal config.toml pointing at mock provider. +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index 83cf085cd4..4181479b5f 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -18,6 +18,10 @@ use codex_mcp_server::mcp_protocol::ConversationCreateArgs; use codex_mcp_server::mcp_protocol::ConversationId; use codex_mcp_server::mcp_protocol::ConversationSendMessageArgs; use codex_mcp_server::mcp_protocol::ToolCallRequestParams; +use codex_mcp_server::wire_format::AddConversationListenerParams; +use codex_mcp_server::wire_format::NewConversationParams; +use codex_mcp_server::wire_format::RemoveConversationListenerParams; +use codex_mcp_server::wire_format::SendUserMessageParams; use mcp_types::CallToolRequestParams; use mcp_types::ClientCapabilities; @@ -236,6 +240,47 @@ impl McpProcess { .await } + // --------------------------------------------------------------------- + // Codex JSON-RPC (non-tool) helpers + // --------------------------------------------------------------------- + /// Send a `newConversation` JSON-RPC request. + pub async fn send_new_conversation_request( + &mut self, + params: NewConversationParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("newConversation", params).await + } + + /// Send an `addConversationListener` JSON-RPC request. + pub async fn send_add_conversation_listener_request( + &mut self, + params: AddConversationListenerParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("addConversationListener", params).await + } + + /// Send a `sendUserMessage` JSON-RPC request with a single text item. + pub async fn send_send_user_message_request( + &mut self, + params: SendUserMessageParams, + ) -> anyhow::Result { + // Wire format expects variants in camelCase; text item uses external tagging. + let params = Some(serde_json::to_value(params)?); + self.send_request("sendUserMessage", params).await + } + + /// Send a `removeConversationListener` JSON-RPC request. + pub async fn send_remove_conversation_listener_request( + &mut self, + params: RemoveConversationListenerParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("removeConversationListener", params) + .await + } + async fn send_request( &mut self, method: &str,