Skip to content

Commit b4ab7c1

Browse files
authored
Flaky CI fix (openai#1647)
Flushing before sending `TaskCompleteEvent` and ending the submission loop to avoid race conditions.
1 parent 084236f commit b4ab7c1

File tree

10 files changed

+153
-47
lines changed

10 files changed

+153
-47
lines changed

codex-rs/core/src/codex.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,37 @@ async fn submission_loop(
812812
}
813813
});
814814
}
815+
Op::Shutdown => {
816+
info!("Shutting down Codex instance");
817+
818+
// Gracefully flush and shutdown rollout recorder on session end so tests
819+
// that inspect the rollout file do not race with the background writer.
820+
if let Some(sess_arc) = sess {
821+
let recorder_opt = sess_arc.rollout.lock().unwrap().take();
822+
if let Some(rec) = recorder_opt {
823+
if let Err(e) = rec.shutdown().await {
824+
warn!("failed to shutdown rollout recorder: {e}");
825+
let event = Event {
826+
id: sub.id.clone(),
827+
msg: EventMsg::Error(ErrorEvent {
828+
message: "Failed to shutdown rollout recorder".to_string(),
829+
}),
830+
};
831+
if let Err(e) = tx_event.send(event).await {
832+
warn!("failed to send error message: {e:?}");
833+
}
834+
}
835+
}
836+
}
837+
let event = Event {
838+
id: sub.id.clone(),
839+
msg: EventMsg::ShutdownComplete,
840+
};
841+
if let Err(e) = tx_event.send(event).await {
842+
warn!("failed to send Shutdown event: {e}");
843+
}
844+
break;
845+
}
815846
}
816847
}
817848
debug!("Agent loop exited");

codex-rs/core/src/protocol.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ pub enum Op {
116116

117117
/// Request a single history entry identified by `log_id` + `offset`.
118118
GetHistoryEntryRequest { offset: usize, log_id: u64 },
119+
120+
/// Request to shut down codex instance.
121+
Shutdown,
119122
}
120123

121124
/// Determines the conditions under which the user is consulted to approve
@@ -326,6 +329,9 @@ pub enum EventMsg {
326329

327330
/// Response to GetHistoryEntryRequest.
328331
GetHistoryEntryResponse(GetHistoryEntryResponseEvent),
332+
333+
/// Notification that the agent is shutting down.
334+
ShutdownComplete,
329335
}
330336

331337
// Individual event payload types matching each `EventMsg` variant.

codex-rs/core/src/rollout.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use time::macros::format_description;
1414
use tokio::io::AsyncWriteExt;
1515
use tokio::sync::mpsc::Sender;
1616
use tokio::sync::mpsc::{self};
17+
use tokio::sync::oneshot;
1718
use tracing::info;
1819
use tracing::warn;
1920
use uuid::Uuid;
@@ -57,10 +58,10 @@ pub(crate) struct RolloutRecorder {
5758
tx: Sender<RolloutCmd>,
5859
}
5960

60-
#[derive(Clone)]
6161
enum RolloutCmd {
6262
AddItems(Vec<ResponseItem>),
6363
UpdateState(SessionStateSnapshot),
64+
Shutdown { ack: oneshot::Sender<()> },
6465
}
6566

6667
impl RolloutRecorder {
@@ -204,6 +205,21 @@ impl RolloutRecorder {
204205
info!("Resumed rollout successfully from {path:?}");
205206
Ok((Self { tx }, saved))
206207
}
208+
209+
pub async fn shutdown(&self) -> std::io::Result<()> {
210+
let (tx_done, rx_done) = oneshot::channel();
211+
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
212+
Ok(_) => rx_done
213+
.await
214+
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))),
215+
Err(e) => {
216+
warn!("failed to send rollout shutdown command: {e}");
217+
Err(IoError::other(format!(
218+
"failed to send rollout shutdown command: {e}"
219+
)))
220+
}
221+
}
222+
}
207223
}
208224

209225
struct LogFileInfo {
@@ -299,6 +315,9 @@ async fn rollout_writer(
299315
let _ = file.flush().await;
300316
}
301317
}
318+
RolloutCmd::Shutdown { ack } => {
319+
let _ = ack.send(());
320+
}
302321
}
303322
}
304323
}

codex-rs/exec/src/event_processor.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1+
use std::path::Path;
2+
13
use codex_common::summarize_sandbox_policy;
24
use codex_core::WireApi;
35
use codex_core::config::Config;
46
use codex_core::model_supports_reasoning_summaries;
57
use codex_core::protocol::Event;
68

