Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions codex-rs/mcp-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use mcp_types::JSONRPCErrorError;
use mcp_types::RequestId;
use tokio::sync::oneshot;
use uuid::Uuid;

use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::json_to_toml::json_to_toml;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::wire_format::AddConversationListenerParams;
use crate::wire_format::AddConversationSubscriptionResponse;
use crate::wire_format::CodexRequest;
use crate::wire_format::ConversationId;
use crate::wire_format::InputItem as WireInputItem;
use crate::wire_format::NewConversationParams;
use crate::wire_format::NewConversationResponse;
use crate::wire_format::RemoveConversationListenerParams;
use crate::wire_format::RemoveConversationSubscriptionResponse;
use crate::wire_format::SendUserMessageParams;
use crate::wire_format::SendUserMessageResponse;
use codex_core::protocol::InputItem as CoreInputItem;
use codex_core::protocol::Op;

/// Handles JSON-RPC messages for Codex conversations.
pub(crate) struct CodexMessageProcessor {
conversation_manager: Arc<ConversationManager>,
outgoing: Arc<OutgoingMessageSender>,
codex_linux_sandbox_exe: Option<PathBuf>,
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
}

impl CodexMessageProcessor {
pub fn new(
conversation_manager: Arc<ConversationManager>,
outgoing: Arc<OutgoingMessageSender>,
codex_linux_sandbox_exe: Option<PathBuf>,
) -> Self {
Self {
conversation_manager,
outgoing,
codex_linux_sandbox_exe,
conversation_listeners: HashMap::new(),
}
}

pub async fn process_request(&mut self, request: CodexRequest) {
match request {
CodexRequest::NewConversation { request_id, params } => {
// Do not tokio::spawn() to process new_conversation()
// asynchronously because we need to ensure the conversation is
// created before processing any subsequent messages.
self.process_new_conversation(request_id, params).await;
}
CodexRequest::SendUserMessage { request_id, params } => {
self.send_user_message(request_id, params).await;
}
CodexRequest::AddConversationListener { request_id, params } => {
self.add_conversation_listener(request_id, params).await;
}
CodexRequest::RemoveConversationListener { request_id, params } => {
self.remove_conversation_listener(request_id, params).await;
}
}
}

async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) {
let config = match derive_config_from_params(params, self.codex_linux_sandbox_exe.clone()) {
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

match self.conversation_manager.new_conversation(config).await {
Ok(conversation_id) => {
let NewConversation {
conversation_id,
session_configured,
..
} = conversation_id;
let response = NewConversationResponse {
conversation_id: ConversationId(conversation_id),
model: session_configured.model,
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating conversation: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}

async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
let SendUserMessageParams {
conversation_id,
items,
} = params;
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id.0)
.await
else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {conversation_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};

let mapped_items: Vec<CoreInputItem> = items
.into_iter()
.map(|item| match item {
WireInputItem::Text { text } => CoreInputItem::Text { text },
WireInputItem::Image { image_url } => CoreInputItem::Image { image_url },
WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path },
})
.collect();

// Submit user input to the conversation.
let _ = conversation
.submit(Op::UserInput {
items: mapped_items,
})
.await;

// Acknowledge with an empty result.
self.outgoing
.send_response(request_id, SendUserMessageResponse {})
.await;
}

async fn add_conversation_listener(
&mut self,
request_id: RequestId,
params: AddConversationListenerParams,
) {
let AddConversationListenerParams { conversation_id } = params;
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id.0)
.await
else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {}", conversation_id.0),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};

let subscription_id = Uuid::new_v4();
let (cancel_tx, mut cancel_rx) = oneshot::channel();
self.conversation_listeners
.insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone();
let add_listener_request_id = request_id.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut cancel_rx => {
// User has unsubscribed, so exit this task.
break;
}
event = conversation.next_event() => {
let event = match event {
Ok(event) => event,
Err(err) => {
tracing::warn!("conversation.next_event() failed with: {err}");
break;
}
};

outgoing_for_task.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
)
.await;
}
}
}
});
let response = AddConversationSubscriptionResponse { subscription_id };
self.outgoing.send_response(request_id, response).await;
}

