Skip to content

Commit 53e28f1

Browse files
authored
Add realtime websocket tracing (openai#12981)
- add transport and conversation logs around connect, close, and parse flow - log realtime transport failures as errors for easier debugging
1 parent 4d180ae commit 53e28f1

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use tokio_tungstenite::WebSocketStream;
2525
use tokio_tungstenite::tungstenite::Error as WsError;
2626
use tokio_tungstenite::tungstenite::Message;
2727
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
28+
use tracing::debug;
29+
use tracing::error;
2830
use tracing::info;
2931
use tracing::trace;
3032
use tungstenite::protocol::WebSocketConfig;
@@ -62,15 +64,23 @@ impl WsStream {
6264
};
6365
match command {
6466
WsCommand::Send { message, tx_result } => {
67+
debug!("realtime websocket sending message");
6568
let result = inner.send(message).await;
6669
let should_break = result.is_err();
70+
if let Err(err) = &result {
71+
error!("realtime websocket send failed: {err}");
72+
}
6773
let _ = tx_result.send(result);
6874
if should_break {
6975
break;
7076
}
7177
}
7278
WsCommand::Close { tx_result } => {
79+
info!("realtime websocket sending close");
7380
let result = inner.close(None).await;
81+
if let Err(err) = &result {
82+
error!("realtime websocket close failed: {err}");
83+
}
7484
let _ = tx_result.send(result);
7585
break;
7686
}
@@ -82,7 +92,9 @@ impl WsStream {
8292
};
8393
match message {
8494
Ok(Message::Ping(payload)) => {
95+
trace!(payload_len = payload.len(), "realtime websocket received ping");
8596
if let Err(err) = inner.send(Message::Pong(payload)).await {
97+
error!("realtime websocket failed to send pong: {err}");
8698
let _ = tx_message.send(Err(err));
8799
break;
88100
}
@@ -93,6 +105,24 @@ impl WsStream {
93105
| Message::Close(_)
94106
| Message::Frame(_))) => {
95107
let is_close = matches!(message, Message::Close(_));
108+
match &message {
109+
Message::Text(_) => trace!("realtime websocket received text frame"),
110+
Message::Binary(binary) => {
111+
error!(
112+
payload_len = binary.len(),
113+
"realtime websocket received unexpected binary frame"
114+
);
115+
}
116+
Message::Close(frame) => info!(
117+
"realtime websocket received close frame: code={:?} reason={:?}",
118+
frame.as_ref().map(|frame| frame.code),
119+
frame.as_ref().map(|frame| frame.reason.as_str())
120+
),
121+
Message::Frame(_) => {
122+
trace!("realtime websocket received raw frame");
123+
}
124+
Message::Ping(_) | Message::Pong(_) => {}
125+
}
96126
if tx_message.send(Ok(message)).is_err() {
97127
break;
98128
}
@@ -101,13 +131,15 @@ impl WsStream {
101131
}
102132
}
103133
Err(err) => {
134+
error!("realtime websocket receive failed: {err}");
104135
let _ = tx_message.send(Err(err));
105136
break;
106137
}
107138
}
108139
}
109140
}
110141
}
142+
info!("realtime websocket pump exiting");
111143
});
112144

