@@ -191,7 +191,6 @@ impl QueueRemoteFs {
191191 Ok ( ( ) )
192192 }
193193
194- const CLEANUP_INTERVAL : Duration = Duration :: from_secs ( 600 ) ;
195194 /// Periodically cleans up the local directory from the files removed on the remote side.
196195 /// This function currently removes only direct sibling files and does not touch subdirectories.
197196 /// So e.g. we remove the `.parquet` files, but not directories like `metastore` or heartbeat.
@@ -201,10 +200,12 @@ impl QueueRemoteFs {
201200 async fn cleanup_loop ( & self ) -> ( ) {
202201 let local_dir = self . local_path ( ) . await ;
203202 let mut stopped_rx = self . stopped_rx . clone ( ) ;
203+ let cleanup_interval = Duration :: from_secs ( self . config . local_files_cleanup_interval_secs ( ) ) ;
204+ let cleanup_files_size_threshold = self . config . local_files_cleanup_size_threshold ( ) ;
204205 loop {
205206 // Do the cleanup every now and then.
206207 tokio:: select! {
207- ( ) = tokio:: time:: sleep( Self :: CLEANUP_INTERVAL ) => { } ,
208+ ( ) = tokio:: time:: sleep( cleanup_interval ) => { } ,
208209 res = stopped_rx. changed( ) => {
209210 if res. is_err( ) || * stopped_rx. borrow( ) {
210211 return ;
@@ -216,9 +217,10 @@ impl QueueRemoteFs {
216217 // We rely on RemoteFs implementations to upload the file to the server before they make
217218 // it available on the local filesystem.
218219 let local_dir_copy = local_dir. clone ( ) ;
219- let res_local_files =
220- cube_ext :: spawn_blocking ( move || -> Result < HashSet < String > , std:: io:: Error > {
220+ let res_local_files = cube_ext :: spawn_blocking (
221+ move || -> Result < ( HashSet < String > , u64 ) , std:: io:: Error > {
221222 let mut local_files = HashSet :: new ( ) ;
223+ let mut local_files_size = 0 ;
222224 for res_entry in Path :: new ( & local_dir_copy) . read_dir ( ) ? {
223225 let entry = match res_entry {
224226 Err ( _) => continue , // ignore errors, might come from concurrent fs ops.
@@ -233,6 +235,12 @@ impl QueueRemoteFs {
233235 continue ;
234236 }
235237
238+ local_files_size += if let Ok ( metadata) = entry. metadata ( ) { // accumulate: the threshold check needs the total size of all files, not just the last one
239+ metadata. len ( )
240+ } else {
241+ 0 // metadata can fail on concurrent fs ops; count such entries as empty
242+ } ;
243+
236244 let file_name = match entry. file_name ( ) . into_string ( ) {
237245 Err ( _) => {
238246 log:: error!( "could not convert file name {:?}" , entry. file_name( ) ) ;
@@ -243,19 +251,24 @@ impl QueueRemoteFs {
243251
244252 local_files. insert ( file_name) ;
245253 }
246- Ok ( local_files)
247- } )
248- . await
249- . unwrap ( ) ;
254+ Ok ( ( local_files, local_files_size) )
255+ } ,
256+ )
257+ . await
258+ . unwrap ( ) ;
250259
251- let mut local_files = match res_local_files {
260+ let ( mut local_files, local_files_size ) = match res_local_files {
252261 Err ( e) => {
253262 log:: error!( "error while trying to list local files: {}" , e) ;
254263 continue ;
255264 }
256265 Ok ( f) => f,
257266 } ;
258267
268+ if local_files_size < cleanup_files_size_threshold {
269+ continue ;
270+ }
271+
259272 let res_remote_files = self . list ( "" ) . await ;
260273 let remote_files = match res_remote_files {
261274 Err ( e) => {
0 commit comments