Skip to content

Commit 0076e3a

Browse files
committed
fix: verify notifications are sent with the conversationId set
1 parent 1b07bd7 commit 0076e3a

File tree

4 files changed

+76
-13
lines changed

4 files changed

+76
-13
lines changed

codex-rs/mcp-server/src/codex_message_processor.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::error_code::INTERNAL_ERROR_CODE;
1515
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
1616
use crate::json_to_toml::json_to_toml;
1717
use crate::outgoing_message::OutgoingMessageSender;
18-
use crate::outgoing_message::OutgoingNotificationMeta;
18+
use crate::outgoing_message::OutgoingNotification;
1919
use crate::wire_format::AddConversationListenerParams;
2020
use crate::wire_format::AddConversationSubscriptionResponse;
2121
use crate::wire_format::CodexRequest;
@@ -175,7 +175,6 @@ impl CodexMessageProcessor {
175175
let (sender, mut receiver) = oneshot::channel();
176176
self.conversation_listeners.insert(subscription_id, sender);
177177
let outgoing_for_task = self.outgoing.clone();
178-
let add_listener_request_id = request_id.clone();
179178
tokio::spawn(async move {
180179
loop {
181180
tokio::select! {
@@ -192,10 +191,24 @@ impl CodexMessageProcessor {
192191
}
193192
};
194193

195-
outgoing_for_task.send_event_as_notification(
196-
&event,
197-
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
198-
)
194+
let method = format!("codex/event/{}", event.msg);
195+
let mut params = match serde_json::to_value(event) {
196+
Ok(serde_json::Value::Object(map)) => map,
197+
Ok(_) => {
198+
tracing::error!("event did not serialize to an object");
199+
continue;
200+
}
201+
Err(err) => {
202+
tracing::error!("failed to serialize event: {err}");
203+
continue;
204+
}
205+
};
206+
params.insert("conversationId".to_string(), conversation_id.to_string().into());
207+
208+
outgoing_for_task.send_notification(OutgoingNotification {
209+
method,
210+
params: Some(params.into()),
211+
})
199212
.await;
200213
}
201214
}

codex-rs/mcp-server/src/outgoing_message.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,21 @@ impl OutgoingMessageSender {
114114
event_json
115115
};
116116

117-
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
117+
self.send_notification(OutgoingNotification {
118118
method: "codex/event".to_string(),
119119
params: Some(params.clone()),
120-
});
121-
let _ = self.sender.send(outgoing_message).await;
120+
})
121+
.await;
122122

123123
self.send_event_as_notification_new_schema(event, Some(params.clone()))
124124
.await;
125125
}
126126

127+
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
128+
let outgoing_message = OutgoingMessage::Notification(notification);
129+
let _ = self.sender.send(outgoing_message).await;
130+
}
131+
127132
// should be backwards compatible.
128133
// it will replace send_event_as_notification eventually.
129134
async fn send_event_as_notification_new_schema(

codex-rs/mcp-server/tests/codex_message_processor_flow.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![allow(clippy::expect_used, clippy::unwrap_used)]
22

3+
use core::panic;
34
use std::path::Path;
45

56
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
@@ -15,6 +16,7 @@ use mcp_test_support::McpProcess;
1516
use mcp_test_support::create_final_assistant_message_sse_response;
1617
use mcp_test_support::create_mock_chat_completions_server;
1718
use mcp_test_support::create_shell_sse_response;
19+
use mcp_types::JSONRPCNotification;
1820
use mcp_types::JSONRPCResponse;
1921
use mcp_types::RequestId;
2022
use pretty_assertions::assert_eq;
@@ -123,10 +125,26 @@ async fn test_codex_jsonrpc_conversation_flow() {
123125
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)
124126
.expect("deserialize sendUserMessage response");
125127

126-
// Give the server time to process the user's request.
127-
tokio::time::sleep(std::time::Duration::from_millis(5_000)).await;
128-
129-
// Could verify that some notifications were received?
128+
// Verify the task_finished notification is received.
129+
// Note this also ensures that the final request to the server was made.
130+
let task_finished_notification: JSONRPCNotification = timeout(
131+
DEFAULT_READ_TIMEOUT,
132+
mcp.read_stream_until_notification_message("codex/event/task_complete"),
133+
)
134+
.await
135+
.expect("task_finished_notification timeout")
136+
.expect("task_finished_notification resp");
137+
let serde_json::Value::Object(map) = task_finished_notification
138+
.params
139+
.expect("notification should have params")
140+
else {
141+
panic!("task_finished_notification should have params");
142+
};
143+
assert_eq!(
144+
map.get("conversationId")
145+
.expect("should have conversationId"),
146+
&serde_json::Value::String(conversation_id.to_string())
147+
);
130148

131149
// 4) removeConversationListener
132150
let remove_listener_id = mcp

codex-rs/mcp-server/tests/common/mcp_process.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,33 @@ impl McpProcess {
374374
}
375375
}
376376

377+
pub async fn read_stream_until_notification_message(
378+
&mut self,
379+
method: &str,
380+
) -> anyhow::Result<JSONRPCNotification> {
381+
loop {
382+
let message = self.read_jsonrpc_message().await?;
383+
eprint!("message: {message:?}");
384+
385+
match message {
386+
JSONRPCMessage::Notification(notification) => {
387+
if notification.method == method {
388+
return Ok(notification);
389+
}
390+
}
391+
JSONRPCMessage::Request(_) => {
392+
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
393+
}
394+
JSONRPCMessage::Error(_) => {
395+
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
396+
}
397+
JSONRPCMessage::Response(_) => {
398+
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
399+
}
400+
}
401+
}
402+
}
403+
377404
pub async fn read_stream_until_configured_response_message(
378405
&mut self,
379406
) -> anyhow::Result<String> {

0 commit comments

Comments
 (0)