Skip to content

Commit 258e6c9

Browse files
committed
[nextest-runner] remove cancellation AtomicBool
We've changed things such that the executor always checks in with the dispatcher before starting a unit. This makes the bool unnecessary. The cancel receiver is currently necessary for the handle_delay_between_attempts step, which should cancel the run no matter what. We'll remove this shortly by moving this functionality into req_rx.
1 parent b6b7077 commit 258e6c9

File tree

3 files changed

+10
-51
lines changed

3 files changed

+10
-51
lines changed

nextest-runner/src/runner/dispatcher.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@ use crate::{
2323
use chrono::Local;
2424
use debug_ignore::DebugIgnore;
2525
use quick_junit::ReportUuid;
26-
use std::{
27-
collections::BTreeMap,
28-
sync::atomic::{AtomicBool, Ordering},
29-
time::Duration,
30-
};
26+
use std::{collections::BTreeMap, time::Duration};
3127
use tokio::sync::{
3228
broadcast,
3329
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@@ -102,7 +98,6 @@ where
10298
signal_handler: &mut SignalHandler,
10399
input_handler: &mut InputHandler,
104100
report_cancel_rx: oneshot::Receiver<()>,
105-
cancelled_ref: &AtomicBool,
106101
cancellation_sender: broadcast::Sender<()>,
107102
) -> RunnerTaskState {
108103
let mut report_cancel_rx = std::pin::pin!(report_cancel_rx);
@@ -275,10 +270,7 @@ where
275270
self.info_finished(total.saturating_sub(index + 1));
276271
}
277272
HandleEventResponse::Cancel(cancel) => {
278-
// A cancellation notice was received. Note the ordering here:
279-
// cancelled_ref is set *before* notifications are broadcast. This
280-
// prevents race conditions.
281-
cancelled_ref.store(true, Ordering::Release);
273+
// A cancellation notice was received.
282274
let _ = cancellation_sender.send(());
283275
match cancel {
284276
// Some of the branches here don't do anything, but are specified

nextest-runner/src/runner/executor.rs

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ use std::{
3939
num::NonZeroUsize,
4040
pin::Pin,
4141
process::{ExitStatus, Stdio},
42-
sync::{
43-
atomic::{AtomicBool, Ordering},
44-
Arc,
45-
},
42+
sync::Arc,
4643
time::Duration,
4744
};
4845
use tokio::{
@@ -92,7 +89,6 @@ impl<'a> ExecutorContext<'a> {
9289
pub(super) async fn run_setup_scripts(
9390
&self,
9491
resp_tx: UnboundedSender<ExecutorEvent<'a>>,
95-
cancelled_ref: &AtomicBool,
9692
) -> SetupScriptExecuteData<'a> {
9793
let setup_scripts = self.profile.setup_scripts(self.test_list);
9894
let total = setup_scripts.len();
@@ -108,11 +104,6 @@ impl<'a> ExecutorContext<'a> {
108104
let config = script.config;
109105

110106
let script_fut = async move {
111-
if cancelled_ref.load(Ordering::Acquire) {
112-
// Check for test cancellation.
113-
return None;
114-
}
115-
116107
let (req_rx_tx, req_rx_rx) = oneshot::channel();
117108
let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
118109
script_id: script_id.clone(),
@@ -172,15 +163,9 @@ impl<'a> ExecutorContext<'a> {
172163
test_instance: TestInstance<'a>,
173164
settings: TestSettings<'a>,
174165
resp_tx: UnboundedSender<ExecutorEvent<'a>>,
175-
cancelled_ref: &AtomicBool,
176166
mut cancel_receiver: broadcast::Receiver<()>,
177167
setup_script_data: Arc<SetupScriptExecuteData<'a>>,
178168
) {
179-
if cancelled_ref.load(Ordering::Acquire) {
180-
// Check for test cancellation.
181-
return;
182-
}
183-
184169
debug!(test_name = test_instance.name, "running test");
185170

186171
let settings = Arc::new(settings);
@@ -224,14 +209,10 @@ impl<'a> ExecutorContext<'a> {
224209
total_attempts,
225210
};
226211

227-
// Note: do not check for cancellation here.
228-
// Only check for cancellation after the first
229-
// run, to avoid a situation where run_statuses
230-
// is empty.
231-
232212
if retry_data.attempt > 1 {
233-
// Ensure that the dispatcher believes the run is still ongoing. If the run is
234-
// cancelled, the dispatcher will let us know.
213+
// Ensure that the dispatcher believes the run is still ongoing.
214+
// If the run is cancelled, the dispatcher will let us know by
215+
// dropping the receiver.
235216
let (tx, rx) = oneshot::channel();
236217
_ = resp_tx.send(ExecutorEvent::RetryStarted {
237218
test_instance,
@@ -242,8 +223,8 @@ impl<'a> ExecutorContext<'a> {
242223
match rx.await {
243224
Ok(()) => {}
244225
Err(_) => {
245-
// The receiver was dropped -- the dispatcher has signaled that this unit
246-
// should exit.
226+
// The receiver was dropped -- the dispatcher has
227+
// signaled that this unit should exit.
247228
return;
248229
}
249230
}
@@ -265,9 +246,6 @@ impl<'a> ExecutorContext<'a> {
265246
if run_status.result.is_success() {
266247
// The test succeeded.
267248
break run_status;
268-
} else if cancelled_ref.load(Ordering::Acquire) {
269-
// The test was cancelled.
270-
break run_status;
271249
} else if retry_data.attempt < retry_data.total_attempts {
272250
// Retry this test: send a retry event, then retry the loop.
273251
delay = backoff_iter

nextest-runner/src/runner/imp.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ use async_scoped::TokioScope;
2020
use future_queue::StreamExt;
2121
use futures::prelude::*;
2222
use quick_junit::ReportUuid;
23-
use std::{
24-
convert::Infallible,
25-
fmt,
26-
sync::{atomic::AtomicBool, Arc},
27-
};
23+
use std::{convert::Infallible, fmt, sync::Arc};
2824
use tokio::{
2925
runtime::Runtime,
3026
sync::{broadcast, mpsc::unbounded_channel, oneshot},
@@ -178,7 +174,6 @@ impl<'a> TestRunner<'a> {
178174
F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
179175
E: fmt::Debug + Send,
180176
{
181-
let cancelled = AtomicBool::new(false);
182177
let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
183178

184179
// If report_cancel_tx is None, at least one error has occurred and the
@@ -190,7 +185,6 @@ impl<'a> TestRunner<'a> {
190185
let res = self.inner.execute(
191186
&mut self.signal_handler,
192187
&mut self.input_handler,
193-
&cancelled,
194188
report_cancel_rx,
195189
|event| {
196190
match callback(event) {
@@ -247,7 +241,6 @@ impl<'a> TestRunnerInner<'a> {
247241
&self,
248242
signal_handler: &mut SignalHandler,
249243
input_handler: &mut InputHandler,
250-
cancelled_ref: &AtomicBool,
251244
report_cancel_rx: oneshot::Receiver<()>,
252245
callback: F,
253246
) -> Result<RunStats, Vec<JoinError>>
@@ -295,7 +288,6 @@ impl<'a> TestRunnerInner<'a> {
295288
signal_handler,
296289
input_handler,
297290
report_cancel_rx,
298-
cancelled_ref,
299291
cancellation_sender.clone(),
300292
);
301293
scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
@@ -305,9 +297,7 @@ impl<'a> TestRunnerInner<'a> {
305297
let run_scripts_fut = async move {
306298
// Since script tasks are run serially, we just reuse the one
307299
// script task.
308-
let script_data = executor_cx_ref
309-
.run_setup_scripts(script_resp_tx, cancelled_ref)
310-
.await;
300+
let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
311301
if script_tx.send(script_data).is_err() {
312302
// The dispatcher has shut down, so we should too.
313303
debug!("script_tx.send failed, shutting down");
@@ -369,7 +359,6 @@ impl<'a> TestRunnerInner<'a> {
369359
test_instance,
370360
settings,
371361
resp_tx.clone(),
372-
cancelled_ref,
373362
cancel_rx,
374363
setup_script_data,
375364
))

0 commit comments

Comments
 (0)