Skip to content

Commit ec18dbb

Browse files
committed
Update output logging to stream in realtime (closes #138)
1 parent 05be271 commit ec18dbb

File tree

2 files changed

+100
-158
lines changed

2 files changed

+100
-158
lines changed

src/factotum/executor/execution_strategy/mod.rs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
#[cfg(test)]
1616
mod tests;
17-
use std::process::Command;
17+
use std::process::{Command, Stdio};
1818
use std::time::{Instant, Duration};
19+
use std::io::{BufRead, BufReader};
20+
use std::thread;
21+
use colored::*;
1922

2023
#[derive(Clone, PartialEq, Debug)]
2124
pub struct RunResult {
@@ -65,34 +68,92 @@ pub fn execute_simulation(name: &str, command: &mut Command) -> RunResult {
6568
pub fn execute_os(name: &str, command: &mut Command) -> RunResult {
6669
let run_start = Instant::now();
6770
info!("Executing sh {:?}", command);
68-
match command.output() {
69-
Ok(r) => {
70-
let run_duration = run_start.elapsed();
71-
let return_code = r.status.code().unwrap_or(1); // 1 will be returned if the process was killed by a signal
72-
73-
let task_stdout: String = String::from_utf8_lossy(&r.stdout).trim_right().into();
74-
let task_stderr: String = String::from_utf8_lossy(&r.stderr).trim_right().into();
75-
76-
info!("task '{}' stdout:\n'{}'", name, task_stdout);
77-
info!("task '{}' stderr:\n'{}'", name, task_stderr);
78-
79-
let task_stdout_opt = if task_stdout.is_empty() {
80-
None
81-
} else {
82-
Some(task_stdout)
83-
};
84-
let task_stderr_opt = if task_stderr.is_empty() {
85-
None
86-
} else {
87-
Some(task_stderr)
88-
};
8971

90-
RunResult {
91-
duration: run_duration,
92-
task_execution_error: None,
93-
stdout: task_stdout_opt,
94-
stderr: task_stderr_opt,
95-
return_code: return_code,
72+
command.stdout(Stdio::piped());
73+
command.stderr(Stdio::piped());
74+
75+
match command.spawn() {
76+
Ok(mut child) => {
77+
let stdout = child.stdout.take().expect("Failed to capture stdout");
78+
let stderr = child.stderr.take().expect("Failed to capture stderr");
79+
80+
let name_stdout = name.to_string();
81+
let name_stderr = name.to_string();
82+
83+
let stdout_handle = thread::spawn(move || {
84+
let reader = BufReader::new(stdout);
85+
let mut lines = Vec::new();
86+
for line in reader.lines() {
87+
if let Ok(line) = line {
88+
println!("[{}] {}", name_stdout.cyan(), line);
89+
lines.push(line);
90+
}
91+
}
92+
lines.join("\n")
93+
});
94+
95+
let stderr_handle = thread::spawn(move || {
96+
let reader = BufReader::new(stderr);
97+
let mut lines = Vec::new();
98+
for line in reader.lines() {
99+
if let Ok(line) = line {
100+
eprintln!("[{}] {}", name_stderr.cyan(), line.red());
101+
lines.push(line);
102+
}
103+
}
104+
lines.join("\n")
105+
});
106+
107+
match child.wait() {
108+
Ok(status) => {
109+
let run_duration = run_start.elapsed();
110+
let return_code = status.code().unwrap_or(1);
111+
112+
let task_stdout = match stdout_handle.join() {
113+
Ok(output) => output,
114+
Err(_) => {
115+
warn!("stdout reader thread panicked for task '{}'", name);
116+
String::new()
117+
}
118+
};
119+
let task_stderr = match stderr_handle.join() {
120+
Ok(output) => output,
121+
Err(_) => {
122+
warn!("stderr reader thread panicked for task '{}'", name);
123+
String::new()
124+
}
125+
};
126+
127+
info!("task '{}' completed with return code {}", name, return_code);
128+
129+
let task_stdout_opt = if task_stdout.is_empty() {
130+
None
131+
} else {
132+
Some(task_stdout)
133+
};
134+
let task_stderr_opt = if task_stderr.is_empty() {
135+
None
136+
} else {
137+
Some(task_stderr)
138+
};
139+
140+
RunResult {
141+
duration: run_duration,
142+
task_execution_error: None,
143+
stdout: task_stdout_opt,
144+
stderr: task_stderr_opt,
145+
return_code: return_code,
146+
}
147+
}
148+
Err(e) => {
149+
RunResult {
150+
duration: run_start.elapsed(),
151+
task_execution_error: Some(format!("Error waiting for process - {}", e)),
152+
stdout: None,
153+
stderr: None,
154+
return_code: -1,
155+
}
156+
}
96157
}
97158
}
98159
Err(message) => {

src/main.rs

Lines changed: 11 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -245,34 +245,21 @@ fn get_task_result_line_str(task_result: &Task<&FactfileTask>) -> (String, Optio
245245
return (result, stderr);
246246
}
247247

248-
fn get_task_results_str(task_results: &Vec<&Task<&FactfileTask>>) -> (String, String) {
249-
let mut stderr = String::new();
250-
let mut stdout = String::new();
251-
248+
fn get_task_summary_str(task_results: &Vec<&Task<&FactfileTask>>) -> String {
252249
let mut total_run_time = Duration::new(0, 0);
253250
let mut executed = 0;
254251

255252
for task in task_results.iter() {
256-
let (task_stdout, task_stderr) = get_task_result_line_str(task);
257-
stdout.push_str(&task_stdout);
258-
259-
if let Some(task_stderr_str) = task_stderr {
260-
stderr.push_str(&task_stderr_str);
261-
}
262-
263253
if let Some(ref run_result) = task.run_result {
264254
total_run_time = total_run_time + run_result.duration;
265255
executed += 1;
266256
}
267257
}
268258

269-
let summary = format!("{}/{} tasks run in {}\n",
270-
executed,
271-
task_results.len(),
272-
get_duration_as_string(&total_run_time));
273-
stdout.push_str(&summary.green().to_string());
274-
275-
(stdout, stderr)
259+
format!("{}/{} tasks run in {}\n",
260+
executed,
261+
task_results.len(),
262+
get_duration_as_string(&total_run_time)).green().to_string()
276263
}
277264

278265
fn validate_start_task(job: &Factfile, start_task: &str) -> Result<(), &'static str> {
@@ -422,18 +409,12 @@ fn parse_file_and_execute_with_strategy<F>(factfile: &str,
422409
let normal_completion = !has_errors && !has_early_finish;
423410

424411
let result = if normal_completion {
425-
let (stdout_summary, stderr_summary) = get_task_results_str(&tasks);
426-
print!("{}", stdout_summary);
427-
if !stderr_summary.trim_right().is_empty() {
428-
print_err!("{}", stderr_summary.trim_right());
429-
}
412+
let summary = get_task_summary_str(&tasks);
413+
print!("{}", summary);
430414
PROC_SUCCESS
431415
} else if has_early_finish && !has_errors {
432-
let (stdout_summary, stderr_summary) = get_task_results_str(&tasks);
433-
print!("{}", stdout_summary);
434-
if !stderr_summary.trim_right().is_empty() {
435-
print_err!("{}", stderr_summary.trim_right());
436-
}
416+
let summary = get_task_summary_str(&tasks);
417+
print!("{}", summary);
437418
let incomplete_tasks = tasks.iter()
438419
.filter(|r| !r.run_result.is_some())
439420
.map(|r| format!("'{}'", r.name.cyan()))
@@ -453,12 +434,8 @@ fn parse_file_and_execute_with_strategy<F>(factfile: &str,
453434
incomplete_tasks);
454435
PROC_SUCCESS
455436
} else {
456-
let (stdout_summary, stderr_summary) = get_task_results_str(&tasks);
457-
print!("{}", stdout_summary);
458-
459-
if !stderr_summary.trim_right().is_empty() {
460-
print_err!("{}", stderr_summary.trim_right());
461-
}
437+
let summary = get_task_summary_str(&tasks);
438+
print!("{}", summary);
462439

463440
let incomplete_tasks = tasks.iter()
464441
.filter(|r| !r.run_result.is_some())
@@ -1189,102 +1166,6 @@ fn test_get_task_result_line_str() {
11891166

11901167
}
11911168

1192-
#[test]
1193-
fn test_get_task_results_str_summary() {
1194-
use chrono::UTC;
1195-
use factotum::executor::execution_strategy::RunResult;
1196-
use factotum::factfile::{Task as FactfileTask, OnResult};
1197-
1198-
let dt = UTC::now();
1199-
1200-
let task_one_spec = FactfileTask {
1201-
name: "hello world".to_string(),
1202-
depends_on: vec![],
1203-
executor: "".to_string(),
1204-
command: "".to_string(),
1205-
arguments: vec![],
1206-
on_result: OnResult {
1207-
terminate_job: vec![],
1208-
continue_job: vec![],
1209-
},
1210-
};
1211-
1212-
let task_one = Task::<&FactfileTask> {
1213-
name: String::from("hello world"),
1214-
// children: vec![],
1215-
state: State::Success,
1216-
task_spec: &task_one_spec,
1217-
run_started: Some(dt),
1218-
run_result: Some(RunResult {
1219-
duration: Duration::from_secs(20),
1220-
task_execution_error: None,
1221-
stdout: Some(String::from("hello world")),
1222-
stderr: Some(String::from("Mistake")),
1223-
return_code: 0,
1224-
}),
1225-
};
1226-
1227-
1228-
let task_two_spec = FactfileTask {
1229-
name: "hello world 2".to_string(),
1230-
depends_on: vec![],
1231-
executor: "".to_string(),
1232-
command: "".to_string(),
1233-
arguments: vec![],
1234-
on_result: OnResult {
1235-
terminate_job: vec![],
1236-
continue_job: vec![],
1237-
},
1238-
};
1239-
1240-
let task_two = Task::<&FactfileTask> {
1241-
name: String::from("hello world 2"),
1242-
// children: vec![],
1243-
state: State::Success,
1244-
task_spec: &task_two_spec,
1245-
run_started: Some(dt),
1246-
run_result: Some(RunResult {
1247-
duration: Duration::from_secs(80),
1248-
task_execution_error: None,
1249-
stdout: Some(String::from("hello world")),
1250-
stderr: Some(String::from("Mistake")),
1251-
return_code: 0,
1252-
}),
1253-
};
1254-
1255-
let mut tasks: Vec<&Task<&FactfileTask>> = vec![];
1256-
let (stdout, stderr) = get_task_results_str(&tasks);
1257-
let expected: String = format!("{}", "0/0 tasks run in 0.0s\n".green());
1258-
1259-
assert_eq!(stdout, expected);
1260-
assert_eq!(stderr, "");
1261-
1262-
tasks.push(&task_one);
1263-
1264-
let (one_task_stdout, one_task_stderr) = get_task_results_str(&tasks);
1265-
let (first_task_stdout, first_task_stderr) = get_task_result_line_str(&tasks[0]);
1266-
let expected_one_task = format!("{}{}",
1267-
first_task_stdout,
1268-
"1/1 tasks run in 20.0s\n".green());
1269-
1270-
assert_eq!(one_task_stdout, expected_one_task);
1271-
let first_task_stderr_str = first_task_stderr.unwrap();
1272-
assert_eq!(one_task_stderr, first_task_stderr_str);
1273-
1274-
tasks.push(&task_two);
1275-
1276-
let (two_task_stdout, two_task_stderr) = get_task_results_str(&tasks);
1277-
let (task_two_stdout, task_two_stderr) = get_task_result_line_str(&tasks[1]);
1278-
let expected_two_task = format!("{}{}{}",
1279-
first_task_stdout,
1280-
task_two_stdout,
1281-
"2/2 tasks run in 1m, 40s\n".green());
1282-
assert_eq!(two_task_stdout, expected_two_task);
1283-
assert_eq!(two_task_stderr,
1284-
format!("{}{}", first_task_stderr_str, task_two_stderr.unwrap()));
1285-
1286-
}
1287-
12881169
#[test]
12891170
fn test_start_task_validation_not_present() {
12901171
let mut factfile = Factfile::new("N/A", "test");

0 commit comments

Comments
 (0)