Skip to content

Commit c592c8b

Browse files
committed
fix(cubestore): Delete old metastore snapshots in batches, after updating metastore-current.
This limits the damage in a scenario where a large backlog of old snapshots has accumulated.
1 parent 24abdea commit c592c8b

File tree

2 files changed

+143
-58
lines changed

2 files changed

+143
-58
lines changed

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6544,6 +6544,87 @@ mod tests {
65446544
let _ = fs::remove_dir_all(remote_store_path.clone());
65456545
}
65466546

6547+
#[tokio::test]
6548+
async fn delete_old_snapshots() {
6549+
let config = Config::test("delete_old_snapshots")
6550+
.update_config(|mut obj| {
6551+
obj.metastore_snapshots_lifetime = 1;
6552+
obj.minimum_metastore_snapshots_count = 2;
6553+
obj
6554+
});
6555+
let store_path = env::current_dir()
6556+
.unwrap()
6557+
.join("delete_old_snapshots-local");
6558+
let remote_store_path = env::current_dir()
6559+
.unwrap()
6560+
.join("delete_old_snapshots-remote");
6561+
let _ = fs::remove_dir_all(&store_path);
6562+
let _ = fs::remove_dir_all(&remote_store_path);
6563+
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
6564+
{
6565+
let meta_store = RocksMetaStore::new(
6566+
store_path.join("metastore").as_path(),
6567+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
6568+
config.config_obj(),
6569+
)
6570+
.unwrap();
6571+
6572+
// let list = remote_fs.list("metastore-".to_owned()).await.unwrap();
6573+
// assert_eq!(0, list.len(), "remote fs list: {:?}", list);
6574+
6575+
let uploaded = BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await.unwrap();
6576+
assert_eq!(uploaded.len(), 0);
6577+
6578+
meta_store
6579+
.create_schema("foo1".to_string(), false)
6580+
.await
6581+
.unwrap();
6582+
6583+
meta_store.upload_check_point().await.unwrap();
6584+
let uploaded1 = BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await.unwrap();
6585+
6586+
assert_eq!(uploaded1.len(), 1);
6587+
6588+
meta_store
6589+
.create_schema("foo2".to_string(), false)
6590+
.await
6591+
.unwrap();
6592+
6593+
meta_store.upload_check_point().await.unwrap();
6594+
6595+
let uploaded2 = BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await.unwrap();
6596+
6597+
assert_eq!(uploaded2.len(), 2);
6598+
6599+
meta_store
6600+
.create_schema("foo3".to_string(), false)
6601+
.await
6602+
.unwrap();
6603+
6604+
meta_store.upload_check_point().await.unwrap();
6605+
6606+
let uploaded3 = BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await.unwrap();
6607+
6608+
assert_eq!(uploaded3.len(), 3);
6609+
6610+
meta_store
6611+
.create_schema("foo4".to_string(), false)
6612+
.await
6613+
.unwrap();
6614+
6615+
tokio::time::sleep(Duration::from_millis(1100)).await;
6616+
meta_store.upload_check_point().await.unwrap();
6617+
6618+
let uploaded4 = BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await.unwrap();
6619+
6620+
// Should have 2 remaining snapshots because 2 is the minimum.
6621+
assert_eq!(uploaded4.len(), 2);
6622+
}
6623+
6624+
let _ = fs::remove_dir_all(&store_path);
6625+
let _ = fs::remove_dir_all(&remote_store_path);
6626+
}
6627+
65476628
#[tokio::test]
65486629
async fn swap_active_partitions() {
65496630
let config = Config::test("swap_active_partitions");

rust/cubestore/cubestore/src/metastore/rocks_fs.rs

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ use crate::CubeError;
66
use async_trait::async_trait;
77
use datafusion::cube_ext;
88
use futures::future::join_all;
9-
use itertools::Itertools;
109
use log::{error, info};
1110
use regex::Regex;
12-
use std::collections::BTreeSet;
13-
use std::collections::HashSet;
11+
use std::collections::{BTreeSet, HashMap};
1412
use std::path::{Path, PathBuf};
1513
use std::str::FromStr;
1614
use std::sync::Arc;
@@ -57,6 +55,7 @@ pub struct BaseRocksStoreFs {
5755
name: &'static str,
5856
minimum_snapshots_count: u64,
5957
snapshots_lifetime: u64,
58+
remote_files_cleanup_batch_size: u64,
6059
}
6160

6261
impl BaseRocksStoreFs {
@@ -66,11 +65,13 @@ impl BaseRocksStoreFs {
6665
) -> Arc<Self> {
6766
let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
6867
let snapshots_lifetime = config.metastore_snapshots_lifetime();
68+
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
6969
Arc::new(Self {
7070
remote_fs,
7171
name: "metastore",
7272
minimum_snapshots_count,
7373
snapshots_lifetime,
74+
remote_files_cleanup_batch_size,
7475
})
7576
}
7677
pub fn new_for_cachestore(
@@ -79,11 +80,13 @@ impl BaseRocksStoreFs {
7980
) -> Arc<Self> {
8081
let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
8182
let snapshots_lifetime = config.cachestore_snapshots_lifetime();
83+
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
8284
Arc::new(Self {
8385
remote_fs,
8486
name: "cachestore",
8587
minimum_snapshots_count,
8688
snapshots_lifetime,
89+
remote_files_cleanup_batch_size,
8790
})
8891
}
8992

@@ -135,63 +138,66 @@ impl BaseRocksStoreFs {
135138

136139
Ok(upload_results)
137140
}
141+
142+
// Exposed for tests
143+
/// Lists the remote files under the `{name}-` prefix, grouped by snapshot timestamp.
///
/// The timestamp is taken from the first `/`-separated component of each listed path after
/// removing the `{name}-` prefix and the `-index-logs` / `-logs` markers. Paths whose first
/// component does not parse as a `u128` are left out of the result.
///
/// # Errors
///
/// Returns the error from `remote_fs.list` if the listing fails.
pub async fn list_files_by_snapshot(
    remote_fs: &dyn RemoteFs,
    name: &str,
) -> Result<HashMap<u128, Vec<String>>, CubeError> {
    let prefix = format!("{}-", name);
    let remote_paths = remote_fs.list(prefix.clone()).await?;

    let mut files_by_snapshot: HashMap<u128, Vec<String>> = HashMap::new();
    for remote_path in remote_paths {
        let parsed_millis = match remote_path.split('/').next() {
            Some(top_level) => top_level
                .replace(&prefix, "")
                .replace("-index-logs", "")
                .replace("-logs", "")
                .parse::<u128>(),
            None => continue,
        };
        if let Ok(millis) = parsed_millis {
            files_by_snapshot.entry(millis).or_default().push(remote_path);
        }
    }
    Ok(files_by_snapshot)
}
160+
138161
pub async fn delete_old_snapshots(&self) -> Result<Vec<String>, CubeError> {
139-
let existing_metastore_files = self.remote_fs.list(format!("{}-", self.name)).await?;
140-
let candidates = existing_metastore_files
141-
.iter()
142-
.filter_map(|existing| {
143-
let path = existing.split("/").nth(0).map(|p| {
144-
u128::from_str(
145-
&p.replace(&format!("{}-", self.name), "")
146-
.replace("-index-logs", "")
147-
.replace("-logs", ""),
148-
)
149-
});
150-
if let Some(Ok(millis)) = path {
151-
Some((existing, millis))
152-
} else {
153-
None
154-
}
155-
})
156-
.collect::<Vec<_>>();
162+
let candidates_map = Self::list_files_by_snapshot(self.remote_fs.as_ref(), &self.name).await?;
157163

158164
let lifetime_ms = (self.snapshots_lifetime as u128) * 1000;
159165
let min_snapshots_count = self.minimum_snapshots_count as usize;
160166

161-
let mut snapshots_list = candidates
162-
.iter()
163-
.map(|(_, ms)| ms.to_owned())
164-
.unique()
165-
.collect::<Vec<_>>();
166-
snapshots_list.sort_unstable_by(|a, b| b.cmp(a));
167+
// snapshots_list sorted by oldest first.
168+
let mut snapshots_list: Vec<u128> = candidates_map.keys().cloned().collect::<Vec<_>>();
169+
snapshots_list.sort_unstable();
167170

168-
let snapshots_to_delete = snapshots_list
169-
.into_iter()
170-
.skip(min_snapshots_count)
171-
.filter(|ms| {
172-
SystemTime::now()
173-
.duration_since(SystemTime::UNIX_EPOCH)
174-
.unwrap()
175-
.as_millis()
176-
- ms
177-
> lifetime_ms
178-
})
179-
.collect::<HashSet<_>>();
171+
if snapshots_list.len() <= min_snapshots_count {
172+
return Ok(vec![]);
173+
}
174+
snapshots_list.truncate(snapshots_list.len() - min_snapshots_count);
180175

181-
if !snapshots_to_delete.is_empty() {
182-
let to_delete = candidates
183-
.into_iter()
184-
.filter_map(|(path, ms)| {
185-
if snapshots_to_delete.contains(&ms) {
186-
Some(path.to_owned())
187-
} else {
188-
None
189-
}
190-
})
191-
.unique()
192-
.collect::<Vec<_>>();
176+
let cutoff_time_ms: u128 = SystemTime::now()
177+
.duration_since(SystemTime::UNIX_EPOCH)
178+
.unwrap()
179+
.as_millis() - lifetime_ms;
180+
181+
while !snapshots_list.is_empty() && *snapshots_list.last().unwrap() >= cutoff_time_ms {
182+
snapshots_list.pop();
183+
}
184+
185+
let snapshots_list = snapshots_list;
186+
187+
if snapshots_list.is_empty() {
188+
// Avoid empty join_all, iteration, etc.
189+
return Ok(vec![]);
190+
}
191+
192+
let mut to_delete = Vec::new();
193+
194+
for ms in snapshots_list {
195+
to_delete.extend_from_slice(&candidates_map[&ms]);
196+
}
197+
198+
for batch in to_delete.chunks(self.remote_files_cleanup_batch_size.try_into().unwrap_or(usize::MAX)) {
193199
for v in join_all(
194-
to_delete
200+
batch
195201
.iter()
196202
.map(|f| self.remote_fs.delete_file(f.to_string()))
197203
.collect::<Vec<_>>(),
@@ -201,11 +207,9 @@ impl BaseRocksStoreFs {
201207
{
202208
v?;
203209
}
204-
205-
Ok(to_delete)
206-
} else {
207-
Ok(vec![])
208210
}
211+
212+
Ok(to_delete)
209213
}
210214

211215
pub async fn is_remote_metadata_exists(&self) -> Result<bool, CubeError> {
@@ -367,10 +371,10 @@ impl MetaStoreFs for BaseRocksStoreFs {
367371
self.upload_snapsots_files(&remote_path, &checkpoint_path)
368372
.await?;
369373

370-
self.delete_old_snapshots().await?;
371-
372374
self.write_metastore_current(&remote_path).await?;
373375

376+
self.delete_old_snapshots().await?;
377+
374378
Ok(())
375379
}
376380
async fn load_metastore_logs(

0 commit comments

Comments
 (0)