Skip to content

Commit 255a203

Browse files
committed
fafo
1 parent a6ce1f4 commit 255a203

File tree

1 file changed

+52
-3
lines changed

1 file changed

+52
-3
lines changed

src/jobs/utils.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ use diesel::prelude::*;
33
use diesel::result::Error as DieselError;
44
use log::{error, info};
55
use std::fmt::Write as _;
6+
use std::io::Read;
67
use std::os::unix::process::CommandExt;
78
use std::process::{Command, Output, Stdio};
89
use std::str;
10+
use std::sync::mpsc;
11+
use std::thread;
12+
use std::time::Duration;
913

1014
use crate::config::{Config, RepoConfig};
1115
use crate::errors::{JobError, JobResult};
@@ -128,10 +132,33 @@ macro_rules! job_log_and_error {
128132
}};
129133
}
130134

135+
/// Reads from a stream with timeout to avoid hanging on orphaned pipes
136+
fn read_stream_with_timeout<R: Read + Send + 'static>(
137+
mut stream: R,
138+
timeout: Duration,
139+
) -> Result<Vec<u8>, std::io::Error> {
140+
let (sender, receiver) = mpsc::channel();
141+
142+
thread::spawn(move || {
143+
let mut buffer = Vec::new();
144+
let result = stream.read_to_end(&mut buffer);
145+
let _ = sender.send((result, buffer));
146+
});
147+
148+
match receiver.recv_timeout(timeout) {
149+
Ok((Ok(_bytes_read), buffer)) => Ok(buffer),
150+
Ok((Err(e), _)) => Err(e),
151+
Err(_) => {
152+
// The orphaned child processes can keep running, but we won't wait
153+
Ok(Vec::new())
154+
}
155+
}
156+
}
157+
131158
/// Executes a command and returns its output. A JobError is returned if the command couldn't be executed, but not if
132159
/// it exits with a status code.
133160
pub fn do_command_with_output(cmd: &mut Command) -> JobResult<Output> {
134-
let output = unsafe {
161+
let mut child = unsafe {
135162
cmd.stdin(Stdio::null())
136163
.stdout(Stdio::piped())
137164
.stderr(Stdio::piped())
@@ -141,11 +168,33 @@ pub fn do_command_with_output(cmd: &mut Command) -> JobResult<Output> {
141168
libc::setsid();
142169
Ok(())
143170
})
144-
.output()
171+
.spawn()
145172
.map_err(|e| JobError::new(&format!("Failed to run {:?}: {}", &cmd, e)))?
146173
};
147174

148-
Ok(output)
175+
let exit_status = child
176+
.wait()
177+
.map_err(|e| JobError::new(&format!("Failed to wait for {:?}: {}", &cmd, e)))?;
178+
179+
let stdout = if let Some(stdout_handle) = child.stdout.take() {
180+
read_stream_with_timeout(stdout_handle, Duration::from_secs(30))
181+
.map_err(|e| JobError::new(&format!("Failed to read stdout from {:?}: {}", &cmd, e)))?
182+
} else {
183+
Vec::new()
184+
};
185+
186+
let stderr = if let Some(stderr_handle) = child.stderr.take() {
187+
read_stream_with_timeout(stderr_handle, Duration::from_secs(30))
188+
.map_err(|e| JobError::new(&format!("Failed to read stderr from {:?}: {}", &cmd, e)))?
189+
} else {
190+
Vec::new()
191+
};
192+
193+
Ok(Output {
194+
status: exit_status,
195+
stdout,
196+
stderr,
197+
})
149198
}
150199

151200
/// Executes a command. A JobError is returned if the command exits with an unsuccessful status code.

0 commit comments

Comments
 (0)