Skip to content

Commit f590a20

Browse files
authored
feat(cubestore): Set metastore current snapshot system command (#5940)
1 parent 47956ea commit f590a20

File tree

9 files changed

+502
-36
lines changed

9 files changed

+502
-36
lines changed

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod rocks_fs;
99
mod rocks_store;
1010
mod rocks_table;
1111
pub mod schema;
12+
pub mod snapshot_info;
1213
pub mod source;
1314
pub mod table;
1415
pub mod wal;
@@ -79,6 +80,7 @@ use std::str::FromStr;
7980

8081
use crate::cachestore::CacheItem;
8182
use crate::remotefs::LocalDirRemoteFs;
83+
use snapshot_info::SnapshotInfo;
8284
use std::time::Duration;
8385
use table::Table;
8486
use table::{TableRocksIndex, TableRocksTable};
@@ -1013,6 +1015,9 @@ pub trait MetaStore: DIService + Send + Sync {
10131015
async fn debug_dump(&self, out_path: String) -> Result<(), CubeError>;
10141016
// Force compaction for the whole RocksDB
10151017
async fn compaction(&self) -> Result<(), CubeError>;
1018+
1019+
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError>;
1020+
async fn set_current_snapshot(&self, snapshot_id: u128) -> Result<(), CubeError>;
10161021
}
10171022

10181023
crate::di_service!(RocksMetaStore, [MetaStore]);
@@ -3916,6 +3921,12 @@ impl MetaStore for RocksMetaStore {
39163921
})
39173922
.await
39183923
}
3924+
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError> {
3925+
self.store.get_snapshots_list().await
3926+
}
3927+
async fn set_current_snapshot(&self, snapshot_id: u128) -> Result<(), CubeError> {
3928+
self.store.set_current_snapshot(snapshot_id).await
3929+
}
39193930
}
39203931

39213932
pub async fn deactivate_table_on_corrupt_data<'a, T: 'static>(
@@ -4933,6 +4944,268 @@ mod tests {
49334944
}
49344945
}
49354946

