Skip to content

Commit 92bbef3

Browse files
authored
feat(cubestore): Upgrade RocksDB to 7.9.2 (0.20) (#6221)
1 parent d8dd8a6 commit 92bbef3

File tree

7 files changed

+81
-76
lines changed

7 files changed

+81
-76
lines changed

rust/cubestore/Cargo.lock

Lines changed: 32 additions & 50 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore-sql-tests/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ tar = "0.4.38"
5353

5454
[dev-dependencies]
5555
criterion = { version = "0.4.0", features = ["async_tokio", "html_reports"] }
56-
rocksdb = { version = "0.16.0", default-features = false, features = ["bzip2"] }
56+
rocksdb = { version = "0.20.1", default-features = false, features = ["bzip2"] }
5757

5858
[[bench]]
5959
name = "in_process"

rust/cubestore/cubestore/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ simple_logger = "1.7.0"
4141
async-trait = "0.1.36"
4242
actix-rt = "2.7.0"
4343
regex = "1.3.9"
44-
rocksdb = { version = "0.16.0", default-features = false, features = ["bzip2"] }
44+
rocksdb = { version = "0.20.1", default-features = false, features = ["bzip2"] }
4545
uuid = { version = "0.8", features = ["serde", "v4"] }
4646
num = "0.3.0"
4747
enum_primitive = "0.1.1"

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::CubeError;
2424
use async_trait::async_trait;
2525

2626
use futures_timer::Delay;
27-
use rocksdb::{Options, DB};
27+
use rocksdb::{BlockBasedOptions, Options, DB};
2828

2929
use crate::cachestore::compaction::CompactionPreloadedState;
3030
use crate::cachestore::listener::RocksCacheStoreListener;
@@ -81,6 +81,15 @@ impl RocksStoreDetails for RocksCacheStoreDetails {
8181
// Disable automatic compaction before migration, will be enabled later in after_migration
8282
opts.set_disable_auto_compactions(true);
8383

84+
let mut block_opts = BlockBasedOptions::default();
85+
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
86+
// RocksDB 7.x uses a new format = 5, but our previous version of RocksDB has some issues with it
87+
// Let's force the usage of the old format for 1 month to keep the ability to revert releases
88+
// todo(ovr): Migrate to 5
89+
block_opts.set_format_version(2);
90+
91+
opts.set_block_based_table_factory(&block_opts);
92+
8493
DB::open(&opts, path)
8594
.map_err(|err| CubeError::internal(format!("DB::open error for cachestore: {}", err)))
8695
}

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub use rocks_table::*;
2121
use crate::cluster::node_name_by_partition;
2222
use async_trait::async_trait;
2323
use log::info;
24-
use rocksdb::{MergeOperands, Options, DB};
24+
use rocksdb::{BlockBasedOptions, Env, MergeOperands, Options, DB};
2525
use serde::{Deserialize, Serialize};
2626
use std::hash::Hash;
2727
use std::{env, io::Cursor, sync::Arc};
@@ -67,7 +67,7 @@ use parquet::basic::{ConvertedType, Repetition};
6767
use parquet::{basic::Type, schema::types};
6868
use partition::{PartitionRocksIndex, PartitionRocksTable};
6969
use regex::Regex;
70-
use rocksdb::backup::BackupEngineOptions;
70+
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
7171

7272
use schema::{SchemaRocksIndex, SchemaRocksTable};
7373
use smallvec::alloc::fmt::Formatter;
@@ -1127,15 +1127,17 @@ pub enum MetaStoreEvent {
11271127
fn meta_store_merge(
11281128
_new_key: &[u8],
11291129
existing_val: Option<&[u8]>,
1130-
operands: &mut MergeOperands,
1130+
operands: &MergeOperands,
11311131
) -> Option<Vec<u8>> {
11321132
let mut result: Vec<u8> = Vec::with_capacity(8);
11331133
let mut counter = existing_val
11341134
.map(|v| Cursor::new(v).read_u64::<BigEndian>().unwrap())
11351135
.unwrap_or(0);
1136+
11361137
for op in operands {
11371138
counter += Cursor::new(op).read_u64::<BigEndian>().unwrap()
11381139
}
1140+
11391141
result.write_u64::<BigEndian>(counter).unwrap();
11401142
Some(result)
11411143
}
@@ -1149,6 +1151,15 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
11491151
opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13));
11501152
opts.set_merge_operator_associative("meta_store merge", meta_store_merge);
11511153

