Skip to content

Commit c05fd09

Browse files
committed
Auto merge of #567 - Mark-Simulacrum:simplify-runner, r=pietroalbini
Skip mpsc channels in report generation and disk cleaning. I doubt this is the cause of our deadlocks, but std's mpsc channels have had some bugs in the past around timeouts, IIRC. In any case, the new implementation simply feels simpler to me, so it is an improvement regardless.
2 parents b55a557 + edd87b7 commit c05fd09

File tree

3 files changed

+34
-46
lines changed

3 files changed

+34
-46
lines changed

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
clippy::unnecessary_wraps,
99
clippy::needless_question_mark,
1010
clippy::vec_init_then_push,
11-
clippy::upper_case_acronyms
11+
clippy::upper_case_acronyms,
12+
clippy::mutex_atomic
1213
)]
1314

1415
pub mod actions;

src/runner/worker.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use crate::runner::{OverrideResult, RunnerState};
77
use crate::utils;
88
use rustwide::{BuildDirectory, Workspace};
99
use std::collections::HashMap;
10+
use std::sync::Condvar;
1011
use std::sync::{
1112
atomic::{AtomicBool, Ordering},
12-
mpsc::{self, RecvTimeoutError},
13-
Arc, Mutex,
13+
Mutex,
1414
};
1515
use std::thread;
1616
use std::time::Duration;
@@ -151,34 +151,38 @@ pub(super) struct DiskSpaceWatcher<'a, DB: WriteResults + Sync> {
151151
interval: Duration,
152152
threshold: f32,
153153
workers: &'a [Worker<'a, DB>],
154-
stop_send: Arc<Mutex<mpsc::Sender<()>>>,
155-
stop_recv: Arc<Mutex<mpsc::Receiver<()>>>,
154+
should_stop: Mutex<bool>,
155+
waiter: Condvar,
156156
}
157157

158158
impl<'a, DB: WriteResults + Sync> DiskSpaceWatcher<'a, DB> {
159159
pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a, DB>]) -> Self {
160-
let (stop_send, stop_recv) = mpsc::channel();
161160
DiskSpaceWatcher {
162161
interval,
163162
threshold,
164163
workers,
165-
stop_send: Arc::new(Mutex::new(stop_send)),
166-
stop_recv: Arc::new(Mutex::new(stop_recv)),
164+
should_stop: Mutex::new(false),
165+
waiter: Condvar::new(),
167166
}
168167
}
169168

170169
pub(super) fn stop(&self) {
171-
self.stop_send.lock().unwrap().send(()).unwrap();
170+
*self.should_stop.lock().unwrap() = true;
171+
self.waiter.notify_all();
172172
}
173173

174174
pub(super) fn run(&self) {
175-
loop {
175+
let mut should_stop = self.should_stop.lock().unwrap();
176+
while !*should_stop {
176177
self.check();
177-
match self.stop_recv.lock().unwrap().recv_timeout(self.interval) {
178-
Ok(()) => return,
179-
Err(RecvTimeoutError::Timeout) => {}
180-
Err(RecvTimeoutError::Disconnected) => panic!("disconnected stop channel"),
181-
}
178+
// Wait for either the interval to pass or should_stop to get a
179+
// write. We don't care if we timed out or not, we can double check
180+
// the should_stop regardless.
181+
should_stop = self
182+
.waiter
183+
.wait_timeout(should_stop, self.interval)
184+
.unwrap()
185+
.0;
182186
}
183187
}
184188

src/server/reports.rs

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use crate::server::Data;
77
use crate::utils;
88
use rusoto_core::request::HttpClient;
99
use rusoto_s3::S3Client;
10-
use std::sync::{mpsc, Arc, Mutex};
11-
use std::thread;
10+
use std::sync::{Arc, Mutex};
11+
use std::thread::{self, Thread};
1212
use std::time::Duration;
1313

1414
// Automatically wake up the reports generator thread every 10 minutes to check for new jobs
@@ -32,7 +32,7 @@ fn generate_report(data: &Data, ex: &Experiment, results: &DatabaseDB) -> Fallib
3232
Ok(res)
3333
}
3434

35-
fn reports_thread(data: &Data, wakes: &mpsc::Receiver<()>) -> Fallible<()> {
35+
fn reports_thread(data: &Data) -> Fallible<()> {
3636
let timeout = Duration::from_secs(AUTOMATIC_THREAD_WAKEUP);
3737
let results = DatabaseDB::new(&data.db);
3838

@@ -41,9 +41,7 @@ fn reports_thread(data: &Data, wakes: &mpsc::Receiver<()>) -> Fallible<()> {
4141
Some(ex) => ex,
4242
None => {
4343
// This will sleep AUTOMATIC_THREAD_WAKEUP seconds *or* until a wake is received
44-
if let Err(mpsc::RecvTimeoutError::Disconnected) = wakes.recv_timeout(timeout) {
45-
thread::sleep(timeout);
46-
}
44+
std::thread::park_timeout(timeout);
4745

4846
continue;
4947
}
@@ -128,43 +126,28 @@ fn reports_thread(data: &Data, wakes: &mpsc::Receiver<()>) -> Fallible<()> {
128126
}
129127

130128
#[derive(Clone, Default)]
131-
pub struct ReportsWorker(Arc<Mutex<Option<mpsc::Sender<()>>>>);
129+
pub struct ReportsWorker(Arc<Mutex<Option<Thread>>>);
132130

133131
impl ReportsWorker {
134132
pub fn new() -> Self {
135133
ReportsWorker(Arc::new(Mutex::new(None)))
136134
}
137135

138136
pub fn spawn(&self, data: Data) {
139-
let waker = self.0.clone();
140-
thread::spawn(move || {
141-
// Set up a new waker channel
142-
let (wake_send, wake_recv) = mpsc::channel();
143-
{
144-
let mut waker = waker.lock().unwrap();
145-
*waker = Some(wake_send);
146-
}
147-
148-
loop {
149-
let result = reports_thread(&data.clone(), &wake_recv)
150-
.with_context(|_| "the reports generator thread crashed");
151-
if let Err(e) = result {
152-
utils::report_failure(&e);
153-
}
154-
155-
warn!("the reports generator thread will be respawned in one minute");
156-
thread::sleep(Duration::from_secs(60));
137+
let joiner = thread::spawn(move || loop {
138+
let result = reports_thread(&data.clone())
139+
.with_context(|_| "the reports generator thread crashed");
140+
if let Err(e) = result {
141+
utils::report_failure(&e);
157142
}
158143
});
144+
*self.0.lock().unwrap_or_else(|e| e.into_inner()) = Some(joiner.thread().clone());
159145
}
160146

161147
pub fn wake(&self) {
162-
// We don't really care if the wake fails: the reports generator thread wakes up on its own
163-
// every few minutes, so this just speeds up the process
164-
if let Some(waker) = self.0.lock().ok().as_ref().and_then(|opt| opt.as_ref()) {
165-
if waker.send(()).is_err() {
166-
warn!("can't wake the reports generator, will have to wait");
167-
}
148+
let guard = self.0.lock().unwrap_or_else(|e| e.into_inner());
149+
if let Some(thread) = &*guard {
150+
thread.unpark();
168151
} else {
169152
warn!("no report generator to wake up!");
170153
}

0 commit comments

Comments
 (0)