Skip to content

Commit bf6c1e7

Browse files
committed
[nextest-runner] improve child error handling
* Add a `ChildAccumulator` struct that tracks the fds, output and errors externally. Tracking all this data externally means that interactive queries about the output can be satisfied. * Continue collecting output while the process is being terminated -- again to ensure that interactive queries are up-to-date. * Make `ChildError` cloneable by wrapping all the errors in `Arc`. child error handling 2
1 parent 0ac874e commit bf6c1e7

File tree

4 files changed

+123
-89
lines changed

4 files changed

+123
-89
lines changed

nextest-runner/src/errors.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::{
2525
env::JoinPathsError,
2626
fmt::{self, Write as _},
2727
process::ExitStatus,
28+
sync::Arc,
2829
};
2930
use target_spec_miette::IntoMietteDiagnostic;
3031
use thiserror::Error;
@@ -383,24 +384,24 @@ impl<T: std::error::Error> std::error::Error for ErrorList<T> {
383384
}
384385

385386
/// An error was returned during the process of managing a child process.
386-
#[derive(Debug, Error)]
387+
#[derive(Clone, Debug, Error)]
387388
#[non_exhaustive]
388389
pub enum ChildError {
389390
/// An error occurred while reading standard output.
390391
#[error("error reading standard output")]
391-
ReadStdout(#[source] std::io::Error),
392+
ReadStdout(#[source] Arc<std::io::Error>),
392393

393394
/// An error occurred while reading standard error.
394395
#[error("error reading standard error")]
395-
ReadStderr(#[source] std::io::Error),
396+
ReadStderr(#[source] Arc<std::io::Error>),
396397

397398
/// An error occurred while reading a combined stream.
398399
#[error("error reading combined stream")]
399-
ReadCombined(#[source] std::io::Error),
400+
ReadCombined(#[source] Arc<std::io::Error>),
400401

401402
/// An error occurred while waiting for the child process to exit.
402403
#[error("error waiting for child process to exit")]
403-
Wait(#[source] std::io::Error),
404+
Wait(#[source] Arc<std::io::Error>),
404405
}
405406

406407
/// An unknown test group was specified in the config.

nextest-runner/src/runner.rs

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
},
2323
signal::{JobControlEvent, ShutdownEvent, SignalEvent, SignalHandler, SignalHandlerKind},
2424
target_runner::TargetRunner,
25-
test_command::{ChildFds, ChildOutputMut},
25+
test_command::{ChildAccumulator, ChildFds, ChildOutputMut},
2626
test_output::{CaptureStrategy, ChildSplitOutput, TestExecutionOutput},
2727
time::{PausableSleep, StopwatchSnapshot, StopwatchStart},
2828
};
@@ -849,15 +849,13 @@ impl<'a> TestRunnerInner<'a> {
849849

850850
let mut timeout_hit = 0;
851851

852-
let mut child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
853-
let mut child_acc = child_fds.make_acc();
852+
let child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
853+
let mut child_acc = ChildAccumulator::new(child_fds);
854854

855855
let (res, leaked) = {
856856
let res = loop {
857857
tokio::select! {
858-
_ = child_fds.fill_buf(&mut child_acc), if !child_fds.is_done() => {
859-
// Some data was read from the child's stdout/stderr.
860-
}
858+
() = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
861859
res = child.wait() => {
862860
// The setup script finished executing.
863861
break res;
@@ -886,7 +884,14 @@ impl<'a> TestRunnerInner<'a> {
886884
// attempt to terminate the slow test.
887885
// as there is a race between shutting down a slow test and its own completion
888886
// we silently ignore errors to avoid printing false warnings.
889-
imp::terminate_child(&mut child, TerminateMode::Timeout, req_rx, job.as_ref(), slow_timeout.grace_period).await;
887+
imp::terminate_child(
888+
&mut child,
889+
&mut child_acc,
890+
TerminateMode::Timeout,
891+
req_rx,
892+
job.as_ref(),
893+
slow_timeout.grace_period,
894+
).await;
890895
status = Some(ExecutionResult::Timeout);
891896
if slow_timeout.grace_period.is_zero() {
892897
break child.wait().await;
@@ -905,6 +910,7 @@ impl<'a> TestRunnerInner<'a> {
905910
RunUnitRequest::Signal(req) => {
906911
handle_signal_request(
907912
&mut child,
913+
&mut child_acc,
908914
req,
909915
stopwatch,
910916
interval_sleep.as_mut(),
@@ -927,10 +933,8 @@ impl<'a> TestRunnerInner<'a> {
927933
let sleep = tokio::time::sleep(leak_timeout);
928934

929935
tokio::select! {
930-
_ = child_fds.fill_buf(&mut child_acc), if !child_fds.is_done() => {
931-
// Some data was read from the child's stdout/stderr.
932-
}
933-
() = sleep, if !child_fds.is_done() => {
936+
() = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
937+
() = sleep, if !child_acc.fds.is_done() => {
934938
break true;
935939
}
936940
else => {
@@ -942,19 +946,18 @@ impl<'a> TestRunnerInner<'a> {
942946
(res, leaked)
943947
};
944948

945-
let mut errors = child_fds.into_errors();
946949
let exit_status = match res {
947950
Ok(exit_status) => Some(exit_status),
948951
Err(err) => {
949-
errors.push(ChildError::Wait(err));
952+
child_acc.errors.push(ChildError::Wait(Arc::new(err)));
950953
None
951954
}
952955
};
953956

954-
if !errors.is_empty() {
957+
if !child_acc.errors.is_empty() {
955958
// TODO: we may wish to return whatever parts of the output we did collect here.
956959
return Err(SetupScriptError::Child {
957-
errors: ErrorList(errors),
960+
errors: ErrorList(child_acc.errors),
958961
});
959962
}
960963

@@ -968,7 +971,7 @@ impl<'a> TestRunnerInner<'a> {
968971
None
969972
};
970973

971-
let (stdout, stderr) = match child_acc {
974+
let (stdout, stderr) = match child_acc.output {
972975
ChildOutputMut::Split { stdout, stderr } => (
973976
stdout.map(|x| x.freeze().into()),
974977
stderr.map(|x| x.freeze().into()),
@@ -1051,16 +1054,17 @@ impl<'a> TestRunnerInner<'a> {
10511054

10521055
let crate::test_command::Child {
10531056
mut child,
1054-
mut child_fds,
1057+
child_fds,
10551058
} = cmd
10561059
.spawn(self.capture_strategy)
10571060
.map_err(RunTestError::Spawn)?;
1058-
let mut child_acc = child_fds.make_acc();
10591061

10601062
// If assigning the child to the job fails, ignore this. This can happen if the process has
10611063
// exited.
10621064
let _ = imp::assign_process_to_job(&child, job.as_ref());
10631065

1066+
let mut child_acc = ChildAccumulator::new(child_fds);
1067+
10641068
let mut status: Option<ExecutionResult> = None;
10651069
let slow_timeout = test.settings.slow_timeout();
10661070
let leak_timeout = test.settings.leak_timeout();
@@ -1075,7 +1079,7 @@ impl<'a> TestRunnerInner<'a> {
10751079
let (res, leaked) = {
10761080
let res = loop {
10771081
tokio::select! {
1078-
() = child_fds.fill_buf(&mut child_acc), if !child_fds.is_done() => {}
1082+
() = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
10791083
res = child.wait() => {
10801084
// The test finished executing.
10811085
break res;
@@ -1106,6 +1110,7 @@ impl<'a> TestRunnerInner<'a> {
11061110
// errors to avoid printing false warnings.
11071111
imp::terminate_child(
11081112
&mut child,
1113+
&mut child_acc,
11091114
TerminateMode::Timeout,
11101115
req_rx,
11111116
job.as_ref(),
@@ -1129,6 +1134,7 @@ impl<'a> TestRunnerInner<'a> {
11291134
RunUnitRequest::Signal(req) => {
11301135
handle_signal_request(
11311136
&mut child,
1137+
&mut child_acc,
11321138
req,
11331139
stopwatch,
11341140
interval_sleep.as_mut(),
@@ -1152,13 +1158,13 @@ impl<'a> TestRunnerInner<'a> {
11521158

11531159
tokio::select! {
11541160
// All of the branches here need to check for
1155-
// `!child_fds.is_done()`, because if child_fds is done we
1161+
// `!child_acc.fds.is_done()`, because if child_fds is done we
11561162
// want to hit the `else` block right away.
1157-
() = child_fds.fill_buf(&mut child_acc), if !child_fds.is_done() => {}
1158-
() = sleep, if !child_fds.is_done() => {
1163+
() = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
1164+
() = sleep, if !child_acc.fds.is_done() => {
11591165
break true;
11601166
}
1161-
recv = req_rx.recv(), if !child_fds.is_done() => {
1167+
recv = req_rx.recv(), if !child_acc.fds.is_done() => {
11621168
// The sender stays open longer than the whole loop, and the buffer is big
11631169
// enough for all messages ever sent through this channel, so a RecvError
11641170
// should never happen.
@@ -1179,27 +1185,26 @@ impl<'a> TestRunnerInner<'a> {
11791185
(res, leaked)
11801186
};
11811187

1182-
let mut errors = child_fds.into_errors();
11831188
let exit_status = match res {
11841189
Ok(exit_status) => Some(exit_status),
11851190
Err(err) => {
1186-
errors.push(ChildError::Wait(err));
1191+
child_acc.errors.push(ChildError::Wait(Arc::new(err)));
11871192
None
11881193
}
11891194
};
11901195

1191-
if !errors.is_empty() {
1196+
if !child_acc.errors.is_empty() {
11921197
// TODO: we may wish to return whatever parts of the output we did collect here.
11931198
return Err(RunTestError::Child {
1194-
errors: ErrorList(errors),
1199+
errors: ErrorList(child_acc.errors),
11951200
});
11961201
}
11971202

11981203
let exit_status = exit_status.expect("None always results in early return");
11991204
let result = status.unwrap_or_else(|| create_execution_result(exit_status, leaked));
12001205

12011206
Ok(InternalExecuteStatus {
1202-
output: TestExecutionOutput::Output(child_acc.freeze()),
1207+
output: TestExecutionOutput::Output(child_acc.output.freeze()),
12031208
result,
12041209
stopwatch_end: stopwatch.snapshot(),
12051210
is_slow,
@@ -1223,8 +1228,13 @@ fn drain_req_rx(mut receiver: UnboundedReceiver<RunUnitRequest>) {
12231228
}
12241229
}
12251230

1231+
// It would be nice to fix this function to not have so many arguments, but this
1232+
// code is actively being refactored right now and imposing too much structure
1233+
// can cause more harm than good.
1234+
#[allow(clippy::too_many_arguments)]
12261235
async fn handle_signal_request(
1227-
child: &mut tokio::process::Child,
1236+
child: &mut Child,
1237+
child_acc: &mut ChildAccumulator,
12281238
req: SignalRequest,
12291239
// These annotations are needed to silence lints on non-Unix platforms.
12301240
#[allow(unused_variables)] stopwatch: &mut StopwatchStart,
@@ -1258,6 +1268,7 @@ async fn handle_signal_request(
12581268
SignalRequest::Shutdown(event) => {
12591269
imp::terminate_child(
12601270
child,
1271+
child_acc,
12611272
TerminateMode::Signal(event),
12621273
req_rx,
12631274
job,
@@ -2539,6 +2550,7 @@ mod imp {
25392550

25402551
pub(super) async fn terminate_child(
25412552
child: &mut Child,
2553+
_child_acc: &mut ChildAccumulator,
25422554
mode: TerminateMode,
25432555
_req_rx: &mut UnboundedReceiver<RunUnitRequest>,
25442556
job: Option<&Job>,
@@ -2622,8 +2634,11 @@ mod imp {
26222634
unsafe { libc::raise(SIGSTOP) };
26232635
}
26242636

2637+
// TODO: should this indicate whether the process exited immediately? Could
2638+
// do this with a non-async fn that optionally returns a future to await on.
26252639
pub(super) async fn terminate_child(
26262640
child: &mut Child,
2641+
child_acc: &mut ChildAccumulator,
26272642
mode: TerminateMode,
26282643
req_rx: &mut UnboundedReceiver<RunUnitRequest>,
26292644
_job: Option<&Job>,
@@ -2655,6 +2670,7 @@ mod imp {
26552670

26562671
loop {
26572672
tokio::select! {
2673+
() = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
26582674
_ = child.wait() => {
26592675
// The process exited.
26602676
break;

nextest-runner/src/test_command.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020
use tracing::warn;
2121

2222
mod imp;
23-
pub(crate) use imp::{Child, ChildFds, ChildOutputMut};
23+
pub(crate) use imp::{Child, ChildAccumulator, ChildFds, ChildOutputMut};
2424

2525
#[derive(Clone, Debug)]
2626
pub(crate) struct LocalExecuteContext<'a> {

0 commit comments

Comments
 (0)