Skip to content

Commit 0091930

Browse files
[core] Allow resume after client errors (openai#2053)
## Summary Allow TUI conversations to resume after the client exhausts its retries. I also tested this with exec and mocked API failures, and it appears to work correctly. Happy to add an exec integration test as well! ## Testing - [x] Added integration test - [x] Tested locally
1 parent a2b9f46 commit 0091930

File tree

3 files changed

+159
-4
lines changed

3 files changed

+159
-4
lines changed

codex-rs/core/src/codex.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
10981098
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
10991099
.await;
11001100

1101-
let last_agent_message: Option<String>;
1101+
let mut last_agent_message: Option<String> = None;
11021102
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
11031103
// many turns, from the perspective of the user, it is a single turn.
11041104
let mut turn_diff_tracker = TurnDiffTracker::new();
@@ -1248,7 +1248,8 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
12481248
}),
12491249
};
12501250
sess.tx_event.send(event).await.ok();
1251-
return;
1251+
// let the user continue the conversation
1252+
break;
12521253
}
12531254
}
12541255
}

codex-rs/core/tests/common/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,26 @@ pub fn load_sse_fixture_with_id(path: impl AsRef<std::path::Path>, id: &str) ->
7373

7474
pub async fn wait_for_event<F>(
7575
codex: &codex_core::Codex,
76-
mut predicate: F,
76+
predicate: F,
7777
) -> codex_core::protocol::EventMsg
7878
where
7979
F: FnMut(&codex_core::protocol::EventMsg) -> bool,
8080
{
8181
use tokio::time::Duration;
82+
wait_for_event_with_timeout(codex, predicate, Duration::from_secs(1)).await
83+
}
84+
85+
pub async fn wait_for_event_with_timeout<F>(
86+
codex: &codex_core::Codex,
87+
mut predicate: F,
88+
wait_time: tokio::time::Duration,
89+
) -> codex_core::protocol::EventMsg
90+
where
91+
F: FnMut(&codex_core::protocol::EventMsg) -> bool,
92+
{
8293
use tokio::time::timeout;
8394
loop {
84-
let ev = timeout(Duration::from_secs(1), codex.next_event())
95+
let ev = timeout(wait_time, codex.next_event())
8596
.await
8697
.expect("timeout waiting for event")
8798
.expect("stream ended unexpectedly");
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
use std::time::Duration;
2+
3+
use codex_core::Codex;
4+
use codex_core::CodexSpawnOk;
5+
use codex_core::ModelProviderInfo;
6+
use codex_core::WireApi;
7+
use codex_core::protocol::EventMsg;
8+
use codex_core::protocol::InputItem;
9+
use codex_core::protocol::Op;
10+
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
11+
use codex_login::CodexAuth;
12+
use core_test_support::load_default_config_for_test;
13+
use core_test_support::load_sse_fixture_with_id;
14+
use core_test_support::wait_for_event_with_timeout;
15+
use tempfile::TempDir;
16+
use wiremock::Mock;
17+
use wiremock::MockServer;
18+
use wiremock::ResponseTemplate;
19+
use wiremock::matchers::body_string_contains;
20+
use wiremock::matchers::method;
21+
use wiremock::matchers::path;
22+
23+
fn sse_completed(id: &str) -> String {
24+
load_sse_fixture_with_id("tests/fixtures/completed_template.json", id)
25+
}
26+
27+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
28+
async fn continue_after_stream_error() {
29+
#![allow(clippy::unwrap_used)]
30+
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
31+
println!(
32+
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
33+
);
34+
return;
35+
}
36+
37+
let server = MockServer::start().await;
38+
39+
let fail = ResponseTemplate::new(500)
40+
.insert_header("content-type", "application/json")
41+
.set_body_string(
42+
serde_json::json!({
43+
"error": {"type": "bad_request", "message": "synthetic client error"}
44+
})
45+
.to_string(),
46+
);
47+
48+
// The provider below allows a single request retry (request_max_retries = Some(1)),
49+
// so this mock is permitted to answer the failing request at most twice.
50+
Mock::given(method("POST"))
51+
.and(path("/v1/responses"))
52+
.and(body_string_contains("first message"))
53+
.respond_with(fail)
54+
.up_to_n_times(2)
55+
.mount(&server)
56+
.await;
57+
58+
let ok = ResponseTemplate::new(200)
59+
.insert_header("content-type", "text/event-stream")
60+
.set_body_raw(sse_completed("resp_ok2"), "text/event-stream");
61+
62+
Mock::given(method("POST"))
63+
.and(path("/v1/responses"))
64+
.and(body_string_contains("follow up"))
65+
.respond_with(ok)
66+
.expect(1)
67+
.mount(&server)
68+
.await;
69+
70+
// Configure a provider that uses the Responses API and points at our mock
71+
// server. Use an existing env var (PATH) to satisfy the auth plumbing
72+
// without requiring a real secret.
73+
let provider = ModelProviderInfo {
74+
name: "mock-openai".into(),
75+
base_url: Some(format!("{}/v1", server.uri())),
76+
env_key: Some("PATH".into()),
77+
env_key_instructions: None,
78+
wire_api: WireApi::Responses,
79+
query_params: None,
80+
http_headers: None,
81+
env_http_headers: None,
82+
request_max_retries: Some(1),
83+
stream_max_retries: Some(1),
84+
stream_idle_timeout_ms: Some(2_000),
85+
requires_openai_auth: false,
86+
};
87+
88+
let home = TempDir::new().unwrap();
89+
let mut config = load_default_config_for_test(&home);
90+
config.base_instructions = Some("You are a helpful assistant".to_string());
91+
config.model_provider = provider;
92+
93+
let CodexSpawnOk { codex, .. } = Codex::spawn(
94+
config,
95+
Some(CodexAuth::from_api_key("Test API Key")),
96+
std::sync::Arc::new(tokio::sync::Notify::new()),
97+
)
98+
.await
99+
.unwrap();
100+
101+
codex
102+
.submit(Op::UserInput {
103+
items: vec![InputItem::Text {
104+
text: "first message".into(),
105+
}],
106+
})
107+
.await
108+
.unwrap();
109+
110+
// Expect an Error followed by TaskComplete so the session is released.
111+
wait_for_event_with_timeout(
112+
&codex,
113+
|ev| matches!(ev, EventMsg::Error(_)),
114+
Duration::from_secs(5),
115+
)
116+
.await;
117+
118+
wait_for_event_with_timeout(
119+
&codex,
120+
|ev| matches!(ev, EventMsg::TaskComplete(_)),
121+
Duration::from_secs(5),
122+
)
123+
.await;
124+
125+
// Second turn: send another prompt that should succeed using the mock
126+
// server's SSE stream. If the agent had failed to clear the running task
127+
// after the error above, this submission would be rejected or queued indefinitely.
128+
codex
129+
.submit(Op::UserInput {
130+
items: vec![InputItem::Text {
131+
text: "follow up".into(),
132+
}],
133+
})
134+
.await
135+
.unwrap();
136+
137+
wait_for_event_with_timeout(
138+
&codex,
139+
|ev| matches!(ev, EventMsg::TaskComplete(_)),
140+
Duration::from_secs(5),
141+
)
142+
.await;
143+
}

0 commit comments

Comments
 (0)