Skip to content

Commit 455f5d5

Browse files
Purge caches and build directories when nothing is running
This prevents deleting files that Cargo is trying to read out from under it.
1 parent f3e4a12 commit 455f5d5

File tree

3 files changed

+195
-40
lines changed

3 files changed

+195
-40
lines changed

src/runner/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::prelude::*;
1010
use crate::results::TestResult;
1111
use crate::runner::worker::{DiskSpaceWatcher, Worker};
1212
use rustwide::Workspace;
13+
use std::sync::Arc;
1314
use std::thread::scope;
1415
use std::time::Duration;
1516
pub use worker::RecordProgress;
@@ -95,9 +96,19 @@ pub fn run_ex(
9596
let disk_watcher = DiskSpaceWatcher::new(
9697
DISK_SPACE_WATCHER_INTERVAL,
9798
DISK_SPACE_WATCHER_THRESHOLD,
98-
&workers,
99+
workers.len(),
99100
);
100101

102+
for worker in workers.iter() {
103+
let disk_watcher = Arc::clone(&disk_watcher);
104+
assert!(worker
105+
.between_crates
106+
.set(Box::new(move |is_permanent| {
107+
disk_watcher.worker_idle(is_permanent);
108+
}))
109+
.is_ok());
110+
}
111+
101112
scope(|scope1| {
102113
std::thread::Builder::new()
103114
.name("disk-space-watcher".into())

src/runner/worker.rs

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@ use crate::utils;
1111
use rustwide::logging::{self, LogStorage};
1212
use rustwide::{BuildDirectory, Workspace};
1313
use std::collections::HashMap;
14-
use std::sync::Condvar;
15-
use std::sync::{
16-
atomic::{AtomicBool, Ordering},
17-
Mutex,
18-
};
14+
use std::sync::Mutex;
15+
use std::sync::{Arc, Condvar, OnceLock};
1916
use std::time::Duration;
2017

2118
pub trait RecordProgress: Send + Sync {
@@ -51,8 +48,10 @@ pub(super) struct Worker<'a> {
5148
ex: &'a Experiment,
5249
config: &'a crate::config::Config,
5350
api: &'a dyn RecordProgress,
54-
target_dir_cleanup: AtomicBool,
5551
next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
52+
53+
// Called by the worker thread between crates, when no global state (namely caches) is in use.
54+
pub(super) between_crates: OnceLock<Box<dyn Fn(bool) + Send + Sync + 'a>>,
5655
}
5756

5857
impl<'a> Worker<'a> {
@@ -81,7 +80,8 @@ impl<'a> Worker<'a> {
8180
config,
8281
next_crate,
8382
api,
84-
target_dir_cleanup: AtomicBool::new(false),
83+
84+
between_crates: OnceLock::new(),
8585
}
8686
}
8787

@@ -163,14 +163,21 @@ impl<'a> Worker<'a> {
163163
// Backoff from calling the server again, to reduce load when we're spinning until
164164
// the next experiment is ready.
165165
std::thread::sleep(Duration::from_secs(rand::random_range(60..120)));
166+
167+
if let Some(cb) = self.between_crates.get() {
168+
cb(true);
169+
}
170+
166171
// We're done if no more crates left.
167172
return Ok(());
168173
};
169174

170-
self.maybe_cleanup_target_dir()?;
171-
172175
info!("{} processing crate {}", self.name, krate);
173176

177+
if let Some(cb) = self.between_crates.get() {
178+
cb(false);
179+
}
180+
174181
if !self.ex.ignore_blacklist && self.config.should_skip(&krate) {
175182
for tc in &self.ex.toolchains {
176183
// If a skipped crate is somehow sent to the agent (for example, when a crate was
@@ -338,41 +345,36 @@ impl<'a> Worker<'a> {
338345
}
339346
}
340347
}
341-
342-
fn maybe_cleanup_target_dir(&self) -> Fallible<()> {
343-
if !self.target_dir_cleanup.swap(false, Ordering::SeqCst) {
344-
return Ok(());
345-
}
346-
info!("purging target dir for {}", self.name);
347-
for dir in self.build_dir.values() {
348-
dir.lock().unwrap().purge()?;
349-
}
350-
351-
Ok(())
352-
}
353-
354-
fn schedule_target_dir_cleanup(&self) {
355-
self.target_dir_cleanup.store(true, Ordering::SeqCst);
356-
}
357348
}
358349

359-
pub(super) struct DiskSpaceWatcher<'a> {
350+
pub(super) struct DiskSpaceWatcher {
360351
interval: Duration,
361352
threshold: f32,
362-
workers: &'a [Worker<'a>],
363353
should_stop: Mutex<bool>,
364354
waiter: Condvar,
355+
356+
worker_count: usize,
357+
358+
// If the bool is true, that means we're waiting for the cache to reach zero, in which case
359+
// workers will wait for it to be false before starting. This gives us a global 'is the cache
360+
// in use' synchronization point.
361+
cache_in_use: Mutex<(usize, bool)>,
362+
cache_waiter: Condvar,
365363
}
366364

367-
impl<'a> DiskSpaceWatcher<'a> {
368-
pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a>]) -> Self {
369-
DiskSpaceWatcher {
365+
impl DiskSpaceWatcher {
366+
pub(super) fn new(interval: Duration, threshold: f32, worker_count: usize) -> Arc<Self> {
367+
Arc::new(DiskSpaceWatcher {
370368
interval,
371369
threshold,
372-
workers,
373370
should_stop: Mutex::new(false),
374371
waiter: Condvar::new(),
375-
}
372+
373+
worker_count,
374+
375+
cache_in_use: Mutex::new((0, false)),
376+
cache_waiter: Condvar::new(),
377+
})
376378
}
377379

378380
pub(super) fn stop(&self) {
@@ -406,14 +408,67 @@ impl<'a> DiskSpaceWatcher<'a> {
406408
};
407409

408410
if usage.is_threshold_reached(self.threshold) {
409-
warn!("running the scheduled thread cleanup");
410-
for worker in self.workers {
411-
worker.schedule_target_dir_cleanup();
412-
}
411+
self.clean(workspace);
412+
}
413+
}
413414

414-
if let Err(e) = workspace.purge_all_caches() {
415-
warn!("failed to purge caches: {:?}", e);
416-
}
415+
fn clean(&self, workspace: &dyn ToClean) {
416+
warn!("declaring interest in worker idle");
417+
418+
// Set interest in cleaning caches and then wait for cache use to drain to zero.
419+
let mut guard = self.cache_in_use.lock().unwrap();
420+
guard.1 = true;
421+
422+
self.cache_waiter.notify_all();
423+
424+
warn!("declared interest in workers, waiting for everyone to idle");
425+
426+
let mut guard = self
427+
.cache_waiter
428+
.wait_while(guard, |c| c.0 != self.worker_count)
429+
.unwrap();
430+
431+
// OK, purging caches, clear interest.
432+
guard.1 = false;
433+
434+
self.cache_waiter.notify_all();
435+
436+
warn!("purging all build dirs and caches");
437+
438+
workspace.purge();
439+
}
440+
441+
pub(super) fn worker_idle(&self, permanent: bool) {
442+
log::trace!("worker at idle point");
443+
let mut guard = self.cache_in_use.lock().unwrap();
444+
log::trace!("worker declared idle");
445+
// note that we're not running right now.
446+
guard.0 += 1;
447+
self.cache_waiter.notify_all();
448+
if !permanent {
449+
let mut guard = self.cache_waiter.wait_while(guard, |c| c.1).unwrap();
450+
// Then set ourselves as running.
451+
guard.0 -= 1;
452+
self.cache_waiter.notify_all();
453+
}
454+
}
455+
}
456+
457+
trait ToClean {
458+
fn purge(&self);
459+
}
460+
461+
impl ToClean for Workspace {
462+
fn purge(&self) {
463+
if let Err(e) = self.purge_all_caches() {
464+
warn!("failed to purge caches: {:?}", e);
465+
}
466+
467+
if let Err(e) = self.purge_all_build_dirs() {
468+
warn!("failed to purge build directories: {:?}", e);
417469
}
418470
}
419471
}
472+
473+
#[cfg(test)]
474+
mod test;

src/runner/worker/test.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use std::sync::atomic::AtomicBool;
2+
use std::sync::atomic::Ordering;
3+
use std::sync::Arc;
4+
use std::sync::Condvar;
5+
use std::sync::Mutex;
6+
use std::time::Duration;
7+
8+
use crate::runner::DiskSpaceWatcher;
9+
10+
#[derive(Default, Debug)]
11+
struct PurgeTracker {
12+
count: Mutex<usize>,
13+
wait: Condvar,
14+
}
15+
16+
impl PurgeTracker {
17+
#[track_caller]
18+
fn assert_count_eq(&self, count: usize) {
19+
let guard = self.count.lock().unwrap();
20+
let (g, timer) = self
21+
.wait
22+
.wait_timeout_while(guard, Duration::from_secs(10), |g| *g != count)
23+
.unwrap();
24+
assert!(
25+
!timer.timed_out(),
26+
"timed out while waiting for {} to equal {}",
27+
*g,
28+
count
29+
);
30+
assert_eq!(*g, count);
31+
}
32+
}
33+
34+
impl super::ToClean for PurgeTracker {
35+
fn purge(&self) {
36+
*self.count.lock().unwrap() += 1;
37+
self.wait.notify_all();
38+
}
39+
}
40+
41+
#[test]
42+
fn check_cleanup_single_worker() {
43+
let _ = env_logger::try_init();
44+
let tracker = Arc::new(PurgeTracker::default());
45+
let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 1);
46+
let done = &AtomicBool::new(false);
47+
std::thread::scope(|s| {
48+
s.spawn(|| {
49+
for _ in 0..3 {
50+
watcher.clean(&*tracker);
51+
}
52+
done.store(true, Ordering::Relaxed);
53+
});
54+
55+
s.spawn(|| {
56+
while !done.load(Ordering::Relaxed) {
57+
watcher.worker_idle(false);
58+
}
59+
});
60+
});
61+
62+
tracker.assert_count_eq(3);
63+
}
64+
65+
#[test]
66+
fn check_cleanup_multi_worker() {
67+
let _ = env_logger::try_init();
68+
let tracker = Arc::new(PurgeTracker::default());
69+
let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 3);
70+
let done = &AtomicBool::new(false);
71+
std::thread::scope(|s| {
72+
s.spawn(|| {
73+
for _ in 0..5 {
74+
watcher.clean(&*tracker);
75+
}
76+
done.store(true, Ordering::Relaxed);
77+
});
78+
79+
for _ in 0..3 {
80+
s.spawn(|| {
81+
while !done.load(Ordering::Relaxed) {
82+
watcher.worker_idle(false);
83+
}
84+
});
85+
}
86+
});
87+
88+
tracker.assert_count_eq(5);
89+
}

0 commit comments

Comments
 (0)