Skip to content

Commit 73bd84d

Browse files
authored
fix: try to fix freezes 2 (#9951)
Fixes a TUI freeze caused by awaiting `mpsc::Sender::send()` that blocks the tokio thread, stopping the consumption runtime and creating a deadlock. This could happen if the server was producing enough chunks to fill the `mpsc` fast enough. To solve this we try on insert using a `try_send()` (not requiring an `await`) and delegate to a tokio task if this does not work This is a temporary solution as it can contain races for delta elements and a stronger design should come here
1 parent 32b062d commit 73bd84d

File tree

1 file changed

+60
-2
lines changed

1 file changed

+60
-2
lines changed

codex-rs/tui/src/app.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ use tokio::sync::Mutex;
9393
use tokio::sync::broadcast;
9494
use tokio::sync::mpsc;
9595
use tokio::sync::mpsc::error::TryRecvError;
96+
use tokio::sync::mpsc::error::TrySendError;
9697
use tokio::sync::mpsc::unbounded_channel;
9798
use toml::Value as TomlValue;
9899

@@ -714,8 +715,23 @@ impl App {
714715
guard.active
715716
};
716717

717-
if should_send && let Err(err) = sender.send(event).await {
718-
tracing::warn!("thread {thread_id} event channel closed: {err}");
718+
if should_send {
719+
// Never await a bounded channel send on the main TUI loop: if the receiver falls behind,
720+
// `send().await` can block and the UI stops drawing. If the channel is full, wait in a
721+
// spawned task instead.
722+
match sender.try_send(event) {
723+
Ok(()) => {}
724+
Err(TrySendError::Full(event)) => {
725+
tokio::spawn(async move {
726+
if let Err(err) = sender.send(event).await {
727+
tracing::warn!("thread {thread_id} event channel closed: {err}");
728+
}
729+
});
730+
}
731+
Err(TrySendError::Closed(_)) => {
732+
tracing::warn!("thread {thread_id} event channel closed");
733+
}
734+
}
719735
}
720736
Ok(())
721737
}
@@ -2400,6 +2416,7 @@ mod tests {
24002416
use std::sync::Arc;
24012417
use std::sync::atomic::AtomicBool;
24022418
use tempfile::tempdir;
2419+
use tokio::time;
24032420

24042421
#[test]
24052422
fn normalize_harness_overrides_resolves_relative_add_dirs() -> Result<()> {
@@ -2420,6 +2437,47 @@ mod tests {
24202437
Ok(())
24212438
}
24222439

2440+
#[tokio::test]
2441+
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
2442+
let mut app = make_test_app().await;
2443+
let thread_id = ThreadId::new();
2444+
app.thread_event_channels
2445+
.insert(thread_id, ThreadEventChannel::new(1));
2446+
app.set_thread_active(thread_id, true).await;
2447+
2448+
let event = Event {
2449+
id: String::new(),
2450+
msg: EventMsg::ShutdownComplete,
2451+
};
2452+
2453+
app.enqueue_thread_event(thread_id, event.clone()).await?;
2454+
time::timeout(
2455+
Duration::from_millis(50),
2456+
app.enqueue_thread_event(thread_id, event),
2457+
)
2458+
.await
2459+
.expect("enqueue_thread_event blocked on a full channel")?;
2460+
2461+
let mut rx = app
2462+
.thread_event_channels
2463+
.get_mut(&thread_id)
2464+
.expect("missing thread channel")
2465+
.receiver
2466+
.take()
2467+
.expect("missing receiver");
2468+
2469+
time::timeout(Duration::from_millis(50), rx.recv())
2470+
.await
2471+
.expect("timed out waiting for first event")
2472+
.expect("channel closed unexpectedly");
2473+
time::timeout(Duration::from_millis(50), rx.recv())
2474+
.await
2475+
.expect("timed out waiting for second event")
2476+
.expect("channel closed unexpectedly");
2477+
2478+
Ok(())
2479+
}
2480+
24232481
async fn make_test_app() -> App {
24242482
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await;
24252483
let config = chat_widget.config_ref().clone();

0 commit comments

Comments
 (0)