diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index c0a6ab287c..5e0e4cad6e 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -15,7 +15,7 @@ use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::json_to_toml::json_to_toml; use crate::outgoing_message::OutgoingMessageSender; -use crate::outgoing_message::OutgoingNotificationMeta; +use crate::outgoing_message::OutgoingNotification; use crate::wire_format::AddConversationListenerParams; use crate::wire_format::AddConversationSubscriptionResponse; use crate::wire_format::CodexRequest; @@ -176,7 +176,6 @@ impl CodexMessageProcessor { self.conversation_listeners .insert(subscription_id, cancel_tx); let outgoing_for_task = self.outgoing.clone(); - let add_listener_request_id = request_id.clone(); tokio::spawn(async move { loop { tokio::select! { @@ -193,10 +192,24 @@ impl CodexMessageProcessor { } }; - outgoing_for_task.send_event_as_notification( - &event, - Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))), - ) + let method = format!("codex/event/{}", event.msg); + let mut params = match serde_json::to_value(event) { + Ok(serde_json::Value::Object(map)) => map, + Ok(_) => { + tracing::error!("event did not serialize to an object"); + continue; + } + Err(err) => { + tracing::error!("failed to serialize event: {err}"); + continue; + } + }; + params.insert("conversationId".to_string(), conversation_id.to_string().into()); + + outgoing_for_task.send_notification(OutgoingNotification { + method, + params: Some(params.into()), + }) .await; } } diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index 3a77cbf2a0..f13b8b324f 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -114,16 +114,21 @@ impl OutgoingMessageSender { event_json }; - let outgoing_message = OutgoingMessage::Notification(OutgoingNotification { + self.send_notification(OutgoingNotification { method: "codex/event".to_string(), params: Some(params.clone()), - }); - let _ = self.sender.send(outgoing_message).await; + }) + .await; self.send_event_as_notification_new_schema(event, Some(params.clone())) .await; } + pub(crate) async fn send_notification(&self, notification: OutgoingNotification) { + let outgoing_message = OutgoingMessage::Notification(notification); + let _ = self.sender.send(outgoing_message).await; + } + // should be backwards compatible. // it will replace send_event_as_notification eventually. async fn send_event_as_notification_new_schema( diff --git a/codex-rs/mcp-server/tests/codex_message_processor_flow.rs b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs index c81a04a5f4..7b7845609e 100644 --- a/codex-rs/mcp-server/tests/codex_message_processor_flow.rs +++ b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs @@ -15,6 +15,7 @@ use mcp_test_support::McpProcess; use mcp_test_support::create_final_assistant_message_sse_response; use mcp_test_support::create_mock_chat_completions_server; use mcp_test_support::create_shell_sse_response; +use mcp_types::JSONRPCNotification; use mcp_types::JSONRPCResponse; use mcp_types::RequestId; use pretty_assertions::assert_eq; @@ -123,10 +124,26 @@ async fn test_codex_jsonrpc_conversation_flow() { let SendUserMessageResponse {} = to_response::(send_user_resp) .expect("deserialize sendUserMessage response"); - // Give the server time to process the user's request. - tokio::time::sleep(std::time::Duration::from_millis(5_000)).await; - - // Could verify that some notifications were received? + // Verify the task_finished notification is received. + // Note this also ensures that the final request to the server was made. + let task_finished_notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await + .expect("task_finished_notification timeout") + .expect("task_finished_notification resp"); + let serde_json::Value::Object(map) = task_finished_notification + .params + .expect("notification should have params") + else { + panic!("task_finished_notification should have params"); + }; + assert_eq!( + map.get("conversationId") + .expect("should have conversationId"), + &serde_json::Value::String(conversation_id.to_string()) + ); // 4) removeConversationListener let remove_listener_id = mcp diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index 4181479b5f..a659b1d950 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -374,6 +374,33 @@ impl McpProcess { } } + pub async fn read_stream_until_notification_message( + &mut self, + method: &str, + ) -> anyhow::Result { + loop { + let message = self.read_jsonrpc_message().await?; + eprint!("message: {message:?}"); + + match message { + JSONRPCMessage::Notification(notification) => { + if notification.method == method { + return Ok(notification); + } + } + JSONRPCMessage::Request(_) => { + anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); + } + JSONRPCMessage::Error(_) => { + anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}"); + } + JSONRPCMessage::Response(_) => { + anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}"); + } + } + } + } + pub async fn read_stream_until_configured_response_message( &mut self, ) -> anyhow::Result {