async fn remove_conversation_listener(
&mut self,
request_id: RequestId,
params: RemoveConversationListenerParams,
) {
let RemoveConversationListenerParams { subscription_id } = params;
match self.conversation_listeners.remove(&subscription_id) {
Some(sender) => {
// Signal the spawned task to exit and acknowledge.
let _ = sender.send(());
let response = RemoveConversationSubscriptionResponse {};
self.outgoing.send_response(request_id, response).await;
}
None => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("subscription not found: {subscription_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
}

fn derive_config_from_params(
params: NewConversationParams,
codex_linux_sandbox_exe: Option<PathBuf>,
) -> std::io::Result<Config> {
let NewConversationParams {
model,
profile,
cwd,
approval_policy,
sandbox,
config: cli_overrides,
base_instructions,
include_plan_tool,
} = params;
let overrides = ConfigOverrides {
model,
config_profile: profile,
cwd: cwd.map(PathBuf::from),
approval_policy: approval_policy.map(Into::into),
sandbox_mode: sandbox.map(Into::into),
model_provider: None,
codex_linux_sandbox_exe,
base_instructions,
include_plan_tool,
disable_response_storage: None,
show_raw_agent_reasoning: None,
};

let cli_overrides = cli_overrides
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, json_to_toml(v)))
.collect();

Config::load_with_cli_overrides(cli_overrides, overrides)
}
4 changes: 2 additions & 2 deletions codex-rs/mcp-server/src/codex_tool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct CodexToolCallParam {

/// Custom enum mirroring [`AskForApproval`], but has an extra dependency on
/// [`JsonSchema`].
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum CodexToolCallApprovalPolicy {
Untrusted,
Expand All @@ -80,7 +80,7 @@ impl From<CodexToolCallApprovalPolicy> for AskForApproval {

/// Custom enum mirroring [`SandboxMode`] from config_types.rs, but with
/// `JsonSchema` support.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum CodexToolCallSandboxMode {
ReadOnly,
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/mcp-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;

mod codex_message_processor;
mod codex_tool_config;
mod codex_tool_runner;
mod conversation_loop;
Expand All @@ -26,6 +27,7 @@ pub(crate) mod message_processor;
mod outgoing_message;
mod patch_approval;
pub(crate) mod tool_handlers;
pub mod wire_format;

use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingMessage;
Expand Down
26 changes: 24 additions & 2 deletions codex-rs/mcp-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;

use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::CodexToolCallReplyParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
Expand All @@ -14,6 +15,7 @@ use crate::mcp_protocol::ToolCallResponseResult;
use crate::outgoing_message::OutgoingMessageSender;
use crate::tool_handlers::create_conversation::handle_create_conversation;
use crate::tool_handlers::send_message::handle_send_message;
use crate::wire_format::CodexRequest;

use codex_core::ConversationManager;
use codex_core::config::Config as CodexConfig;
Expand All @@ -40,6 +42,7 @@ use tokio::task;
use uuid::Uuid;

pub(crate) struct MessageProcessor {
codex_message_processor: CodexMessageProcessor,
outgoing: Arc<OutgoingMessageSender>,
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
Expand All @@ -55,11 +58,19 @@ impl MessageProcessor {
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
) -> Self {
let outgoing = Arc::new(outgoing);
let conversation_manager = Arc::new(ConversationManager::default());
let codex_message_processor = CodexMessageProcessor::new(
conversation_manager.clone(),
outgoing.clone(),
codex_linux_sandbox_exe.clone(),
);
Self {
outgoing: Arc::new(outgoing),
codex_message_processor,
outgoing,
initialized: false,
codex_linux_sandbox_exe,
conversation_manager: Arc::new(ConversationManager::default()),
conversation_manager,
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
running_session_ids: Arc::new(Mutex::new(HashSet::new())),
}
Expand All @@ -78,6 +89,17 @@ impl MessageProcessor {
}

pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
if let Ok(request_json) = serde_json::to_value(request.clone())
&& let Ok(codex_request) = serde_json::from_value::<CodexRequest>(request_json)
{
// If the request is a Codex request, handle it with the Codex
// message processor.
self.codex_message_processor
.process_request(codex_request)
.await;
return;
}

// Hold on to the ID so we can respond.
let request_id = request.id.clone();

Expand Down
Loading
Loading