Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions codex-rs/mcp-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::json_to_toml::json_to_toml;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::outgoing_message::OutgoingNotification;
use crate::wire_format::AddConversationListenerParams;
use crate::wire_format::AddConversationSubscriptionResponse;
use crate::wire_format::CodexRequest;
Expand Down Expand Up @@ -176,7 +176,6 @@ impl CodexMessageProcessor {
self.conversation_listeners
.insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone();
let add_listener_request_id = request_id.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -193,10 +192,24 @@ impl CodexMessageProcessor {
}
};

outgoing_for_task.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
)
let method = format!("codex/event/{}", event.msg);
let mut params = match serde_json::to_value(event) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
tracing::error!("event did not serialize to an object");
continue;
}
Err(err) => {
tracing::error!("failed to serialize event: {err}");
continue;
}
};
params.insert("conversationId".to_string(), conversation_id.to_string().into());

outgoing_for_task.send_notification(OutgoingNotification {
method,
params: Some(params.into()),
})
.await;
}
}
Expand Down
11 changes: 8 additions & 3 deletions codex-rs/mcp-server/src/outgoing_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,21 @@ impl OutgoingMessageSender {
event_json
};

let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
self.send_notification(OutgoingNotification {
method: "codex/event".to_string(),
params: Some(params.clone()),
});
let _ = self.sender.send(outgoing_message).await;
})
.await;

self.send_event_as_notification_new_schema(event, Some(params.clone()))
.await;
}

pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
let outgoing_message = OutgoingMessage::Notification(notification);
let _ = self.sender.send(outgoing_message).await;
}

// should be backwards compatible.
// it will replace send_event_as_notification eventually.
async fn send_event_as_notification_new_schema(
Expand Down
25 changes: 21 additions & 4 deletions codex-rs/mcp-server/tests/codex_message_processor_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use mcp_test_support::McpProcess;
use mcp_test_support::create_final_assistant_message_sse_response;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_test_support::create_shell_sse_response;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -123,10 +124,26 @@ async fn test_codex_jsonrpc_conversation_flow() {
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)
.expect("deserialize sendUserMessage response");

// Give the server time to process the user's request.
tokio::time::sleep(std::time::Duration::from_millis(5_000)).await;

// Could verify that some notifications were received?
// Verify the task_finished notification is received.
// Note this also ensures that the final request to the server was made.
let task_finished_notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await
.expect("task_finished_notification timeout")
.expect("task_finished_notification resp");
let serde_json::Value::Object(map) = task_finished_notification
.params
.expect("notification should have params")
else {
panic!("task_finished_notification should have params");
};
assert_eq!(
map.get("conversationId")
.expect("should have conversationId"),
&serde_json::Value::String(conversation_id.to_string())
);

// 4) removeConversationListener
let remove_listener_id = mcp
Expand Down
27 changes: 27 additions & 0 deletions codex-rs/mcp-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,33 @@ impl McpProcess {
}
}

pub async fn read_stream_until_notification_message(
&mut self,
method: &str,
) -> anyhow::Result<JSONRPCNotification> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");

match message {
JSONRPCMessage::Notification(notification) => {
if notification.method == method {
return Ok(notification);
}
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
}
}
}
}

pub async fn read_stream_until_configured_response_message(
&mut self,
) -> anyhow::Result<String> {
Expand Down
Loading