9+
pub(crate) enum CodexStatus {
10+
Running,
11+
InitiateShutdown,
12+
Shutdown,
13+
}
14+
715
pub(crate) trait EventProcessor {
816
/// Print summary of effective configuration and user prompt.
917
fn print_config_summary(&mut self, config: &Config, prompt: &str);
1018

1119
/// Handle a single event emitted by the agent.
12-
fn process_event(&mut self, event: Event);
20+
fn process_event(&mut self, event: Event) -> CodexStatus;
1321
}
1422

1523
pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, String)> {
@@ -35,3 +43,28 @@ pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static st
3543

3644
entries
3745
}
46+
47+
pub(crate) fn handle_last_message(
48+
last_agent_message: Option<&str>,
49+
last_message_path: Option<&Path>,
50+
) {
51+
match (last_message_path, last_agent_message) {
52+
(Some(path), Some(msg)) => write_last_message_file(msg, Some(path)),
53+
(Some(path), None) => {
54+
write_last_message_file("", Some(path));
55+
eprintln!(
56+
"Warning: no last agent message; wrote empty content to {}",
57+
path.display()
58+
);
59+
}
60+
(None, _) => eprintln!("Warning: no file to write last message to."),
61+
}
62+
}
63+
64+
fn write_last_message_file(contents: &str, last_message_path: Option<&Path>) {
65+
if let Some(path) = last_message_path {
66+
if let Err(e) = std::fs::write(path, contents) {
67+
eprintln!("Failed to write last message file {path:?}: {e}");
68+
}
69+
}
70+
}

codex-rs/exec/src/event_processor_with_human_output.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@ use codex_core::protocol::McpToolCallEndEvent;
1515
use codex_core::protocol::PatchApplyBeginEvent;
1616
use codex_core::protocol::PatchApplyEndEvent;
1717
use codex_core::protocol::SessionConfiguredEvent;
18+
use codex_core::protocol::TaskCompleteEvent;
1819
use codex_core::protocol::TokenUsage;
1920
use owo_colors::OwoColorize;
2021
use owo_colors::Style;
2122
use shlex::try_join;
2223
use std::collections::HashMap;
2324
use std::io::Write;
25+
use std::path::PathBuf;
2426
use std::time::Instant;
2527

28+
use crate::event_processor::CodexStatus;
2629
use crate::event_processor::EventProcessor;
2730
use crate::event_processor::create_config_summary_entries;
31+
use crate::event_processor::handle_last_message;
2832

2933
/// This should be configurable. When used in CI, users may not want to impose
3034
/// a limit so they can see the full transcript.
@@ -54,10 +58,15 @@ pub(crate) struct EventProcessorWithHumanOutput {
5458
show_agent_reasoning: bool,
5559
answer_started: bool,
5660
reasoning_started: bool,
61+
last_message_path: Option<PathBuf>,
5762
}
5863

5964
impl EventProcessorWithHumanOutput {
60-
pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self {
65+
pub(crate) fn create_with_ansi(
66+
with_ansi: bool,
67+
config: &Config,
68+
last_message_path: Option<PathBuf>,
69+
) -> Self {
6170
let call_id_to_command = HashMap::new();
6271
let call_id_to_patch = HashMap::new();
6372
let call_id_to_tool_call = HashMap::new();
@@ -77,6 +86,7 @@ impl EventProcessorWithHumanOutput {
7786
show_agent_reasoning: !config.hide_agent_reasoning,
7887
answer_started: false,
7988
reasoning_started: false,
89+
last_message_path,
8090
}
8191
} else {
8292
Self {
@@ -93,6 +103,7 @@ impl EventProcessorWithHumanOutput {
93103
show_agent_reasoning: !config.hide_agent_reasoning,
94104
answer_started: false,
95105
reasoning_started: false,
106+
last_message_path,
96107
}
97108
}
98109
}
@@ -158,7 +169,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
158169
);
159170
}
160171

161-
fn process_event(&mut self, event: Event) {
172+
fn process_event(&mut self, event: Event) -> CodexStatus {
162173
let Event { id: _, msg } = event;
163174
match msg {
164175
EventMsg::Error(ErrorEvent { message }) => {
@@ -168,9 +179,16 @@ impl EventProcessor for EventProcessorWithHumanOutput {
168179
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
169180
ts_println!(self, "{}", message.style(self.dimmed));
170181
}
171-
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
182+
EventMsg::TaskStarted => {
172183
// Ignore.
173184
}
185+
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
186+
handle_last_message(
187+
last_agent_message.as_deref(),
188+
self.last_message_path.as_deref(),
189+
);
190+
return CodexStatus::InitiateShutdown;
191+
}
174192
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
175193
ts_println!(self, "tokens used: {total_tokens}");
176194
}
@@ -185,7 +203,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
185203
}
186204
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
187205
if !self.show_agent_reasoning {
188-
return;
206+
return CodexStatus::Running;
189207
}
190208
if !self.reasoning_started {
191209
ts_println!(
@@ -498,7 +516,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
498516
EventMsg::GetHistoryEntryResponse(_) => {
499517
// Currently ignored in exec output.
500518
}
519+
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
501520
}
521+
CodexStatus::Running
502522
}
503523
}
504524

