Skip to content

Commit 10c181f

Browse files
apollo_node: fix awk print race condition in integration test stdout (#11420)
1 parent 175fff5 commit 10c181f

File tree

1 file changed

+43
-40
lines changed

1 file changed

+43
-40
lines changed

crates/apollo_node/src/test_utils/node_runner.rs

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::fs::create_dir_all;
2+
use std::io::{stdout, Stdout, Write};
23
use std::path::PathBuf;
34
use std::process::Stdio;
5+
use std::sync::{Mutex, OnceLock};
46

57
use apollo_config::CONFIG_FILE_ARG;
68
use apollo_infra_utils::command::create_shell_command;
@@ -15,6 +17,30 @@ use tracing::{error, info, instrument};
1517
pub const NODE_EXECUTABLE_PATH: &str = "target/debug/apollo_node";
1618
const TEMP_LOGS_DIR: &str = "integration_test_temporary_logs";
1719

20+
/// Global synchronized stdout writer to prevent race conditions when multiple
21+
/// node processes write their annotated output concurrently.
22+
static STDOUT_WRITER: OnceLock<Mutex<Stdout>> = OnceLock::new();
23+
24+
fn get_stdout_writer() -> &'static Mutex<Stdout> {
25+
STDOUT_WRITER.get_or_init(|| Mutex::new(stdout()))
26+
}
27+
28+
/// Writes an annotated line to stdout atomically (with synchronization).
29+
fn write_annotated_stdout_line(prefix: &str, line: &str) {
30+
let writer = get_stdout_writer();
31+
if let Ok(mut stdout) = writer.lock() {
32+
writeln!(stdout, "{} {}", prefix, line).expect("Should be able to write to stdout.");
33+
stdout.flush().expect("Should be able to flush stdout.");
34+
}
35+
}
36+
37+
/// Writes a line with a newline to an async file.
38+
async fn write_file_line(file: &mut File, line: &str) {
39+
if let Err(e) = file.write_all(format!("{line}\n").as_bytes()).await {
40+
error!("Failed to write to file: {}", e);
41+
}
42+
}
43+
1844
#[derive(Debug, Clone)]
1945
pub struct NodeRunner {
2046
node_index: usize,
@@ -44,7 +70,7 @@ pub fn spawn_run_node(
4470
AbortOnDropHandle::new(task::spawn(async move {
4571
info!("Running the node from its spawned task.");
4672
// Obtain handles, as the processes and task are terminated when their handles are dropped.
47-
let (mut node_handle, _annotator_handle, _pipe_task) =
73+
let (mut node_handle, _pipe_task) =
4874
spawn_node_child_process(node_config_paths, node_runner.clone()).await;
4975
let _node_run_result = node_handle.
5076
wait(). // Runs the node until completion, should be running indefinitely.
@@ -57,7 +83,7 @@ pub fn spawn_run_node(
5783
async fn spawn_node_child_process(
5884
node_config_paths: Vec<PathBuf>,
5985
node_runner: NodeRunner,
60-
) -> (Child, Child, AbortOnDropHandle<()>) {
86+
) -> (Child, AbortOnDropHandle<()>) {
6187
info!("Getting the node executable.");
6288
let node_executable = get_node_executable_path();
6389

@@ -78,26 +104,18 @@ async fn spawn_node_child_process(
78104
.spawn()
79105
.expect("Spawning sequencer node should succeed.");
80106

81-
let mut annotator_process: Child = create_shell_command("awk")
82-
.arg("-v")
83-
// Print the prefix in different colors.
84-
.arg(format!("prefix=\u{1b}[3{}m{}\u{1b}[0m", node_runner.node_index+1, node_runner.get_description()))
85-
.arg("{print prefix, $0}")
86-
.stdin(std::process::Stdio::piped())
87-
.stderr(Stdio::inherit())
88-
.stdout(Stdio::inherit())
89-
.kill_on_drop(true) // Required for stopping when the handle is dropped.
90-
.spawn()
91-
.expect("Spawning node output annotation should succeed.");
92-
93-
info!("Node PID: {:?}, Annotator PID: {:?}", node_process.id(), annotator_process.id());
107+
// Print the prefix in different colors.
108+
let prefix = format!(
109+
"\u{1b}[3{}m{}\u{1b}[0m",
110+
node_runner.node_index + 1,
111+
node_runner.get_description()
112+
);
113+
info!("Node PID: {:?}", node_process.id());
94114

95-
// Get the node stdout and the annotator stdin.
115+
// Get the node stdout.
96116
let node_stdout = node_process.stdout.take().expect("Node stdout should be available.");
97-
let mut annotator_stdin =
98-
annotator_process.stdin.take().expect("Annotator stdin should be available.");
99117

100-
// Spawn a task to connect the node stdout with the annotator stdin.
118+
// Spawn a task to read node stdout and write to both file and synchronized stdout.
101119
let pipe_task = AbortOnDropHandle::new(tokio::spawn(async move {
102120
let mut reader = BufReader::new(node_stdout).lines();
103121
info!("Writing node logs to file: {:?}", node_runner.logs_file_path());
@@ -106,35 +124,20 @@ async fn spawn_node_child_process(
106124
while let Some(line) = reader.next_line().await.transpose() {
107125
match line {
108126
Ok(line) => {
109-
// Write to annotator stdin
110-
if let Err(e) = annotator_stdin.write_all(line.as_bytes()).await {
111-
error!("Failed to write to annotator stdin: {}", e);
112-
}
113-
if let Err(e) = annotator_stdin.write_all(b"\n").await {
114-
error!("Failed to write newline to annotator stdin: {}", e);
115-
}
116-
117-
// Write to file
118-
if let Err(e) = file.write_all(line.as_bytes()).await {
119-
error!("Failed to write to file: {}", e);
120-
}
121-
if let Err(e) = file.write_all(b"\n").await {
122-
error!("Failed to write newline to file: {}", e);
123-
}
127+
// Write annotated line to synchronized stdout.
128+
write_annotated_stdout_line(&prefix, &line);
129+
130+
// Write to file.
131+
write_file_line(&mut file, &line).await;
124132
}
125133
Err(e) => {
126134
error!("Error while reading node stdout: {}", e);
127135
}
128136
}
129137
}
130-
131-
// Close the annotator stdin when done.
132-
if let Err(e) = annotator_stdin.shutdown().await {
133-
error!("Failed to shut down annotator stdin: {}", e);
134-
}
135138
}));
136139

137-
(node_process, annotator_process, pipe_task)
140+
(node_process, pipe_task)
138141
}
139142

140143
pub fn get_node_executable_path() -> String {

0 commit comments

Comments
 (0)