4947+
#[tokio::test]
4948+
async fn get_snapshots_list() {
4949+
{
4950+
let config = Config::test("get_snapshots_list");
4951+
4952+
let _ = fs::remove_dir_all(config.local_dir());
4953+
let _ = fs::remove_dir_all(config.remote_dir());
4954+
4955+
let services = config.configure().await;
4956+
services.start_processing_loops().await.unwrap();
4957+
let snapshots = services
4958+
.rocks_meta_store
4959+
.as_ref()
4960+
.unwrap()
4961+
.get_snapshots_list()
4962+
.await
4963+
.unwrap();
4964+
assert_eq!(snapshots.len(), 0);
4965+
services
4966+
.meta_store
4967+
.create_schema("foo1".to_string(), false)
4968+
.await
4969+
.unwrap();
4970+
assert_eq!(snapshots.len(), 0);
4971+
services
4972+
.rocks_meta_store
4973+
.as_ref()
4974+
.unwrap()
4975+
.upload_check_point()
4976+
.await
4977+
.unwrap();
4978+
let snapshots = services
4979+
.rocks_meta_store
4980+
.as_ref()
4981+
.unwrap()
4982+
.get_snapshots_list()
4983+
.await
4984+
.unwrap();
4985+
assert_eq!(snapshots.len(), 1);
4986+
assert!(snapshots[0].current);
4987+
services
4988+
.meta_store
4989+
.create_schema("foo".to_string(), false)
4990+
.await
4991+
.unwrap();
4992+
services
4993+
.rocks_meta_store
4994+
.as_ref()
4995+
.unwrap()
4996+
.upload_check_point()
4997+
.await
4998+
.unwrap();
4999+
let snapshots = services
5000+
.rocks_meta_store
5001+
.as_ref()
5002+
.unwrap()
5003+
.get_snapshots_list()
5004+
.await
5005+
.unwrap();
5006+
assert_eq!(snapshots.len(), 2);
5007+
assert!(!snapshots[0].current);
5008+
assert!(snapshots[1].current);
5009+
services
5010+
.meta_store
5011+
.create_schema("bar".to_string(), false)
5012+
.await
5013+
.unwrap();
5014+
services
5015+
.rocks_meta_store
5016+
.as_ref()
5017+
.unwrap()
5018+
.upload_check_point()
5019+
.await
5020+
.unwrap();
5021+
let snapshots = services
5022+
.rocks_meta_store
5023+
.as_ref()
5024+
.unwrap()
5025+
.get_snapshots_list()
5026+
.await
5027+
.unwrap();
5028+
assert_eq!(snapshots.len(), 3);
5029+
assert!(!snapshots[0].current);
5030+
assert!(!snapshots[1].current);
5031+
assert!(snapshots[2].current);
5032+
services.stop_processing_loops().await.unwrap();
5033+
5034+
Delay::new(Duration::from_millis(1000)).await; // TODO logger init conflict
5035+
fs::remove_dir_all(config.local_dir()).unwrap();
5036+
}
5037+
5038+
{
5039+
let config = Config::test("get_snapshots_list");
5040+
5041+
let services2 = config.configure().await;
5042+
let snapshots = services2
5043+
.rocks_meta_store
5044+
.as_ref()
5045+
.unwrap()
5046+
.get_snapshots_list()
5047+
.await
5048+
.unwrap();
5049+
assert_eq!(snapshots.len(), 3);
5050+
assert!(!snapshots[0].current);
5051+
assert!(!snapshots[1].current);
5052+
assert!(snapshots[2].current);
5053+
fs::remove_dir_all(config.local_dir()).unwrap();
5054+
fs::remove_dir_all(config.remote_dir()).unwrap();
5055+
}
5056+
}
5057+
#[tokio::test]
5058+
async fn set_current_snapshot() {
5059+
{
5060+
let config = Config::test("set_current_snapshot");
5061+
5062+
let _ = fs::remove_dir_all(config.local_dir());
5063+
let _ = fs::remove_dir_all(config.remote_dir());
5064+
5065+
let services = config.configure().await;
5066+
services.start_processing_loops().await.unwrap();
5067+
let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap();
5068+
services
5069+
.meta_store
5070+
.create_schema("foo1".to_string(), false)
5071+
.await
5072+
.unwrap();
5073+
rocks_meta_store.upload_check_point().await.unwrap();
5074+
services
5075+
.meta_store
5076+
.create_schema("foo".to_string(), false)
5077+
.await
5078+
.unwrap();
5079+
rocks_meta_store.upload_check_point().await.unwrap();
5080+
services
5081+
.meta_store
5082+
.create_schema("bar".to_string(), false)
5083+
.await
5084+
.unwrap();
5085+
rocks_meta_store.upload_check_point().await.unwrap();
5086+
let snapshots = services.meta_store.get_snapshots_list().await.unwrap();
5087+
assert_eq!(snapshots.len(), 3);
5088+
assert!(!snapshots[0].current);
5089+
assert!(!snapshots[1].current);
5090+
assert!(snapshots[2].current);
5091+
5092+
let res = services.meta_store.set_current_snapshot(111).await;
5093+
assert_eq!(
5094+
res.unwrap_err().to_string(),
5095+
"Metastore snapshot with id 111 don't exists".to_string()
5096+
);
5097+
5098+
let res = services
5099+
.meta_store
5100+
.set_current_snapshot(snapshots[2].id)
5101+
.await;
5102+
assert_eq!(
5103+
res.unwrap_err().to_string(),
5104+
format!(
5105+
"Metastore snapshot with id {} is already current snapshot",
5106+
snapshots[2].id
5107+
)
5108+
);
5109+
5110+
let res = services
5111+
.meta_store
5112+
.set_current_snapshot(snapshots[1].id)
5113+
.await;
5114+
assert!(res.is_ok());
5115+
5116+
services
5117+
.meta_store
5118+
.create_schema("bar_after".to_string(), false)
5119+
.await
5120+
.unwrap();
5121+
rocks_meta_store.upload_check_point().await.unwrap();
5122+
rocks_meta_store.run_upload().await.unwrap();
5123+
5124+
let snapshots = services.meta_store.get_snapshots_list().await.unwrap();
5125+
assert_eq!(snapshots.len(), 3);
5126+
assert!(!snapshots[0].current);
5127+
assert!(snapshots[1].current);
5128+
assert!(!snapshots[2].current);
5129+
5130+
services.stop_processing_loops().await.unwrap();
5131+
5132+
Delay::new(Duration::from_millis(1000)).await; // TODO logger init conflict
5133+
fs::remove_dir_all(config.local_dir()).unwrap();
5134+
}
5135+
5136+
{
5137+
let config = Config::test("set_current_snapshot");
5138+
5139+
let services2 = config.configure().await;
5140+
let snapshots = services2.meta_store.get_snapshots_list().await.unwrap();
5141+
assert_eq!(snapshots.len(), 3);
5142+
assert!(!snapshots[0].current);
5143+
assert!(snapshots[1].current);
5144+
assert!(!snapshots[2].current);
5145+
services2
5146+
.meta_store
5147+
.get_schema("foo1".to_string())
5148+
.await
5149+
.unwrap();
5150+
services2
5151+
.meta_store
5152+
.get_schema("foo".to_string())
5153+
.await
5154+
.unwrap();
5155+
assert!(services2
5156+
.meta_store
5157+
.get_schema("bar".to_string())
5158+
.await
5159+
.is_err());
5160+
assert!(services2
5161+
.meta_store
5162+
.get_schema("bar_after".to_string())
5163+
.await
5164+
.is_err());
5165+
5166+
let res = services2
5167+
.meta_store
5168+
.set_current_snapshot(snapshots[2].id)
5169+
.await;
5170+
assert!(res.is_ok());
5171+
Delay::new(Duration::from_millis(1000)).await; // TODO logger init conflict
5172+
fs::remove_dir_all(config.local_dir()).unwrap();
5173+
}
5174+
5175+
{
5176+
let config = Config::test("set_current_snapshot");
5177+
5178+
let services3 = config.configure().await;
5179+
let snapshots = services3.meta_store.get_snapshots_list().await.unwrap();
5180+
assert_eq!(snapshots.len(), 3);
5181+
assert!(!snapshots[0].current);
5182+
assert!(!snapshots[1].current);
5183+
assert!(snapshots[2].current);
5184+
services3
5185+
.meta_store
5186+
.get_schema("foo1".to_string())
5187+
.await
5188+
.unwrap();
5189+
services3
5190+
.meta_store
5191+
.get_schema("foo".to_string())
5192+
.await
5193+
.unwrap();
5194+
services3
5195+
.meta_store
5196+
.get_schema("bar".to_string())
5197+
.await
5198+
.unwrap();
5199+
services3
5200+
.meta_store
5201+
.get_schema("bar_after".to_string())
5202+
.await
5203+
.unwrap();
5204+
fs::remove_dir_all(config.local_dir()).unwrap();
5205+
fs::remove_dir_all(config.remote_dir()).unwrap();
5206+
}
5207+
}
5208+
49365209
#[tokio::test]
49375210
async fn log_replay_ordering() {
49385211
{

0 commit comments

Comments
 (0)