113145
(
@@ -298,7 +330,7 @@ impl RealtimeWebsocketWriter {
298330
async fn send_json(&self, message: RealtimeOutboundMessage) -> Result<(), ApiError> {
299331
let payload = serde_json::to_string(&message)
300332
.map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?;
301-
trace!("realtime websocket request: {payload}");
333+
debug!(?message, "realtime websocket request");
302334

303335
if self.is_closed.load(Ordering::SeqCst) {
304336
return Err(ApiError::Stream(
@@ -325,24 +357,33 @@ impl RealtimeWebsocketEvents {
325357
Some(Ok(msg)) => msg,
326358
Some(Err(err)) => {
327359
self.is_closed.store(true, Ordering::SeqCst);
360+
error!("realtime websocket read failed: {err}");
328361
return Err(ApiError::Stream(format!(
329362
"failed to read websocket message: {err}"
330363
)));
331364
}
332365
None => {
333366
self.is_closed.store(true, Ordering::SeqCst);
367+
info!("realtime websocket event stream ended");
334368
return Ok(None);
335369
}
336370
};
337371

338372
match msg {
339373
Message::Text(text) => {
340374
if let Some(event) = parse_realtime_event(&text) {
375+
debug!(?event, "realtime websocket parsed event");
341376
return Ok(Some(event));
342377
}
378+
debug!("realtime websocket ignored unsupported text frame");
343379
}
344-
Message::Close(_) => {
380+
Message::Close(frame) => {
345381
self.is_closed.store(true, Ordering::SeqCst);
382+
info!(
383+
"realtime websocket closed: code={:?} reason={:?}",
384+
frame.as_ref().map(|frame| frame.code),
385+
frame.as_ref().map(|frame| frame.reason.as_str())
386+
);
346387
return Ok(None);
347388
}
348389
Message::Binary(_) => {
@@ -383,15 +424,24 @@ impl RealtimeWebsocketClient {
383424
request.headers_mut().extend(headers);
384425

385426
info!("connecting realtime websocket: {ws_url}");
386-
let (stream, _) =
427+
let (stream, response) =
387428
tokio_tungstenite::connect_async_with_config(request, Some(websocket_config()), false)
388429
.await
389430
.map_err(|err| {
390431
ApiError::Stream(format!("failed to connect realtime websocket: {err}"))
391432
})?;
433+
info!(
434+
ws_url = %ws_url,
435+
status = %response.status(),
436+
"realtime websocket connected"
437+
);
392438

393439
let (stream, rx_message) = WsStream::new(stream);
394440
let connection = RealtimeWebsocketConnection::new(stream, rx_message);
441+
debug!(
442+
conversation_id = config.session_id.as_deref().unwrap_or("<none>"),
443+
"realtime websocket sending session.create"
444+
);
395445
connection
396446
.send_session_create(config.prompt, config.session_id)
397447
.await?;

codex-rs/core/src/realtime_conversation.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tokio::sync::Mutex;
3131
use tokio::task::JoinHandle;
3232
use tracing::debug;
3333
use tracing::error;
34+
use tracing::info;
3435
use tracing::warn;
3536

3637
const AUDIO_IN_QUEUE_CAPACITY: usize = 256;
@@ -184,18 +185,22 @@ pub(crate) async fn handle_start(
184185
let requested_session_id = params
185186
.session_id
186187
.or_else(|| Some(sess.conversation_id.to_string()));
188+
info!("starting realtime conversation");
187189
let events_rx = match sess
188190
.conversation
189191
.start(api_provider, None, prompt, requested_session_id.clone())
190192
.await
191193
{
192194
Ok(events_rx) => events_rx,
193195
Err(err) => {
196+
error!("failed to start realtime conversation: {err}");
194197
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
195198
return Ok(());
196199
}
197200
};
198201

202+
info!("realtime conversation started");
203+
199204
sess.send_event_raw(Event {
200205
id: sub_id.clone(),
201206
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
@@ -211,6 +216,7 @@ pub(crate) async fn handle_start(
211216
msg,
212217
};
213218
while let Ok(event) = events_rx.recv().await {
219+
debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event");
214220
let maybe_routed_text = match &event {
215221
RealtimeEvent::ConversationItemAdded(item) => {
216222
realtime_text_from_conversation_item(item)
@@ -231,6 +237,7 @@ pub(crate) async fn handle_start(
231237
.await;
232238
}
233239
if let Some(()) = sess_clone.conversation.running_state().await {
240+
info!("realtime conversation transport closed");
234241
sess_clone
235242
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
236243
RealtimeConversationClosedEvent {
@@ -250,6 +257,7 @@ pub(crate) async fn handle_audio(
250257
params: ConversationAudioParams,
251258
) {
252259
if let Err(err) = sess.conversation.audio_in(params.frame).await {
260+
error!("failed to append realtime audio: {err}");
253261
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
254262
}
255263
}
@@ -284,6 +292,7 @@ pub(crate) async fn handle_text(
284292
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
285293

286294
if let Err(err) = sess.conversation.text_in(params.text).await {
295+
error!("failed to append realtime text: {err}");
287296
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
288297
}
289298
}

0 commit comments

Comments
 (0)