Skip to content

Commit a62510e

Browse files
authored
fix: verify notifications are sent with the conversationId set (#2278)
This updates `CodexMessageProcessor` so that each notification it sends for a `EventMsg` from a `CodexConversation` such that: - The `params` always has an appropriate `conversationId` field. - The `method` is now includes the name of the `EventMsg` type rather than using `codex/event` as the `method` type for all notifications. (We currently prefix the method name with `codex/event/`, but I think that should go away once we formalize the notification schema in `wire_format.rs`.) As part of this, we update `test_codex_jsonrpc_conversation_flow()` to verify that the `task_finished` notification has made it through the system instead of sleeping for 5s and "hoping" the server finished processing the task. Note we have seen some flakiness in some of our other, similar integration tests, and I expect adding a similar check would help in those cases, as well.
1 parent e7bad65 commit a62510e

File tree

4 files changed

+75
-13
lines changed

4 files changed

+75
-13
lines changed

codex-rs/mcp-server/src/codex_message_processor.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::error_code::INTERNAL_ERROR_CODE;
1515
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
1616
use crate::json_to_toml::json_to_toml;
1717
use crate::outgoing_message::OutgoingMessageSender;
18-
use crate::outgoing_message::OutgoingNotificationMeta;
18+
use crate::outgoing_message::OutgoingNotification;
1919
use crate::wire_format::AddConversationListenerParams;
2020
use crate::wire_format::AddConversationSubscriptionResponse;
2121
use crate::wire_format::CodexRequest;
@@ -176,7 +176,6 @@ impl CodexMessageProcessor {
176176
self.conversation_listeners
177177
.insert(subscription_id, cancel_tx);
178178
let outgoing_for_task = self.outgoing.clone();
179-
let add_listener_request_id = request_id.clone();
180179
tokio::spawn(async move {
181180
loop {
182181
tokio::select! {
@@ -193,10 +192,24 @@ impl CodexMessageProcessor {
193192
}
194193
};
195194

196-
outgoing_for_task.send_event_as_notification(
197-
&event,
198-
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
199-
)
195+
let method = format!("codex/event/{}", event.msg);
196+
let mut params = match serde_json::to_value(event) {
197+
Ok(serde_json::Value::Object(map)) => map,
198+
Ok(_) => {
199+
tracing::error!("event did not serialize to an object");
200+
continue;
201+
}
202+
Err(err) => {
203+
tracing::error!("failed to serialize event: {err}");
204+
continue;
205+
}
206+
};
207+
params.insert("conversationId".to_string(), conversation_id.to_string().into());
208+
209+
outgoing_for_task.send_notification(OutgoingNotification {
210+
method,
211+
params: Some(params.into()),
212+
})
200213
.await;
201214
}
202215
}

codex-rs/mcp-server/src/outgoing_message.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,21 @@ impl OutgoingMessageSender {
114114
event_json
115115
};
116116

117-
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
117+
self.send_notification(OutgoingNotification {
118118
method: "codex/event".to_string(),
119119
params: Some(params.clone()),
120-
});
121-
let _ = self.sender.send(outgoing_message).await;
120+
})
121+
.await;
122122

123123
self.send_event_as_notification_new_schema(event, Some(params.clone()))
124124
.await;
125125
}
126126

127+
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
128+
let outgoing_message = OutgoingMessage::Notification(notification);
129+
let _ = self.sender.send(outgoing_message).await;
130+
}
131+
127132
// should be backwards compatible.
128133
// it will replace send_event_as_notification eventually.
129134
async fn send_event_as_notification_new_schema(

codex-rs/mcp-server/tests/codex_message_processor_flow.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use mcp_test_support::McpProcess;
1515
use mcp_test_support::create_final_assistant_message_sse_response;
1616
use mcp_test_support::create_mock_chat_completions_server;
1717
use mcp_test_support::create_shell_sse_response;
18+
use mcp_types::JSONRPCNotification;
1819
use mcp_types::JSONRPCResponse;
1920
use mcp_types::RequestId;
2021
use pretty_assertions::assert_eq;
@@ -123,10 +124,26 @@ async fn test_codex_jsonrpc_conversation_flow() {
123124
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)
124125
.expect("deserialize sendUserMessage response");
125126

126-
// Give the server time to process the user's request.
127-
tokio::time::sleep(std::time::Duration::from_millis(5_000)).await;
128-
129-
// Could verify that some notifications were received?
127+
// Verify the task_finished notification is received.
128+
// Note this also ensures that the final request to the server was made.
129+
let task_finished_notification: JSONRPCNotification = timeout(
130+
DEFAULT_READ_TIMEOUT,
131+
mcp.read_stream_until_notification_message("codex/event/task_complete"),
132+
)
133+
.await
134+
.expect("task_finished_notification timeout")
135+
.expect("task_finished_notification resp");
136+
let serde_json::Value::Object(map) = task_finished_notification
137+
.params
138+
.expect("notification should have params")
139+
else {
140+
panic!("task_finished_notification should have params");
141+
};
142+
assert_eq!(
143+
map.get("conversationId")
144+
.expect("should have conversationId"),
145+
&serde_json::Value::String(conversation_id.to_string())
146+
);
130147

131148
// 4) removeConversationListener
132149
let remove_listener_id = mcp

codex-rs/mcp-server/tests/common/mcp_process.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,33 @@ impl McpProcess {
374374
}
375375
}
376376

377+
pub async fn read_stream_until_notification_message(
378+
&mut self,
379+
method: &str,
380+
) -> anyhow::Result<JSONRPCNotification> {
381+
loop {
382+
let message = self.read_jsonrpc_message().await?;
383+
eprint!("message: {message:?}");
384+
385+
match message {
386+
JSONRPCMessage::Notification(notification) => {
387+
if notification.method == method {
388+
return Ok(notification);
389+
}
390+
}
391+
JSONRPCMessage::Request(_) => {
392+
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
393+
}
394+
JSONRPCMessage::Error(_) => {
395+
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
396+
}
397+
JSONRPCMessage::Response(_) => {
398+
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
399+
}
400+
}
401+
}
402+
}
403+
377404
pub async fn read_stream_until_configured_response_message(
378405
&mut self,
379406
) -> anyhow::Result<String> {

0 commit comments

Comments
 (0)