@@ -6,11 +6,9 @@ use crate::CubeError;
66use async_trait:: async_trait;
77use datafusion:: cube_ext;
88use futures:: future:: join_all;
9- use itertools:: Itertools ;
109use log:: { error, info} ;
1110use regex:: Regex ;
12- use std:: collections:: BTreeSet ;
13- use std:: collections:: HashSet ;
11+ use std:: collections:: { BTreeSet , HashMap } ;
1412use std:: path:: { Path , PathBuf } ;
1513use std:: str:: FromStr ;
1614use std:: sync:: Arc ;
@@ -57,6 +55,7 @@ pub struct BaseRocksStoreFs {
5755 name : & ' static str ,
5856 minimum_snapshots_count : u64 ,
5957 snapshots_lifetime : u64 ,
58+ remote_files_cleanup_batch_size : u64 ,
6059}
6160
6261impl BaseRocksStoreFs {
@@ -66,11 +65,13 @@ impl BaseRocksStoreFs {
6665 ) -> Arc < Self > {
6766 let minimum_snapshots_count = config. minimum_metastore_snapshots_count ( ) ;
6867 let snapshots_lifetime = config. metastore_snapshots_lifetime ( ) ;
68+ let remote_files_cleanup_batch_size = config. remote_files_cleanup_batch_size ( ) ;
6969 Arc :: new ( Self {
7070 remote_fs,
7171 name : "metastore" ,
7272 minimum_snapshots_count,
7373 snapshots_lifetime,
74+ remote_files_cleanup_batch_size,
7475 } )
7576 }
7677 pub fn new_for_cachestore (
@@ -79,11 +80,13 @@ impl BaseRocksStoreFs {
7980 ) -> Arc < Self > {
8081 let minimum_snapshots_count = config. minimum_cachestore_snapshots_count ( ) ;
8182 let snapshots_lifetime = config. cachestore_snapshots_lifetime ( ) ;
83+ let remote_files_cleanup_batch_size = config. remote_files_cleanup_batch_size ( ) ;
8284 Arc :: new ( Self {
8385 remote_fs,
8486 name : "cachestore" ,
8587 minimum_snapshots_count,
8688 snapshots_lifetime,
89+ remote_files_cleanup_batch_size,
8790 } )
8891 }
8992
@@ -135,63 +138,89 @@ impl BaseRocksStoreFs {
135138
136139 Ok ( upload_results)
137140 }
141+
142+ // Exposed for tests
143+ pub async fn list_files_by_snapshot (
144+ remote_fs : & dyn RemoteFs ,
145+ name : & str ,
146+ ) -> Result < HashMap < u128 , Vec < String > > , CubeError > {
147+ let existing_metastore_files = remote_fs. list ( format ! ( "{}-" , name) ) . await ?;
148+ // Log a debug statement so that we can rule out the filename list itself being too large for memory.
149+ log:: debug!(
150+ "Listed existing {} files, count = {}" ,
151+ name,
152+ existing_metastore_files. len( )
153+ ) ;
154+ let mut snapshot_map = HashMap :: < u128 , Vec < String > > :: new ( ) ;
155+ for existing in existing_metastore_files. into_iter ( ) {
156+ let path = existing. split ( "/" ) . nth ( 0 ) . map ( |p| {
157+ u128:: from_str (
158+ & p. replace ( & format ! ( "{}-" , name) , "" )
159+ . replace ( "-index-logs" , "" )
160+ . replace ( "-logs" , "" ) ,
161+ )
162+ } ) ;
163+ if let Some ( Ok ( millis) ) = path {
164+ snapshot_map
165+ . entry ( millis)
166+ . or_insert ( Vec :: new ( ) )
167+ . push ( existing) ;
168+ }
169+ }
170+ Ok ( snapshot_map)
171+ }
172+
138173 pub async fn delete_old_snapshots ( & self ) -> Result < Vec < String > , CubeError > {
139- let existing_metastore_files = self . remote_fs . list ( format ! ( "{}-" , self . name) ) . await ?;
140- let candidates = existing_metastore_files
141- . iter ( )
142- . filter_map ( |existing| {
143- let path = existing. split ( "/" ) . nth ( 0 ) . map ( |p| {
144- u128:: from_str (
145- & p. replace ( & format ! ( "{}-" , self . name) , "" )
146- . replace ( "-index-logs" , "" )
147- . replace ( "-logs" , "" ) ,
148- )
149- } ) ;
150- if let Some ( Ok ( millis) ) = path {
151- Some ( ( existing, millis) )
152- } else {
153- None
154- }
155- } )
156- . collect :: < Vec < _ > > ( ) ;
174+ let candidates_map =
175+ Self :: list_files_by_snapshot ( self . remote_fs . as_ref ( ) , & self . name ) . await ?;
157176
158177 let lifetime_ms = ( self . snapshots_lifetime as u128 ) * 1000 ;
159178 let min_snapshots_count = self . minimum_snapshots_count as usize ;
160179
161- let mut snapshots_list = candidates
162- . iter ( )
163- . map ( |( _, ms) | ms. to_owned ( ) )
164- . unique ( )
165- . collect :: < Vec < _ > > ( ) ;
166- snapshots_list. sort_unstable_by ( |a, b| b. cmp ( a) ) ;
180+ // snapshots_list sorted by oldest first.
181+ let mut snapshots_list: Vec < u128 > = candidates_map. keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
182+ snapshots_list. sort_unstable ( ) ;
167183
168- let snapshots_to_delete = snapshots_list
169- . into_iter ( )
170- . skip ( min_snapshots_count)
171- . filter ( |ms| {
172- SystemTime :: now ( )
173- . duration_since ( SystemTime :: UNIX_EPOCH )
174- . unwrap ( )
175- . as_millis ( )
176- - ms
177- > lifetime_ms
178- } )
179- . collect :: < HashSet < _ > > ( ) ;
184+ if snapshots_list. len ( ) <= min_snapshots_count {
185+ return Ok ( vec ! [ ] ) ;
186+ }
187+ snapshots_list. truncate ( snapshots_list. len ( ) - min_snapshots_count) ;
180188
181- if !snapshots_to_delete. is_empty ( ) {
182- let to_delete = candidates
183- . into_iter ( )
184- . filter_map ( |( path, ms) | {
185- if snapshots_to_delete. contains ( & ms) {
186- Some ( path. to_owned ( ) )
187- } else {
188- None
189- }
190- } )
191- . unique ( )
192- . collect :: < Vec < _ > > ( ) ;
189+ let cutoff_time_ms: u128 = SystemTime :: now ( )
190+ . duration_since ( SystemTime :: UNIX_EPOCH )
191+ . unwrap ( )
192+ . as_millis ( )
193+ - lifetime_ms;
194+
195+ while !snapshots_list. is_empty ( ) && * snapshots_list. last ( ) . unwrap ( ) >= cutoff_time_ms {
196+ snapshots_list. pop ( ) ;
197+ }
198+
199+ let snapshots_list = snapshots_list;
200+
201+ if snapshots_list. is_empty ( ) {
202+ // Avoid empty join_all, iteration, etc.
203+ return Ok ( vec ! [ ] ) ;
204+ }
205+
206+ let mut to_delete: Vec < String > = Vec :: new ( ) ;
207+
208+ let mut candidates_map = candidates_map;
209+ for ms in snapshots_list {
210+ to_delete. append (
211+ candidates_map
212+ . get_mut ( & ms)
213+ . expect ( "delete_old_snapshots candidates_map lookup should succeed" ) ,
214+ ) ;
215+ }
216+
217+ for batch in to_delete. chunks (
218+ self . remote_files_cleanup_batch_size
219+ . try_into ( )
220+ . unwrap_or ( usize:: MAX ) ,
221+ ) {
193222 for v in join_all (
194- to_delete
223+ batch
195224 . iter ( )
196225 . map ( |f| self . remote_fs . delete_file ( f. to_string ( ) ) )
197226 . collect :: < Vec < _ > > ( ) ,
@@ -201,11 +230,9 @@ impl BaseRocksStoreFs {
201230 {
202231 v?;
203232 }
204-
205- Ok ( to_delete)
206- } else {
207- Ok ( vec ! [ ] )
208233 }
234+
235+ Ok ( to_delete)
209236 }
210237
211238 pub async fn is_remote_metadata_exists ( & self ) -> Result < bool , CubeError > {
@@ -367,10 +394,10 @@ impl MetaStoreFs for BaseRocksStoreFs {
367394 self . upload_snapsots_files ( & remote_path, & checkpoint_path)
368395 . await ?;
369396
370- self . delete_old_snapshots ( ) . await ?;
371-
372397 self . write_metastore_current ( & remote_path) . await ?;
373398
399+ self . delete_old_snapshots ( ) . await ?;
400+
374401 Ok ( ( ) )
375402 }
376403 async fn load_metastore_logs (
0 commit comments