From 6c376a5aec8e63f184915124c04acafa3cdeddb2 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Mon, 2 Jun 2025 17:28:27 -0700 Subject: [PATCH 1/2] fix(cubestore): Delete old metastore snapshots in batches, after updating metastore-current. This limits some damage of a potential scenario where we have a backlog of a large number of snapshots. --- rust/cubestore/cubestore/src/metastore/mod.rs | 95 +++++++++++++ .../cubestore/src/metastore/rocks_fs.rs | 132 ++++++++++-------- 2 files changed, 169 insertions(+), 58 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 414ef7353eae8..5da496af1fcbd 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -6544,6 +6544,101 @@ mod tests { let _ = fs::remove_dir_all(remote_store_path.clone()); } + #[tokio::test] + async fn delete_old_snapshots() { + let config = Config::test("delete_old_snapshots").update_config(|mut obj| { + obj.metastore_snapshots_lifetime = 1; + obj.minimum_metastore_snapshots_count = 2; + obj + }); + let store_path = env::current_dir() + .unwrap() + .join("delete_old_snapshots-local"); + let remote_store_path = env::current_dir() + .unwrap() + .join("delete_old_snapshots-remote"); + let _ = fs::remove_dir_all(&store_path); + let _ = fs::remove_dir_all(&remote_store_path); + let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); + { + let meta_store = RocksMetaStore::new( + store_path.join("metastore").as_path(), + BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), + config.config_obj(), + ) + .unwrap(); + + // let list = remote_fs.list("metastore-".to_owned()).await.unwrap(); + // assert_eq!(0, list.len(), "remote fs list: {:?}", list); + + let uploaded = + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") + .await + .unwrap(); + assert_eq!(uploaded.len(), 0); + + meta_store + 
.create_schema("foo1".to_string(), false) + .await + .unwrap(); + + meta_store.upload_check_point().await.unwrap(); + let uploaded1 = + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") + .await + .unwrap(); + + assert_eq!(uploaded1.len(), 1); + + meta_store + .create_schema("foo2".to_string(), false) + .await + .unwrap(); + + meta_store.upload_check_point().await.unwrap(); + + let uploaded2 = + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") + .await + .unwrap(); + + assert_eq!(uploaded2.len(), 2); + + meta_store + .create_schema("foo3".to_string(), false) + .await + .unwrap(); + + meta_store.upload_check_point().await.unwrap(); + + let uploaded3 = + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") + .await + .unwrap(); + + assert_eq!(uploaded3.len(), 3); + + meta_store + .create_schema("foo4".to_string(), false) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(1100)).await; + meta_store.upload_check_point().await.unwrap(); + + let uploaded4 = + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") + .await + .unwrap(); + + // Should have 2 remaining snapshots because 2 is the minimum. 
+ assert_eq!(uploaded4.len(), 2); + } + + let _ = fs::remove_dir_all(&store_path); + let _ = fs::remove_dir_all(&remote_store_path); + } + #[tokio::test] async fn swap_active_partitions() { let config = Config::test("swap_active_partitions"); diff --git a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs index 8ff0dca9d0712..c917eaca5e22c 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs @@ -6,11 +6,9 @@ use crate::CubeError; use async_trait::async_trait; use datafusion::cube_ext; use futures::future::join_all; -use itertools::Itertools; use log::{error, info}; use regex::Regex; -use std::collections::BTreeSet; -use std::collections::HashSet; +use std::collections::{BTreeSet, HashMap}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; @@ -57,6 +55,7 @@ pub struct BaseRocksStoreFs { name: &'static str, minimum_snapshots_count: u64, snapshots_lifetime: u64, + remote_files_cleanup_batch_size: u64, } impl BaseRocksStoreFs { @@ -66,11 +65,13 @@ impl BaseRocksStoreFs { ) -> Arc { let minimum_snapshots_count = config.minimum_metastore_snapshots_count(); let snapshots_lifetime = config.metastore_snapshots_lifetime(); + let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size(); Arc::new(Self { remote_fs, name: "metastore", minimum_snapshots_count, snapshots_lifetime, + remote_files_cleanup_batch_size, }) } pub fn new_for_cachestore( @@ -79,11 +80,13 @@ impl BaseRocksStoreFs { ) -> Arc { let minimum_snapshots_count = config.minimum_cachestore_snapshots_count(); let snapshots_lifetime = config.cachestore_snapshots_lifetime(); + let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size(); Arc::new(Self { remote_fs, name: "cachestore", minimum_snapshots_count, snapshots_lifetime, + remote_files_cleanup_batch_size, }) } @@ -135,63 +138,78 @@ impl BaseRocksStoreFs { Ok(upload_results) } + + // 
Exposed for tests + pub async fn list_files_by_snapshot( + remote_fs: &dyn RemoteFs, + name: &str, + ) -> Result>, CubeError> { + let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?; + let mut snapshot_map = HashMap::>::new(); + for existing in existing_metastore_files.into_iter() { + let path = existing.split("/").nth(0).map(|p| { + u128::from_str( + &p.replace(&format!("{}-", name), "") + .replace("-index-logs", "") + .replace("-logs", ""), + ) + }); + if let Some(Ok(millis)) = path { + snapshot_map + .entry(millis) + .or_insert(Vec::new()) + .push(existing); + } + } + Ok(snapshot_map) + } + pub async fn delete_old_snapshots(&self) -> Result, CubeError> { - let existing_metastore_files = self.remote_fs.list(format!("{}-", self.name)).await?; - let candidates = existing_metastore_files - .iter() - .filter_map(|existing| { - let path = existing.split("/").nth(0).map(|p| { - u128::from_str( - &p.replace(&format!("{}-", self.name), "") - .replace("-index-logs", "") - .replace("-logs", ""), - ) - }); - if let Some(Ok(millis)) = path { - Some((existing, millis)) - } else { - None - } - }) - .collect::>(); + let candidates_map = + Self::list_files_by_snapshot(self.remote_fs.as_ref(), &self.name).await?; let lifetime_ms = (self.snapshots_lifetime as u128) * 1000; let min_snapshots_count = self.minimum_snapshots_count as usize; - let mut snapshots_list = candidates - .iter() - .map(|(_, ms)| ms.to_owned()) - .unique() - .collect::>(); - snapshots_list.sort_unstable_by(|a, b| b.cmp(a)); + // snapshots_list sorted by oldest first. 
+ let mut snapshots_list: Vec = candidates_map.keys().cloned().collect::>(); + snapshots_list.sort_unstable(); - let snapshots_to_delete = snapshots_list - .into_iter() - .skip(min_snapshots_count) - .filter(|ms| { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() - - ms - > lifetime_ms - }) - .collect::>(); + if snapshots_list.len() <= min_snapshots_count { + return Ok(vec![]); + } + snapshots_list.truncate(snapshots_list.len() - min_snapshots_count); - if !snapshots_to_delete.is_empty() { - let to_delete = candidates - .into_iter() - .filter_map(|(path, ms)| { - if snapshots_to_delete.contains(&ms) { - Some(path.to_owned()) - } else { - None - } - }) - .unique() - .collect::>(); + let cutoff_time_ms: u128 = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() + - lifetime_ms; + + while !snapshots_list.is_empty() && *snapshots_list.last().unwrap() >= cutoff_time_ms { + snapshots_list.pop(); + } + + let snapshots_list = snapshots_list; + + if snapshots_list.is_empty() { + // Avoid empty join_all, iteration, etc. 
+ return Ok(vec![]); + } + + let mut to_delete = Vec::new(); + + for ms in snapshots_list { + to_delete.extend_from_slice(&candidates_map[&ms]); + } + + for batch in to_delete.chunks( + self.remote_files_cleanup_batch_size + .try_into() + .unwrap_or(usize::MAX), + ) { for v in join_all( - to_delete + batch .iter() .map(|f| self.remote_fs.delete_file(f.to_string())) .collect::>(), @@ -201,11 +219,9 @@ { v?; } - - Ok(to_delete) - } else { - Ok(vec![]) } + + Ok(to_delete) } pub async fn is_remote_metadata_exists(&self) -> Result { @@ -367,10 +383,10 @@ impl MetaStoreFs for BaseRocksStoreFs { self.upload_snapsots_files(&remote_path, &checkpoint_path) .await?; - self.delete_old_snapshots().await?; - self.write_metastore_current(&remote_path).await?; + self.delete_old_snapshots().await?; + Ok(()) } async fn load_metastore_logs( From 4615882e6971625e705d4141d125c544902051d2 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Tue, 3 Jun 2025 04:01:18 -0700 Subject: [PATCH 2/2] chore(cubestore): Make delete_old_snapshots have logging, avoid additional memory use --- .../cubestore/cubestore/src/metastore/rocks_fs.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs index c917eaca5e22c..109585ca85668 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs @@ -145,6 +145,12 @@ impl BaseRocksStoreFs { name: &str, ) -> Result>, CubeError> { let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?; + // Log a debug statement so that we can rule out the filename list itself being too large for memory. 
+ log::debug!( + "Listed existing {} files, count = {}", + name, + existing_metastore_files.len() + ); let mut snapshot_map = HashMap::>::new(); for existing in existing_metastore_files.into_iter() { let path = existing.split("/").nth(0).map(|p| { @@ -197,10 +203,15 @@ impl BaseRocksStoreFs { return Ok(vec![]); } - let mut to_delete = Vec::new(); + let mut to_delete: Vec = Vec::new(); + let mut candidates_map = candidates_map; for ms in snapshots_list { - to_delete.extend_from_slice(&candidates_map[&ms]); + to_delete.append( + candidates_map + .get_mut(&ms) + .expect("delete_old_snapshots candidates_map lookup should succeed"), + ); } for batch in to_delete.chunks(