Skip to content

Commit 62b35cb

Browse files
committed
feat: support traditional JSON-RPC request/response in MCP server
1 parent de2c6a2 commit 62b35cb

File tree

7 files changed

+698
-4
lines changed

7 files changed

+698
-4
lines changed
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
use std::collections::HashMap;
2+
use std::path::PathBuf;
3+
use std::sync::Arc;
4+
5+
use codex_core::ConversationManager;
6+
use codex_core::NewConversation;
7+
use codex_core::config::Config;
8+
use codex_core::config::ConfigOverrides;
9+
use mcp_types::JSONRPCErrorError;
10+
use mcp_types::RequestId;
11+
use tokio::sync::oneshot;
12+
use uuid::Uuid;
13+
14+
use crate::error_code::INTERNAL_ERROR_CODE;
15+
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
16+
use crate::json_to_toml::json_to_toml;
17+
use crate::outgoing_message::OutgoingMessageSender;
18+
use crate::outgoing_message::OutgoingNotificationMeta;
19+
use crate::wire_format::AddConversationListenerParams;
20+
use crate::wire_format::AddConversationSubscriptionResponse;
21+
use crate::wire_format::CodexRequest;
22+
use crate::wire_format::ConversationId;
23+
use crate::wire_format::InputItem as WireInputItem;
24+
use crate::wire_format::NewConversationParams;
25+
use crate::wire_format::NewConversationResponse;
26+
use crate::wire_format::RemoveConversationListenerParams;
27+
use crate::wire_format::RemoveConversationSubscriptionResponse;
28+
use crate::wire_format::SendUserMessageParams;
29+
use crate::wire_format::SendUserMessageResponse;
30+
use codex_core::protocol::InputItem as CoreInputItem;
31+
use codex_core::protocol::Op;
32+
33+
/// Handles JSON-RPC messages for Codex conversations.
34+
pub(crate) struct CodexMessageProcessor {
35+
conversation_manager: Arc<ConversationManager>,
36+
outgoing: Arc<OutgoingMessageSender>,
37+
codex_linux_sandbox_exe: Option<PathBuf>,
38+
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
39+
}
40+
41+
impl CodexMessageProcessor {
42+
pub fn new(
43+
conversation_manager: Arc<ConversationManager>,
44+
outgoing: Arc<OutgoingMessageSender>,
45+
codex_linux_sandbox_exe: Option<PathBuf>,
46+
) -> Self {
47+
Self {
48+
conversation_manager,
49+
outgoing,
50+
codex_linux_sandbox_exe,
51+
conversation_listeners: HashMap::new(),
52+
}
53+
}
54+
55+
pub async fn process_request(&mut self, request: CodexRequest) {
56+
match request {
57+
CodexRequest::NewConversation { request_id, params } => {
58+
// Do not tokio::spawn() to process new_conversation()
59+
// asynchronously because we need to ensure the conversation is
60+
// created before processing any subsequent messages.
61+
self.process_new_conversation(request_id, params).await;
62+
}
63+
CodexRequest::SendUserMessage { request_id, params } => {
64+
self.send_user_message(request_id, params).await;
65+
}
66+
CodexRequest::AddConversationListener { request_id, params } => {
67+
self.add_conversation_listener(request_id, params).await;
68+
}
69+
CodexRequest::RemoveConversationListener { request_id, params } => {
70+
self.remove_conversation_listener(request_id, params).await;
71+
}
72+
}
73+
}
74+
75+
async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) {
76+
let config = match derive_config_from_params(params, self.codex_linux_sandbox_exe.clone()) {
77+
Ok(config) => config,
78+
Err(err) => {
79+
let error = JSONRPCErrorError {
80+
code: INVALID_REQUEST_ERROR_CODE,
81+
message: format!("error deriving config: {err}"),
82+
data: None,
83+
};
84+
self.outgoing.send_error(request_id, error).await;
85+
return;
86+
}
87+
};
88+
89+
match self.conversation_manager.new_conversation(config).await {
90+
Ok(conversation_id) => {
91+
let NewConversation {
92+
conversation_id,
93+
session_configured,
94+
..
95+
} = conversation_id;
96+
let response = NewConversationResponse {
97+
conversation_id: ConversationId(conversation_id),
98+
model: session_configured.model,
99+
};
100+
self.outgoing.send_response(request_id, response).await;
101+
}
102+
Err(err) => {
103+
let error = JSONRPCErrorError {
104+
code: INTERNAL_ERROR_CODE,
105+
message: format!("error creating conversation: {err}"),
106+
data: None,
107+
};
108+
self.outgoing.send_error(request_id, error).await;
109+
}
110+
}
111+
}
112+
113+
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
114+
let SendUserMessageParams {
115+
conversation_id,
116+
items,
117+
} = params;
118+
let Ok(conversation) = self
119+
.conversation_manager
120+
.get_conversation(conversation_id.0)
121+
.await
122+
else {
123+
let error = JSONRPCErrorError {
124+
code: INVALID_REQUEST_ERROR_CODE,
125+
message: format!("conversation not found: {conversation_id}"),
126+
data: None,
127+
};
128+
self.outgoing.send_error(request_id, error).await;
129+
return;
130+
};
131+
132+
let mapped_items: Vec<CoreInputItem> = items
133+
.into_iter()
134+
.map(|item| match item {
135+
WireInputItem::Text { text } => CoreInputItem::Text { text },
136+
WireInputItem::Image { image_url } => CoreInputItem::Image { image_url },
137+
WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path },
138+
})
139+
.collect();
140+
141+
// Submit user input to the conversation.
142+
let _ = conversation
143+
.submit(Op::UserInput {
144+
items: mapped_items,
145+
})
146+
.await;
147+
148+
// Acknowledge with an empty result.
149+
self.outgoing
150+
.send_response(request_id, SendUserMessageResponse {})
151+
.await;
152+
}
153+
154+
async fn add_conversation_listener(
155+
&mut self,
156+
request_id: RequestId,
157+
params: AddConversationListenerParams,
158+
) {
159+
let AddConversationListenerParams { conversation_id } = params;
160+
let Ok(conversation) = self
161+
.conversation_manager
162+
.get_conversation(conversation_id.0)
163+
.await
164+
else {
165+
let error = JSONRPCErrorError {
166+
code: INVALID_REQUEST_ERROR_CODE,
167+
message: format!("conversation not found: {}", conversation_id.0),
168+
data: None,
169+
};
170+
self.outgoing.send_error(request_id, error).await;
171+
return;
172+
};
173+
174+
let subscription_id = Uuid::new_v4();
175+
let (cancel_tx, mut cancel_rx) = oneshot::channel();
176+
self.conversation_listeners
177+
.insert(subscription_id, cancel_tx);
178+
let outgoing_for_task = self.outgoing.clone();
179+
let add_listener_request_id = request_id.clone();
180+
tokio::spawn(async move {
181+
loop {
182+
tokio::select! {
183+
_ = &mut cancel_rx => {
184+
// User has unsubscribed, so exit this task.
185+
break;
186+
}
187+
event = conversation.next_event() => {
188+
let event = match event {
189+
Ok(event) => event,
190+
Err(err) => {
191+
tracing::warn!("conversation.next_event() failed with: {err}");
192+
break;
193+
}
194+
};
195+
196+
outgoing_for_task.send_event_as_notification(
197+
&event,
198+
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
199+
)
200+
.await;
201+
}
202+
}
203+
}
204+
});
205+
let response = AddConversationSubscriptionResponse { subscription_id };
206+
self.outgoing.send_response(request_id, response).await;
207+
}
208+
209+
async fn remove_conversation_listener(
210+
&mut self,
211+
request_id: RequestId,
212+
params: RemoveConversationListenerParams,
213+
) {
214+
let RemoveConversationListenerParams { subscription_id } = params;
215+
match self.conversation_listeners.remove(&subscription_id) {
216+
Some(sender) => {
217+
// Signal the spawned task to exit and acknowledge.
218+
let _ = sender.send(());
219+
let response = RemoveConversationSubscriptionResponse {};
220+
self.outgoing.send_response(request_id, response).await;
221+
}
222+
None => {
223+
let error = JSONRPCErrorError {
224+
code: INVALID_REQUEST_ERROR_CODE,
225+
message: format!("subscription not found: {subscription_id}"),
226+
data: None,
227+
};
228+
self.outgoing.send_error(request_id, error).await;
229+
}
230+
}
231+
}
232+
}
233+
234+
fn derive_config_from_params(
235+
params: NewConversationParams,
236+
codex_linux_sandbox_exe: Option<PathBuf>,
237+
) -> std::io::Result<Config> {
238+
let NewConversationParams {
239+
model,
240+
profile,
241+
cwd,
242+
approval_policy,
243+
sandbox,
244+
config: cli_overrides,
245+
base_instructions,
246+
include_plan_tool,
247+
} = params;
248+
let overrides = ConfigOverrides {
249+
model,
250+
config_profile: profile,
251+
cwd: cwd.map(PathBuf::from),
252+
approval_policy: approval_policy.map(Into::into),
253+
sandbox_mode: sandbox.map(Into::into),
254+
model_provider: None,
255+
codex_linux_sandbox_exe,
256+
base_instructions,
257+
include_plan_tool,
258+
disable_response_storage: None,
259+
show_raw_agent_reasoning: None,
260+
};
261+
262+
let cli_overrides = cli_overrides
263+
.unwrap_or_default()
264+
.into_iter()
265+
.map(|(k, v)| (k, json_to_toml(v)))
266+
.collect();
267+
268+
Config::load_with_cli_overrides(cli_overrides, overrides)
269+
}

