Skip to content

Commit 80fd5d0

Browse files
authored
Merge pull request #144 from ltratt/no_spurious_wakeups
No spurious wakeups
2 parents b6d8afc + 31c5910 commit 80fd5d0

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

src/jobrunner.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,21 @@ impl JobRunner {
8181
/// Listen for new jobs on the queue and then run them.
8282
fn attend(&mut self) {
8383
self.update_pollfds();
84-
// `check_queue` serves two subtly different purposes:
85-
// * Has the event pipe told us there are new jobs in the queue?
86-
// * Are there jobs in the queue from a previous round that we couldn't run yet?
84+
// Have we encountered a temporary failure when trying to run a job? If so, we'll need to
85+
// retry again after a timeout. Note: if this is true then `check_queue` will also be true.
86+
let mut tmp_failure = false;
87+
// Do we need to check the queue?
8788
let mut check_queue = false;
8889
// A scratch buffer used to read from files.
8990
let mut buf = Box::new([0; READBUF]);
9091
// The earliest finish_by time of any running process (i.e. the process that will timeout
9192
// the soonest).
9293
let mut next_finish_by: Option<Instant> = None;
9394
loop {
95+
assert!(!tmp_failure || check_queue);
9496
// If there are jobs on the queue we haven't been able to run for temporary reasons,
9597
// then wait a short amount of time and try again.
96-
let mut timeout = if check_queue { WAIT_TIMEOUT * 1000 } else { -1 };
98+
let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 };
9799
// If any processes will exceed their timeout then, if that's shorter than the above
98100
// timeout, only wait for enough time to pass before we need to send them SIGTERM.
99101
if let Some(fby) = next_finish_by {
@@ -247,6 +249,7 @@ impl JobRunner {
247249
self.running[i] = None;
248250
self.num_running -= 1;
249251
self.update_pollfds();
252+
check_queue = true;
250253
}
251254
}
252255
}
@@ -273,19 +276,33 @@ impl JobRunner {
273276
// However, it's only worth us checking the queue (which requires a lock) if there's
274277
// space for us to run further jobs.
275278
if check_queue && self.num_running < self.maxjobs {
276-
check_queue = !self.try_pop_queue();
279+
match self.try_pop_queue() {
280+
(false, false) => {
281+
check_queue = false;
282+
tmp_failure = false;
283+
}
284+
(true, false) => {
285+
check_queue = true;
286+
tmp_failure = false;
287+
}
288+
(true, true) => {
289+
check_queue = true;
290+
tmp_failure = true;
291+
}
292+
(false, true) => unreachable!(),
293+
}
277294
}
278295
}
279296
}
280297

281-
/// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or
282-
/// `false` otherwise.
283-
fn try_pop_queue(&mut self) -> bool {
298+
/// Try to pop all jobs on the queue: returns `(queue_is_empty,
299+
/// encountered_temporary_failure)`.
300+
fn try_pop_queue(&mut self) -> (bool, bool) {
284301
let snare = Arc::clone(&self.snare);
285302
let mut queue = snare.queue.lock().unwrap();
286303
loop {
287-
if self.num_running == self.maxjobs && !queue.is_empty() {
288-
return false;
304+
if self.num_running == self.maxjobs {
305+
return (queue.is_empty(), false);
289306
}
290307
let pjob = queue.pop(|repo_id| {
291308
self.running.iter().any(|jobslot| {
@@ -311,7 +328,7 @@ impl JobRunner {
311328
Err(Some(qj)) => {
312329
// The job couldn't be run for temporary reasons: we'll retry later.
313330
queue.push_front(qj);
314-
return false;
331+
return (true, true);
315332
}
316333
Err(None) => {
317334
// The job couldn't be run for permanent reasons: it has been consumed
@@ -326,7 +343,7 @@ impl JobRunner {
326343
// We weren't able to pop any jobs from the queue, but that doesn't mean that
327344
// the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in
328345
// it which can't be popped until others with the same path have completed.
329-
return queue.is_empty();
346+
return (queue.is_empty(), false);
330347
}
331348
}
332349
}

src/queue.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,7 @@ impl Queue {
4848

4949
/// Are there any jobs in the queue?
5050
pub fn is_empty(&self) -> bool {
51-
for v in self.q.values() {
52-
if !v.is_empty() {
53-
return false;
54-
}
55-
}
56-
true
51+
self.q.values().all(|x| x.is_empty())
5752
}
5853

5954
/// Push a new request to the back of the queue.

0 commit comments

Comments
 (0)