11use std:: {
2+ io,
23 path:: {
34 Path ,
45 PathBuf ,
@@ -51,18 +52,24 @@ use super::{
5152} ;
5253use crate :: SearchFileType ;
5354
54- struct IndexMeta {
55- size : u64 ,
56- path : PathBuf ,
55+ struct IndexTempDir {
56+ dir : PathBuf ,
5757 cleaner : CacheCleaner ,
5858}
5959
60- impl Drop for IndexMeta {
60+ impl Drop for IndexTempDir {
6161 fn drop ( & mut self ) {
62- let _ = self . cleaner . attempt_cleanup ( self . path . clone ( ) ) ;
62+ let _ = self . cleaner . attempt_cleanup ( self . dir . clone ( ) ) ;
6363 }
6464}
6565
66+ struct IndexMeta {
67+ size : u64 ,
68+ /// A path under `tempdir.dir`; may not be the directory itself
69+ path : PathBuf ,
70+ _tempdir : IndexTempDir ,
71+ }
72+
6673impl SizedValue for IndexMeta {
6774 fn size ( & self ) -> u64 {
6875 self . size
@@ -135,7 +142,7 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
135142 search_storage : Arc < dyn Storage > ,
136143 key : ObjectKey ,
137144 search_file_type : SearchFileType ,
138- destination : PathBuf ,
145+ destination : IndexTempDir ,
139146 ) -> anyhow:: Result < IndexMeta > {
140147 let timer = metrics:: archive_fetch_timer ( ) ;
141148 let archive = search_storage
@@ -145,27 +152,20 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
145152 . into_tokio_reader ( ) ;
146153 let extract_archive_timer = metrics:: extract_archive_timer ( ) ;
147154 let extract_archive_result = self
148- . extract_archive ( search_file_type, destination. clone ( ) , archive)
155+ . extract_archive ( search_file_type, destination. dir . clone ( ) , archive)
149156 . await ;
150157 extract_archive_timer. finish ( ) ;
151158
152- match extract_archive_result {
153- Ok ( ( bytes_used, path) ) => {
154- if is_immutable ( search_file_type) {
155- set_readonly ( & path, true ) . await ?;
156- }
157- metrics:: finish_archive_fetch ( timer, bytes_used, search_file_type) ;
158- Ok ( IndexMeta {
159- path,
160- size : bytes_used,
161- cleaner : self . cleaner . clone ( ) ,
162- } )
163- } ,
164- Err ( e) => {
165- self . cleaner . attempt_cleanup ( destination) ?;
166- Err ( e)
167- } ,
159+ let ( bytes_used, path) = extract_archive_result?;
160+ if is_immutable ( search_file_type) {
161+ set_readonly ( & path, true ) . await ?;
168162 }
163+ metrics:: finish_archive_fetch ( timer, bytes_used, search_file_type) ;
164+ Ok ( IndexMeta {
165+ _tempdir : destination,
166+ size : bytes_used,
167+ path,
168+ } )
169169 }
170170
171171 async fn extract_archive (
@@ -226,33 +226,32 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
226226 let mut timeout_fut = self . rt . wait ( * ARCHIVE_FETCH_TIMEOUT_SECONDS ) . fuse ( ) ;
227227 let destination = self . cache_path . join ( Uuid :: new_v4 ( ) . simple ( ) . to_string ( ) ) ;
228228
229- let new_destination = destination. clone ( ) ;
229+ // Create this right away so its Drop impl (which deletes the path) runs
230+ // even on failure
231+ let tempdir = IndexTempDir {
232+ cleaner : self . cleaner . clone ( ) ,
233+ dir : destination. clone ( ) ,
234+ } ;
230235 let new_self = self . clone ( ) ;
231236 let new_key = key. clone ( ) ;
232237 // Many parts of the fetch perform blocking operations. To avoid blocking the
233238 // calling thread's scheduling, punt all fetches to a separate OS thread.
234239 let fetch_fut = self
235240 . blocking_thread_pool
236241 . execute_async ( move || {
237- new_self. fetch ( search_storage, new_key, search_file_type, new_destination )
242+ new_self. fetch ( search_storage, new_key, search_file_type, tempdir )
238243 } )
239244 . fuse ( ) ;
240245 pin_mut ! ( fetch_fut) ;
241- let res = select_biased ! {
246+ select_biased ! {
242247 meta = fetch_fut => {
243- meta
248+ meta?
244249 } ,
245250 _ = timeout_fut => {
246251 metrics:: log_cache_fetch_timeout( ) ;
247252 tracing:: error!( "Timed out fetching archive for key {key:?}" ) ;
248- Err ( anyhow:: anyhow!( "Timed out" ) ) }
249- } ;
250-
251- if let Ok ( Ok ( index_meta) ) = res {
252- Ok ( index_meta)
253- } else {
254- self . cleaner . attempt_cleanup ( destination) ?;
255- res?
253+ Err ( anyhow:: anyhow!( "Timed out" ) )
254+ }
256255 }
257256 }
258257}
@@ -402,7 +401,7 @@ fn is_immutable(search_file_type: SearchFileType) -> bool {
402401 }
403402}
404403
405- async fn set_readonly ( path : & PathBuf , readonly : bool ) -> anyhow :: Result < ( ) > {
404+ async fn set_readonly ( path : & PathBuf , readonly : bool ) -> io :: Result < ( ) > {
406405 let metadata = fs:: metadata ( path) . await ?;
407406 let mut permissions = metadata. permissions ( ) ;
408407 permissions. set_readonly ( readonly) ;
@@ -444,11 +443,16 @@ async fn cleanup_thread(mut rx: mpsc::UnboundedReceiver<PathBuf>) {
444443 // production here, we should investigate further but for now, it's simpler
445444 // to disallow inconsistent filesystem state.
446445 tracing:: debug!( "Removing path {} from disk" , path. display( ) ) ;
447- let result: anyhow :: Result < ( ) > = try {
446+ let result: io :: Result < ( ) > = try {
448447 set_readonly ( & path, false ) . await ?;
449448 fs:: remove_dir_all ( path) . await ?;
450449 } ;
451- result. expect ( "ArchiveCacheManager failed to clean up archive directory" ) ;
450+ match result {
451+ Ok ( ( ) ) => ( ) ,
452+ // Can happen if the path to clean up was never created
453+ Err ( e) if e. kind ( ) == io:: ErrorKind :: NotFound => ( ) ,
454+ Err ( e) => panic ! ( "ArchiveCacheManager failed to clean up archive directory: {e:?}" ) ,
455+ }
452456 }
453457}
454458
0 commit comments