@@ -11,11 +11,8 @@ use crate::utils;
1111use rustwide:: logging:: { self , LogStorage } ;
1212use rustwide:: { BuildDirectory , Workspace } ;
1313use std:: collections:: HashMap ;
14- use std:: sync:: Condvar ;
15- use std:: sync:: {
16- atomic:: { AtomicBool , Ordering } ,
17- Mutex ,
18- } ;
14+ use std:: sync:: Mutex ;
15+ use std:: sync:: { Arc , Condvar , OnceLock } ;
1916use std:: time:: Duration ;
2017
2118pub trait RecordProgress : Send + Sync {
@@ -51,8 +48,10 @@ pub(super) struct Worker<'a> {
5148 ex : & ' a Experiment ,
5249 config : & ' a crate :: config:: Config ,
5350 api : & ' a dyn RecordProgress ,
54- target_dir_cleanup : AtomicBool ,
5551 next_crate : & ' a ( dyn Fn ( ) -> Fallible < Option < Crate > > + Send + Sync ) ,
52+
53+ // Called by the worker thread between crates, when no global state (namely caches) is in use.
54+ pub ( super ) between_crates : OnceLock < Box < dyn Fn ( ) + Send + Sync + ' a > > ,
5655}
5756
5857impl < ' a > Worker < ' a > {
@@ -81,7 +80,8 @@ impl<'a> Worker<'a> {
8180 config,
8281 next_crate,
8382 api,
84- target_dir_cleanup : AtomicBool :: new ( false ) ,
83+
84+ between_crates : OnceLock :: new ( ) ,
8585 }
8686 }
8787
@@ -167,10 +167,12 @@ impl<'a> Worker<'a> {
167167 return Ok ( ( ) ) ;
168168 } ;
169169
170- self . maybe_cleanup_target_dir ( ) ?;
171-
172170 info ! ( "{} processing crate {}" , self . name, krate) ;
173171
172+ if let Some ( cb) = self . between_crates . get ( ) {
173+ cb ( ) ;
174+ }
175+
174176 if !self . ex . ignore_blacklist && self . config . should_skip ( & krate) {
175177 for tc in & self . ex . toolchains {
176178 // If a skipped crate is somehow sent to the agent (for example, when a crate was
@@ -338,41 +340,36 @@ impl<'a> Worker<'a> {
338340 }
339341 }
340342 }
341-
342- fn maybe_cleanup_target_dir ( & self ) -> Fallible < ( ) > {
343- if !self . target_dir_cleanup . swap ( false , Ordering :: SeqCst ) {
344- return Ok ( ( ) ) ;
345- }
346- info ! ( "purging target dir for {}" , self . name) ;
347- for dir in self . build_dir . values ( ) {
348- dir. lock ( ) . unwrap ( ) . purge ( ) ?;
349- }
350-
351- Ok ( ( ) )
352- }
353-
354- fn schedule_target_dir_cleanup ( & self ) {
355- self . target_dir_cleanup . store ( true , Ordering :: SeqCst ) ;
356- }
357343}
358344
359- pub ( super ) struct DiskSpaceWatcher < ' a > {
345+ pub ( super ) struct DiskSpaceWatcher {
360346 interval : Duration ,
361347 threshold : f32 ,
362- workers : & ' a [ Worker < ' a > ] ,
363348 should_stop : Mutex < bool > ,
364349 waiter : Condvar ,
350+
351+ worker_count : usize ,
352+
353+ // If the bool is true, that means we're waiting for the cache to reach zero, in which case
354+ // workers will wait for it to be false before starting. This gives us a global 'is the cache
355+ // in use' synchronization point.
356+ cache_in_use : Mutex < ( usize , bool ) > ,
357+ cache_waiter : Condvar ,
365358}
366359
367- impl < ' a > DiskSpaceWatcher < ' a > {
368- pub ( super ) fn new ( interval : Duration , threshold : f32 , workers : & ' a [ Worker < ' a > ] ) -> Self {
369- DiskSpaceWatcher {
360+ impl DiskSpaceWatcher {
361+ pub ( super ) fn new ( interval : Duration , threshold : f32 , worker_count : usize ) -> Arc < Self > {
362+ Arc :: new ( DiskSpaceWatcher {
370363 interval,
371364 threshold,
372- workers,
373365 should_stop : Mutex :: new ( false ) ,
374366 waiter : Condvar :: new ( ) ,
375- }
367+
368+ worker_count,
369+
370+ cache_in_use : Mutex :: new ( ( 0 , false ) ) ,
371+ cache_waiter : Condvar :: new ( ) ,
372+ } )
376373 }
377374
378375 pub ( super ) fn stop ( & self ) {
@@ -406,14 +403,66 @@ impl<'a> DiskSpaceWatcher<'a> {
406403 } ;
407404
408405 if usage. is_threshold_reached ( self . threshold ) {
409- warn ! ( "running the scheduled thread cleanup" ) ;
410- for worker in self . workers {
411- worker. schedule_target_dir_cleanup ( ) ;
412- }
406+ self . clean ( workspace) ;
407+ }
408+ }
413409
414- if let Err ( e) = workspace. purge_all_caches ( ) {
415- warn ! ( "failed to purge caches: {:?}" , e) ;
416- }
410+ fn clean ( & self , workspace : & dyn ToClean ) {
411+ warn ! ( "declaring interest in worker idle" ) ;
412+
413+ // Set interest in cleaning caches and then wait for cache use to drain to zero.
414+ let mut guard = self . cache_in_use . lock ( ) . unwrap ( ) ;
415+ guard. 1 = true ;
416+
417+ self . cache_waiter . notify_all ( ) ;
418+
419+ warn ! ( "declared interest in workers, waiting for everyone to idle" ) ;
420+
421+ let mut guard = self
422+ . cache_waiter
423+ . wait_while ( guard, |c| dbg ! ( c. 0 , "clean waiter" ) . 0 != self . worker_count )
424+ . unwrap ( ) ;
425+
426+ // OK, purging caches, clear interest.
427+ guard. 1 = false ;
428+
429+ self . cache_waiter . notify_all ( ) ;
430+
431+ warn ! ( "purging all build dirs and caches" ) ;
432+
433+ workspace. purge ( ) ;
434+ }
435+
436+ pub ( super ) fn worker_idle ( & self ) {
437+ log:: trace!( "worker at idle point" ) ;
438+ let mut guard = self . cache_in_use . lock ( ) . unwrap ( ) ;
439+ log:: trace!( "worker declared idle" ) ;
440+ // note that we're not running right now.
441+ guard. 0 += 1 ;
442+ self . cache_waiter . notify_all ( ) ;
443+ let mut guard = self . cache_waiter . wait_while ( guard, |c| c. 1 ) . unwrap ( ) ;
444+ info ! ( "worker resuming from idle" ) ;
445+ // Then set ourselves as running.
446+ guard. 0 -= 1 ;
447+ self . cache_waiter . notify_all ( ) ;
448+ }
449+ }
450+
451+ trait ToClean {
452+ fn purge ( & self ) ;
453+ }
454+
455+ impl ToClean for Workspace {
456+ fn purge ( & self ) {
457+ if let Err ( e) = self . purge_all_caches ( ) {
458+ warn ! ( "failed to purge caches: {:?}" , e) ;
459+ }
460+
461+ if let Err ( e) = self . purge_all_build_dirs ( ) {
462+ warn ! ( "failed to purge build directories: {:?}" , e) ;
417463 }
418464 }
419465}
466+
467+ #[ cfg( test) ]
468+ mod test;
0 commit comments