@@ -113,6 +113,7 @@ impl RemoteFsCleanup {
113113 let cleanup_local_files_delay =
114114 Duration :: from_secs ( self . config . local_files_cleanup_delay_secs ( ) ) ;
115115
116+ let mut files_to_remove: HashSet < String > = HashSet :: new ( ) ;
116117 loop {
117118 // Do the cleanup every now and then.
118119 tokio:: select! {
@@ -122,6 +123,40 @@ impl RemoteFsCleanup {
122123 }
123124 }
124125
126+ //We delete files on the next iteration after building the file list in order to give time for requests that may use these files to complete
127+ if !files_to_remove. is_empty ( ) {
128+ log:: debug!(
129+ "Cleaning up {} files that were removed remotely" ,
130+ files_to_remove. len( )
131+ ) ;
132+ log:: trace!( "The files being removed are {:?}" , files_to_remove) ;
133+ //Double check that files don't exists in metastore
134+ let files_from_metastore = match self . metastore . get_all_filenames ( ) . await {
135+ Err ( e) => {
136+ log:: error!( "could not get the list of files from metastore: {}" , e) ;
137+ continue ;
138+ }
139+ Ok ( f) => f,
140+ } ;
141+
142+ // Only keep the files we want to remove in `local_files`.
143+ for f in files_from_metastore {
144+ files_to_remove. remove ( & f) ;
145+ }
146+
147+ let local_dir_copy = local_dir. clone ( ) ;
148+ let mut files_to_remove_to_move = HashSet :: new ( ) ;
149+ std:: mem:: swap ( & mut files_to_remove, & mut files_to_remove_to_move) ;
150+ cube_ext:: spawn_blocking ( move || {
151+ for f in files_to_remove_to_move {
152+ let _ = std:: fs:: remove_file ( Path :: new ( & local_dir_copy) . join ( f) ) ;
153+ }
154+ } )
155+ . await
156+ . unwrap ( ) ;
157+ }
158+ files_to_remove. clear ( ) ;
159+
125160 let local_dir_copy = local_dir. clone ( ) ;
126161 let res_local_files =
127162 cube_ext:: spawn_blocking ( move || -> Result < HashSet < String > , std:: io:: Error > {
@@ -213,21 +248,8 @@ impl RemoteFsCleanup {
213248 }
214249
215250 if !local_files. is_empty ( ) {
216- log:: debug!(
217- "Cleaning up {} files that were removed remotely" ,
218- local_files. len( )
219- ) ;
220- log:: trace!( "The files being removed are {:?}" , local_files) ;
251+ files_to_remove = local_files;
221252 }
222-
223- let local_dir_copy = local_dir. clone ( ) ;
224- cube_ext:: spawn_blocking ( move || {
225- for f in local_files {
226- let _ = std:: fs:: remove_file ( Path :: new ( & local_dir_copy) . join ( f) ) ;
227- }
228- } )
229- . await
230- . unwrap ( ) ;
231253 }
232254 }
233255}
@@ -237,6 +259,17 @@ mod test {
237259 use crate :: config:: Config ;
238260 use futures_timer:: Delay ;
239261
262+ fn is_root_partition ( name : & str ) -> bool {
263+ name. starts_with ( "1-" ) && !name. ends_with ( ".chunk.parquet" )
264+ }
265+
266+ fn remove_root_paritition ( names : Vec < String > ) -> Vec < String > {
267+ names
268+ . into_iter ( )
269+ . filter ( |name| !is_root_partition ( name) )
270+ . collect :: < Vec < _ > > ( )
271+ }
272+
240273 #[ tokio:: test]
241274 async fn queue_cleanup_local_files ( ) {
242275 Config :: test ( "cleanup_local_files" )
@@ -262,7 +295,7 @@ mod test {
262295 . exec_query ( "INSERT INTO test.tst (a, b) VALUES (20, 20), (40 , 40)" )
263296 . await
264297 . unwrap ( ) ;
265- let files = meta_store. get_all_filenames ( ) . await . unwrap ( ) ;
298+ let files = remove_root_paritition ( meta_store. get_all_filenames ( ) . await . unwrap ( ) ) ;
266299 assert_eq ! ( files. len( ) , 2 ) ;
267300 for f in files. iter ( ) {
268301 let path = remote_fs. local_file ( & f) . await . unwrap ( ) ;
@@ -276,12 +309,15 @@ mod test {
276309 . await
277310 . unwrap ( ) ;
278311
279- assert_eq ! ( meta_store. get_all_filenames( ) . await . unwrap( ) . len( ) , 1 ) ;
312+ assert_eq ! (
313+ remove_root_paritition( meta_store. get_all_filenames( ) . await . unwrap( ) ) . len( ) ,
314+ 1
315+ ) ;
280316 for f in files. iter ( ) {
281317 let path = remote_fs. local_file ( & f) . await . unwrap ( ) ;
282318 assert ! ( Path :: new( & path) . exists( ) ) ;
283319 }
284- Delay :: new ( Duration :: from_millis ( 3000 ) ) . await ; // TODO logger init conflict
320+ Delay :: new ( Duration :: from_millis ( 4000 ) ) . await ; // TODO logger init conflict
285321
286322 let path = remote_fs. local_file ( & files[ 0 ] ) . await . unwrap ( ) ;
287323 assert ! ( !Path :: new( & path) . exists( ) ) ;
0 commit comments