
Commit 43bb693

feat(cubestore): Size limit for RocksStore logs file (#7201)
1 parent d02077e · commit 43bb693
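Summary (from the diff below): this commit caps how much RocksStore log data a single upload may carry. WriteBatchContainer gains size accounting, run_upload stops collecting write batches once the serialized log exceeds meta_store_log_upload_size_limit (set via CUBESTORE_METASTORE_UPLOAD_LOG_SIZE_LIMIT, default 100 MiB, clamped between 1 MiB and 1 GiB), and upload_check_point now records the uploaded checkpoint's own last sequence number by reopening the checkpoint read-only through the new RocksStoreDetails::open_readonly_db hook.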

4 files changed, +246 -3 lines changed


rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 30 additions & 0 deletions
@@ -111,6 +111,36 @@ impl RocksStoreDetails for RocksCacheStoreDetails {
             .map_err(|err| CubeError::internal(format!("DB::open error for cachestore: {}", err)))
     }
 
+    fn open_readonly_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError> {
+        let rocksdb_config = config.cachestore_rocksdb_config();
+
+        let mut opts = Options::default();
+        opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13));
+
+        let block_opts = {
+            let mut block_opts = BlockBasedOptions::default();
+            // https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
+            block_opts.set_format_version(5);
+            block_opts.set_checksum_type(rocksdb_config.checksum_type.as_rocksdb_enum());
+
+            let cache = Cache::new_lru_cache(rocksdb_config.cache_capacity)?;
+            block_opts.set_block_cache(&cache);
+
+            block_opts
+        };
+
+        opts.set_block_based_table_factory(&block_opts);
+        opts.set_compression_type(rocksdb_config.compression_type);
+        opts.set_bottommost_compression_type(rocksdb_config.bottommost_compression_type);
+
+        DB::open_for_read_only(&opts, path, false).map_err(|err| {
+            CubeError::internal(format!(
+                "DB::open_for_read_only error for cachestore: {}",
+                err
+            ))
+        })
+    }
+
     fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError> {
         CacheItemRocksTable::new(table_ref.clone()).migrate()?;
         QueueItemRocksTable::new(table_ref.clone()).migrate()?;
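Both stores implement the new open_readonly_db hook the same way. For reference, a minimal standalone sketch of the read-only pattern it relies on; both calls come from the rocksdb crate, as in the diff above, and the function name here is illustrative:

use rocksdb::{Options, DB};
use std::path::Path;

// Open a checkpoint directory without taking the write lock and read the
// sequence number it was cut at. The final `false` means: do not fail if
// the checkpoint directory still contains a WAL file.
fn checkpoint_last_seq(path: &Path) -> Result<u64, rocksdb::Error> {
    let opts = Options::default();
    let db = DB::open_for_read_only(&opts, path, false)?;
    Ok(db.latest_sequence_number())
}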

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 14 additions & 0 deletions
@@ -411,6 +411,8 @@ pub trait ConfigObj: DIService {
 
     fn meta_store_log_upload_interval(&self) -> u64;
 
+    fn meta_store_log_upload_size_limit(&self) -> u64;
+
     fn gc_loop_interval(&self) -> u64;
 
     fn stale_stream_timeout(&self) -> u64;
@@ -562,6 +564,7 @@ pub struct ConfigObjImpl {
     pub in_memory_not_used_timeout: u64,
     pub import_job_timeout: u64,
     pub meta_store_log_upload_interval: u64,
+    pub meta_store_log_upload_size_limit: u64,
     pub meta_store_snapshot_interval: u64,
     pub gc_loop_interval: u64,
     pub stale_stream_timeout: u64,
@@ -735,6 +738,10 @@ impl ConfigObj for ConfigObjImpl {
         self.meta_store_log_upload_interval
     }
 
+    fn meta_store_log_upload_size_limit(&self) -> u64 {
+        self.meta_store_log_upload_size_limit
+    }
+
     fn gc_loop_interval(&self) -> u64 {
         self.gc_loop_interval
     }
@@ -1255,6 +1262,12 @@ impl Config {
                 in_memory_not_used_timeout: 30,
                 import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600),
                 meta_store_log_upload_interval: 30,
+                meta_store_log_upload_size_limit: env_parse_size(
+                    "CUBESTORE_METASTORE_UPLOAD_LOG_SIZE_LIMIT",
+                    100 * 1024 * 1024,
+                    Some(1024 * 1024 * 1024),
+                    Some(1 * 1024 * 1024),
+                ) as u64,
                 meta_store_snapshot_interval: 300,
                 gc_loop_interval: 60,
                 stale_stream_timeout: env_parse("CUBESTORE_STALE_STREAM_TIMEOUT", 600),
@@ -1535,6 +1548,7 @@ impl Config {
                 metadata_cache_max_capacity_bytes: 0,
                 metadata_cache_time_to_idle_secs: 1_000,
                 meta_store_log_upload_interval: 30,
+                meta_store_log_upload_size_limit: 100 * 1024 * 1024,
                 meta_store_snapshot_interval: 300,
                 gc_loop_interval: 60,
                 stream_replay_check_interval_secs: 60,
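The new limit is read through env_parse_size, whose argument order at this call site looks like (env var, default, max, min). A hypothetical sketch of that clamping behavior, inferred only from the call above; it is NOT the real Cube Store helper, whose parsing (e.g. of human-readable sizes) may differ:

// Hypothetical: parse an env var as a plain byte count, fall back to
// `default`, and clamp the result into [min, max].
fn env_parse_size(name: &str, default: usize, max: Option<usize>, min: Option<usize>) -> usize {
    let mut value = std::env::var(name)
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(default);
    if let Some(max) = max {
        value = value.min(max);
    }
    if let Some(min) = min {
        value = value.max(min);
    }
    value
}

With the arguments above, that yields 100 MiB by default, never above 1 GiB and never below 1 MiB.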

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 29 additions & 0 deletions
@@ -1287,6 +1287,35 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
             .map_err(|err| CubeError::internal(format!("DB::open error for metastore: {}", err)))
     }
 
+    fn open_readonly_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError> {
+        let rocksdb_config = config.metastore_rocksdb_config();
+        let mut opts = Options::default();
+        opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13));
+
+        let block_opts = {
+            let mut block_opts = BlockBasedOptions::default();
+            // https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
+            block_opts.set_format_version(5);
+            block_opts.set_checksum_type(rocksdb_config.checksum_type.as_rocksdb_enum());
+
+            let cache = Cache::new_lru_cache(rocksdb_config.cache_capacity)?;
+            block_opts.set_block_cache(&cache);
+
+            block_opts
+        };
+
+        opts.set_block_based_table_factory(&block_opts);
+        opts.set_compression_type(rocksdb_config.compression_type);
+        opts.set_bottommost_compression_type(rocksdb_config.bottommost_compression_type);
+
+        DB::open_for_read_only(&opts, path, false).map_err(|err| {
+            CubeError::internal(format!(
+                "DB::open_for_read_only error for metastore: {}",
+                err
+            ))
+        })
+    }
+
     fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError> {
         SchemaRocksTable::new(table_ref.clone()).migrate()?;
         TableRocksTable::new(table_ref.clone()).migrate()?;

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 173 additions & 3 deletions
@@ -520,6 +520,15 @@ pub enum WriteBatchEntry {
     Delete { key: Box<[u8]> },
 }
 
+impl WriteBatchEntry {
+    pub fn size(&self) -> usize {
+        match self {
+            Self::Put { key, value } => key.len() + value.len(),
+            Self::Delete { key } => key.len(),
+        }
+    }
+}
+
 #[derive(Clone, Serialize, Deserialize, Debug)]
 pub struct WriteBatchContainer {
     entries: Vec<WriteBatchEntry>,
@@ -532,6 +541,10 @@ impl WriteBatchContainer {
         }
     }
 
+    pub fn size(&self) -> usize {
+        self.entries.iter().fold(0, |acc, i| acc + i.size())
+    }
+
     pub fn write_batch(&self) -> WriteBatch {
         let mut batch = WriteBatch::default();
         for entry in self.entries.iter() {
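A small worked example of the new size accounting, using the types from this file; size() counts only key and value payload bytes, not serialization overhead:

// A Put counts key + value bytes, a Delete counts key bytes only.
let entries = vec![
    WriteBatchEntry::Put {
        key: Box::from(&b"k1"[..]),
        value: Box::from(&b"hello"[..]),
    }, // 2 + 5 = 7
    WriteBatchEntry::Delete {
        key: Box::from(&b"k2"[..]),
    }, // 2
];
let total: usize = entries.iter().map(|e| e.size()).sum();
assert_eq!(total, 9);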
@@ -767,6 +780,8 @@ impl RocksStoreConfig {
 pub trait RocksStoreDetails: Send + Sync {
     fn open_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError>;
 
+    fn open_readonly_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError>;
+
     fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError>;
 
     fn get_name(&self) -> &'static str;
@@ -1044,10 +1059,14 @@ impl RocksStore {
         let mut serializer = WriteBatchContainer::new();
 
         let mut seq_numbers = Vec::new();
+        let size_limit = self.config.meta_store_log_upload_size_limit() as usize;
        for update in updates.into_iter() {
             let (n, write_batch) = update?;
             seq_numbers.push(n);
             write_batch.iterate(&mut serializer);
+            if serializer.size() > size_limit {
+                break;
+            }
         }
 
         (
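Note that the limit is checked after each batch is appended: a chunk can overshoot the limit by one write batch, and a single oversized batch still uploads whole. Updates left behind are picked up on the next run_upload cycle, which is exactly the catch-up behavior the test below exercises. A simplified standalone sketch of this cut-off pattern:

use std::collections::VecDeque;

// Accumulate items until the size budget is exceeded, cut the chunk, and
// leave the remainder for the next cycle.
fn take_chunk(updates: &mut VecDeque<Vec<u8>>, size_limit: usize) -> Vec<Vec<u8>> {
    let mut chunk = Vec::new();
    let mut size = 0usize;
    while let Some(update) = updates.pop_front() {
        size += update.len();
        chunk.push(update);
        if size > size_limit {
            break; // overshoot by at most one item, as in run_upload
        }
    }
    chunk
}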
@@ -1075,10 +1094,7 @@ impl RocksStore {
                 + time::Duration::from_secs(self.config.meta_store_snapshot_interval())
                 < SystemTime::now()
             {
-                info!("Uploading {} check point", self.details.get_name());
                 self.upload_check_point().await?;
-                let mut check_seq = self.last_check_seq.write().await;
-                *check_seq = last_db_seq;
             }
 
             info!(
@@ -1175,6 +1191,7 @@ impl RocksStore {
     }
 
     pub async fn upload_check_point(&self) -> Result<(), CubeError> {
+        info!("Uploading {} check point", self.details.get_name());
         let upload_stopped = self.snapshots_upload_stopped.lock().await;
         if !*upload_stopped {
             let mut check_point_time = self.last_checkpoint_time.write().await;
@@ -1185,11 +1202,25 @@
                 self.prepare_checkpoint(&check_point_time).await?
             };
 
+            let details = self.details.clone();
+            let config = self.config.clone();
+            let path_to_move = checkpoint_path.clone();
+            let checkpoint_last_seq =
+                cube_ext::spawn_blocking(move || -> Result<u64, CubeError> {
+                    let snap_db = details.open_readonly_db(&path_to_move, &config)?;
+                    Ok(snap_db.latest_sequence_number())
+                })
+                .await??;
+
             self.metastore_fs
                 .upload_checkpoint(remote_path, checkpoint_path)
                 .await?;
             let mut snapshot_uploaded = self.snapshot_uploaded.write().await;
             *snapshot_uploaded = true;
+            let mut last_uploaded_check_seq = self.last_check_seq.write().await;
+            *last_uploaded_check_seq = checkpoint_last_seq;
+            let mut last_uploaded_seq = self.last_upload_seq.write().await;
+            *last_uploaded_seq = checkpoint_last_seq;
             self.write_completed_notify.notify_waiters();
         }
         Ok(())
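Reading latest_sequence_number() from the freshly prepared checkpoint, rather than from the live DB as the removed run_upload lines did, pins last_upload_seq and last_check_seq to exactly what the uploaded snapshot contains; writes that land between checkpoint preparation and upload no longer inflate the counters. This appears to be the bookkeeping that lets size-limited log uploads resume from the right sequence number.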
@@ -1203,6 +1234,11 @@
         *self.last_check_seq.read().await
     }
 
+    #[cfg(test)]
+    pub fn last_seq(&self) -> u64 {
+        self.db.latest_sequence_number()
+    }
+
     fn get_store_path(&self, checkpoint_time: &SystemTime) -> String {
         format!(
             "{}-{}",
@@ -1380,6 +1416,9 @@
 mod tests {
     use super::*;
     use crate::config::Config;
+    use crate::metastore::rocks_table::RocksTable;
+    use crate::metastore::schema::SchemaRocksTable;
+    use crate::metastore::Schema;
     use crate::metastore::{BaseRocksStoreFs, RocksMetaStoreDetails};
     use crate::remotefs::LocalDirRemoteFs;
     use chrono::Timelike;
@@ -1527,4 +1566,135 @@
 
         Ok(())
     }
+
+    async fn write_test_data(rocks_store: &Arc<RocksStore>, name: String) {
+        rocks_store
+            .write_operation(move |db_ref, batch_pipe| {
+                let table = SchemaRocksTable::new(db_ref.clone());
+                let schema = Schema { name };
+                Ok(table.insert(schema, batch_pipe)?)
+            })
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_snapshot_uploads() -> Result<(), CubeError> {
+        let config = Config::test("test_snapshots_uploads").update_config(|mut c| {
+            c.meta_store_log_upload_size_limit = 300;
+            c
+        });
+        let store_path = env::current_dir()
+            .unwrap()
+            .join("test_snapshots_uploads-local");
+        let remote_store_path = env::current_dir()
+            .unwrap()
+            .join("test_snapshots_uploads-remote");
+        let _ = fs::remove_dir_all(store_path.clone());
+        let _ = fs::remove_dir_all(remote_store_path.clone());
+        let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
+
+        let details = Arc::new(RocksMetaStoreDetails {});
+
+        let rocks_store = RocksStore::new(
+            store_path.join("metastore").as_path(),
+            BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
+            config.config_obj(),
+            details.clone(),
+        )?;
+
+        assert_eq!(rocks_store.last_upload_seq().await, 0);
+        assert_eq!(rocks_store.last_check_seq().await, 0);
+
+        write_test_data(&rocks_store, "test".to_string()).await;
+        write_test_data(&rocks_store, "test2".to_string()).await;
+
+        rocks_store.upload_check_point().await.unwrap();
+
+        let last_seq = rocks_store.last_seq();
+
+        assert_eq!(rocks_store.last_upload_seq().await, last_seq);
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        write_test_data(&rocks_store, "test3".to_string()).await;
+
+        rocks_store.run_upload().await.unwrap();
+
+        assert_eq!(
+            rocks_store.last_upload_seq().await,
+            rocks_store.last_seq() - 1
+        );
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        write_test_data(&rocks_store, "test4".to_string()).await;
+
+        rocks_store.run_upload().await.unwrap();
+
+        assert_eq!(
+            rocks_store.last_upload_seq().await,
+            rocks_store.last_seq() - 1
+        );
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        let last_upl = rocks_store.last_seq();
+
+        write_test_data(&rocks_store, "a".repeat(150)).await;
+        write_test_data(&rocks_store, "b".repeat(150)).await;
+
+        rocks_store.run_upload().await.unwrap();
+
+        // +2: one for the seq-number write and one for the first insert batch
+        assert_eq!(rocks_store.last_upload_seq().await, last_upl + 2);
+        assert!(rocks_store.last_upload_seq().await < rocks_store.last_seq() - 1);
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        rocks_store.run_upload().await.unwrap();
+        assert_eq!(
+            rocks_store.last_upload_seq().await,
+            rocks_store.last_seq() - 1
+        );
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        write_test_data(&rocks_store, "c".repeat(150)).await;
+        write_test_data(&rocks_store, "e".repeat(150)).await;
+
+        rocks_store.run_upload().await.unwrap();
+        assert_eq!(
+            rocks_store.last_upload_seq().await,
+            rocks_store.last_seq() - 4
+        );
+        assert_eq!(rocks_store.last_check_seq().await, last_seq);
+
+        let _ = fs::remove_dir_all(store_path.clone());
+        drop(rocks_store);
+
+        let rocks_fs = BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj());
+        let path = store_path.join("metastore").to_string_lossy().to_string();
+        let rocks_store = rocks_fs
+            .load_from_remote(&path, config.config_obj(), details)
+            .await
+            .unwrap();
+        let all_schemas = rocks_store
+            .read_operation_out_of_queue(move |db_ref| SchemaRocksTable::new(db_ref).all_rows())
+            .await
+            .unwrap();
+        let expected = vec![
+            "test".to_string(),
+            "test2".to_string(),
+            "test3".to_string(),
+            "test4".to_string(),
+            "a".repeat(150),
+            "b".repeat(150),
+            "c".repeat(150),
+        ];
+
+        assert_eq!(expected.len(), all_schemas.len());
+
+        for (exp, row) in expected.into_iter().zip(all_schemas.into_iter()) {
+            assert_eq!(&exp, row.get_row().get_name());
+        }
+
+        let _ = fs::remove_dir_all(store_path.clone());
+        let _ = fs::remove_dir_all(remote_store_path.clone());
+
+        Ok(())
+    }
 }
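One detail worth noting in this test: "e".repeat(150) is written but deliberately absent from `expected`. With the 300-byte limit, the final run_upload cut its chunk after "c", so the "e" log was never uploaded before the local store was removed, and the restore from remote storage reflects only data that actually reached the remote filesystem.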