1154+
let mut block_opts = BlockBasedOptions::default();
1155+
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
1156+
// RocksDB 7.x uses a new format = 5, but our previous version of RocksDB has some issues with it
1157+
// Let's force the usage of the old format for 1 month to keep the ability to revert releases
1158+
// todo(ovr): Migrate to 5
1159+
block_opts.set_format_version(2);
1160+
1161+
opts.set_block_based_table_factory(&block_opts);
1162+
11521163
DB::open(&opts, path)
11531164
.map_err(|err| CubeError::internal(format!("DB::open error for metastore: {}", err)))
11541165
}
@@ -3895,8 +3906,8 @@ impl MetaStore for RocksMetaStore {
38953906

38963907
async fn debug_dump(&self, out_path: String) -> Result<(), CubeError> {
38973908
self.read_operation(|db| {
3898-
let mut e =
3899-
rocksdb::backup::BackupEngine::open(&BackupEngineOptions::default(), out_path)?;
3909+
let opts = BackupEngineOptions::new(out_path)?;
3910+
let mut e = BackupEngine::open(&opts, &Env::new()?)?;
39003911
Ok(e.create_new_backup_flush(db.db, true)?)
39013912
})
39023913
.await
@@ -4805,7 +4816,8 @@ mod tests {
48054816
let iterator = meta_store.store.db.iterator(IteratorMode::Start);
48064817

48074818
println!("Keys in db");
4808-
for (key, _) in iterator {
4819+
for kv_res in iterator {
4820+
let (key, _) = kv_res.unwrap();
48094821
println!("Key {:?}", RowKey::from_bytes(&key));
48104822
}
48114823

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
1010
use datafusion::cube_ext;
1111

1212
use log::{info, trace};
13-
use rocksdb::backup::BackupEngineOptions;
13+
use rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};
1414
use rocksdb::checkpoint::Checkpoint;
15-
use rocksdb::{Snapshot, WriteBatch, WriteBatchIterator, DB};
15+
use rocksdb::{Env, Snapshot, WriteBatch, WriteBatchIterator, DB};
1616
use serde::{Deserialize, Serialize};
1717
use std::collections::HashMap;
1818
use std::fmt::Debug;
@@ -610,13 +610,9 @@ impl RocksStore {
610610
details: Arc<dyn RocksStoreDetails>,
611611
) -> Result<Arc<Self>, CubeError> {
612612
if !fs::metadata(path).await.is_ok() {
613-
let mut backup =
614-
rocksdb::backup::BackupEngine::open(&BackupEngineOptions::default(), dump_path)?;
615-
backup.restore_from_latest_backup(
616-
&path,
617-
&path,
618-
&rocksdb::backup::RestoreOptions::default(),
619-
)?;
613+
let opts = BackupEngineOptions::new(dump_path)?;
614+
let mut backup = BackupEngine::open(&opts, &Env::new()?)?;
615+
backup.restore_from_latest_backup(&path, &path, &RestoreOptions::default())?;
620616
} else {
621617
info!(
622618
"Using existing {} in {}",
@@ -751,11 +747,12 @@ impl RocksStore {
751747
let mut serializer = WriteBatchContainer::new();
752748

753749
let mut seq_numbers = Vec::new();
754-
755-
updates.into_iter().for_each(|(n, write_batch)| {
750+
for update in updates.into_iter() {
751+
let (n, write_batch) = update?;
756752
seq_numbers.push(n);
757753
write_batch.iterate(&mut serializer);
758-
});
754+
}
755+
759756
(
760757
serializer,
761758
seq_numbers.iter().min().map(|v| *v),

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ where
246246

247247
fn next(&mut self) -> Option<Self::Item> {
248248
let option = self.iter.next();
249-
if let Some((key, value)) = option {
249+
if let Some(res) = option {
250+
let (key, value) = res.unwrap();
250251
if let RowKey::Table(table_id, row_id) = RowKey::from_bytes(&key) {
251252
if table_id != self.table_id {
252253
return None;
@@ -278,7 +279,8 @@ where
278279
fn next(&mut self) -> Option<Self::Item> {
279280
loop {
280281
let option = self.iter.next();
281-
if let Some((key, value)) = option {
282+
if let Some(res) = option {
283+
let (key, value) = res.unwrap();
282284
if let RowKey::SecondaryIndex(_, secondary_index_hash, row_id) =
283285
RowKey::from_bytes(&key)
284286
{
@@ -923,7 +925,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
923925
);
924926
let index = self.get_index_by_id(secondary_id);
925927

926-
for (key, value) in iter {
928+
for kv_res in iter {
929+
let (key, value) = kv_res?;
927930
if let RowKey::SecondaryIndex(_, secondary_index_hash, row_id) =
928931
RowKey::from_bytes(&key)
929932
{
@@ -972,7 +975,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
972975
Direction::Forward,
973976
));
974977

975-
for (key, _) in iter {
978+
for kv_res in iter {
979+
let (key, _) = kv_res?;
976980
let row_key = RowKey::from_bytes(&key);
977981
if let RowKey::Table(row_table_id, _) = row_key {
978982
if row_table_id == table_id {
@@ -1000,7 +1004,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
10001004
Direction::Forward,
10011005
));
10021006

1003-
for (key, _) in iter {
1007+
for kv_res in iter {
1008+
let (key, _) = kv_res?;
10041009
let row_key = RowKey::from_bytes(&key);
10051010
if let RowKey::SecondaryIndex(index_id, _, _) = row_key {
10061011
if index_id == Self::index_id(secondary_id) {

0 commit comments

Comments
 (0)