codex-rs/exec/src/event_processor_with_json_output.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
use std::collections::HashMap;
2+
use std::path::PathBuf;
23

34
use codex_core::config::Config;
45
use codex_core::protocol::Event;
56
use codex_core::protocol::EventMsg;
7+
use codex_core::protocol::TaskCompleteEvent;
68
use serde_json::json;
79

10+
use crate::event_processor::CodexStatus;
811
use crate::event_processor::EventProcessor;
912
use crate::event_processor::create_config_summary_entries;
13+
use crate::event_processor::handle_last_message;
1014

11-
pub(crate) struct EventProcessorWithJsonOutput;
15+
pub(crate) struct EventProcessorWithJsonOutput {
16+
last_message_path: Option<PathBuf>,
17+
}
1218

1319
impl EventProcessorWithJsonOutput {
14-
pub fn new() -> Self {
15-
Self {}
20+
pub fn new(last_message_path: Option<PathBuf>) -> Self {
21+
Self { last_message_path }
1622
}
1723
}
1824

@@ -33,15 +39,25 @@ impl EventProcessor for EventProcessorWithJsonOutput {
3339
println!("{prompt_json}");
3440
}
3541

36-
fn process_event(&mut self, event: Event) {
42+
fn process_event(&mut self, event: Event) -> CodexStatus {
3743
match event.msg {
3844
EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => {
3945
// Suppress streaming events in JSON mode.
46+
CodexStatus::Running
47+
}
48+
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
49+
handle_last_message(
50+
last_agent_message.as_deref(),
51+
self.last_message_path.as_deref(),
52+
);
53+
CodexStatus::InitiateShutdown
4054
}
55+
EventMsg::ShutdownComplete => CodexStatus::Shutdown,
4156
_ => {
4257
if let Ok(line) = serde_json::to_string(&event) {
4358
println!("{line}");
4459
}
60+
CodexStatus::Running
4561
}
4662
}
4763
}

codex-rs/exec/src/lib.rs

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ mod event_processor_with_json_output;
55

66
use std::io::IsTerminal;
77
use std::io::Read;
8-
use std::path::Path;
98
use std::path::PathBuf;
109
use std::sync::Arc;
1110

@@ -28,6 +27,7 @@ use tracing::error;
2827
use tracing::info;
2928
use tracing_subscriber::EnvFilter;
3029

30+
use crate::event_processor::CodexStatus;
3131
use crate::event_processor::EventProcessor;
3232

3333
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
@@ -123,11 +123,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
123123

124124
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
125125
let mut event_processor: Box<dyn EventProcessor> = if json_mode {
126-
Box::new(EventProcessorWithJsonOutput::new())
126+
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
127127
} else {
128128
Box::new(EventProcessorWithHumanOutput::create_with_ansi(
129129
stdout_with_ansi,
130130
&config,
131+
last_message_file.clone(),
131132
))
132133
};
133134

@@ -224,40 +225,17 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
224225

225226
// Run the loop until the task is complete.
226227
while let Some(event) = rx.recv().await {
227-
let (is_last_event, last_assistant_message) = match &event.msg {
228-
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
229-
(true, last_agent_message.clone())
228+
let shutdown: CodexStatus = event_processor.process_event(event);
229+
match shutdown {
230+
CodexStatus::Running => continue,
231+
CodexStatus::InitiateShutdown => {
232+
codex.submit(Op::Shutdown).await?;
233+
}
234+
CodexStatus::Shutdown => {
235+
break;
230236
}
231-
_ => (false, None),
232-
};
233-
event_processor.process_event(event);
234-
if is_last_event {
235-
handle_last_message(last_assistant_message, last_message_file.as_deref())?;
236-
break;
237237
}
238238
}
239239

240240
Ok(())
241241
}
242-
243-
fn handle_last_message(
244-
last_agent_message: Option<String>,
245-
last_message_file: Option<&Path>,
246-
) -> std::io::Result<()> {
247-
match (last_agent_message, last_message_file) {
248-
(Some(last_agent_message), Some(last_message_file)) => {
249-
// Last message and a file to write to.
250-
std::fs::write(last_message_file, last_agent_message)?;
251-
}
252-
(None, Some(last_message_file)) => {
253-
eprintln!(
254-
"Warning: No last message to write to file: {}",
255-
last_message_file.to_string_lossy()
256-
);
257-
}
258-
(_, None) => {
259-
// No last message and no file to write to.
260-
}
261-
}
262-
Ok(())
263-
}

0 commit comments

Comments
 (0)