codex-rs/mcp-server/src/codex_tool_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct CodexToolCallParam {
5858

5959
/// Custom enum mirroring [`AskForApproval`], but has an extra dependency on
6060
/// [`JsonSchema`].
61-
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
61+
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
6262
#[serde(rename_all = "kebab-case")]
6363
pub enum CodexToolCallApprovalPolicy {
6464
Untrusted,
@@ -80,7 +80,7 @@ impl From<CodexToolCallApprovalPolicy> for AskForApproval {
8080

8181
/// Custom enum mirroring [`SandboxMode`] from config_types.rs, but with
8282
/// `JsonSchema` support.
83-
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
83+
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
8484
#[serde(rename_all = "kebab-case")]
8585
pub enum CodexToolCallSandboxMode {
8686
ReadOnly,

codex-rs/mcp-server/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tracing::error;
1515
use tracing::info;
1616
use tracing_subscriber::EnvFilter;
1717

18+
mod codex_message_processor;
1819
mod codex_tool_config;
1920
mod codex_tool_runner;
2021
mod conversation_loop;
@@ -26,6 +27,7 @@ pub(crate) mod message_processor;
2627
mod outgoing_message;
2728
mod patch_approval;
2829
pub(crate) mod tool_handlers;
30+
pub mod wire_format;
2931

3032
use crate::message_processor::MessageProcessor;
3133
use crate::outgoing_message::OutgoingMessage;

codex-rs/mcp-server/src/message_processor.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::HashSet;
33
use std::path::PathBuf;
44
use std::sync::Arc;
55

6+
use crate::codex_message_processor::CodexMessageProcessor;
67
use crate::codex_tool_config::CodexToolCallParam;
78
use crate::codex_tool_config::CodexToolCallReplyParam;
89
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
@@ -14,6 +15,7 @@ use crate::mcp_protocol::ToolCallResponseResult;
1415
use crate::outgoing_message::OutgoingMessageSender;
1516
use crate::tool_handlers::create_conversation::handle_create_conversation;
1617
use crate::tool_handlers::send_message::handle_send_message;
18+
use crate::wire_format::CodexRequest;
1719

1820
use codex_core::ConversationManager;
1921
use codex_core::config::Config as CodexConfig;
@@ -40,6 +42,7 @@ use tokio::task;
4042
use uuid::Uuid;
4143

4244
pub(crate) struct MessageProcessor {
45+
codex_message_processor: CodexMessageProcessor,
4346
outgoing: Arc<OutgoingMessageSender>,
4447
initialized: bool,
4548
codex_linux_sandbox_exe: Option<PathBuf>,
@@ -55,11 +58,19 @@ impl MessageProcessor {
5558
outgoing: OutgoingMessageSender,
5659
codex_linux_sandbox_exe: Option<PathBuf>,
5760
) -> Self {
61+
let outgoing = Arc::new(outgoing);
62+
let conversation_manager = Arc::new(ConversationManager::default());
63+
let codex_message_processor = CodexMessageProcessor::new(
64+
conversation_manager.clone(),
65+
outgoing.clone(),
66+
codex_linux_sandbox_exe.clone(),
67+
);
5868
Self {
59-
outgoing: Arc::new(outgoing),
69+
codex_message_processor,
70+
outgoing,
6071
initialized: false,
6172
codex_linux_sandbox_exe,
62-
conversation_manager: Arc::new(ConversationManager::default()),
73+
conversation_manager,
6374
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
6475
running_session_ids: Arc::new(Mutex::new(HashSet::new())),
6576
}
@@ -78,6 +89,17 @@ impl MessageProcessor {
7889
}
7990

8091
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
92+
if let Ok(request_json) = serde_json::to_value(request.clone())
93+
&& let Ok(codex_request) = serde_json::from_value::<CodexRequest>(request_json)
94+
{
95+
// If the request is a Codex request, handle it with the Codex
96+
// message processor.
97+
self.codex_message_processor
98+
.process_request(codex_request)
99+
.await;
100+
return;
101+
}
102+
81103
// Hold on to the ID so we can respond.
82104
let request_id = request.id.clone();
83105

0 commit comments

Comments
 (0)