Skip to content

Commit ac4ffa3

Browse files
authored
Add batched version MultiGet API (tikv#633)
1 parent da4b714 commit ac4ffa3

File tree

2 files changed

+121
-0
lines changed

2 files changed

+121
-0
lines changed

src/db.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,17 @@ pub trait DBAccess {
177177
K: AsRef<[u8]>,
178178
I: IntoIterator<Item = (&'b W, K)>,
179179
W: AsColumnFamilyRef + 'b;
180+
181+
fn batched_multi_get_cf_opt<K, I>(
182+
&self,
183+
cf: &impl AsColumnFamilyRef,
184+
keys: I,
185+
sorted_input: bool,
186+
readopts: &ReadOptions,
187+
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
188+
where
189+
K: AsRef<[u8]>,
190+
I: IntoIterator<Item = K>;
180191
}
181192

182193
impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
@@ -242,6 +253,20 @@ impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
242253
{
243254
self.multi_get_cf_opt(keys_cf, readopts)
244255
}
256+
257+
fn batched_multi_get_cf_opt<K, I>(
258+
&self,
259+
cf: &impl AsColumnFamilyRef,
260+
keys: I,
261+
sorted_input: bool,
262+
readopts: &ReadOptions,
263+
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
264+
where
265+
K: AsRef<[u8]>,
266+
I: IntoIterator<Item = K>,
267+
{
268+
self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)
269+
}
245270
}
246271

247272
/// A type alias to DB instance type with the single-threaded column family
@@ -1022,6 +1047,75 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
10221047
convert_values(values, values_sizes, errors)
10231048
}
10241049

1050+
/// Return the values associated with the given keys and the specified column family
1051+
/// where internally the read requests are processed in batch if block-based table
1052+
/// SST format is used. It is a more optimized version of multi_get_cf.
1053+
pub fn batched_multi_get_cf<K, I>(
1054+
&self,
1055+
cf: &impl AsColumnFamilyRef,
1056+
keys: I,
1057+
sorted_input: bool,
1058+
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1059+
where
1060+
K: AsRef<[u8]>,
1061+
I: IntoIterator<Item = K>,
1062+
{
1063+
self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1064+
}
1065+
1066+
/// Return the values associated with the given keys and the specified column family
1067+
/// where internally the read requests are processed in batch if block-based table
1068+
/// SST format is used. It is a more optimized version of multi_get_cf_opt.
1069+
pub fn batched_multi_get_cf_opt<K, I>(
1070+
&self,
1071+
cf: &impl AsColumnFamilyRef,
1072+
keys: I,
1073+
sorted_input: bool,
1074+
readopts: &ReadOptions,
1075+
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1076+
where
1077+
K: AsRef<[u8]>,
1078+
I: IntoIterator<Item = K>,
1079+
{
1080+
let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1081+
.into_iter()
1082+
.map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1083+
.unzip();
1084+
let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1085+
1086+
let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1087+
let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1088+
1089+
unsafe {
1090+
ffi::rocksdb_batched_multi_get_cf(
1091+
self.inner,
1092+
readopts.inner,
1093+
cf.inner(),
1094+
ptr_keys.len(),
1095+
ptr_keys.as_ptr(),
1096+
keys_sizes.as_ptr(),
1097+
pinned_values.as_mut_ptr(),
1098+
errors.as_mut_ptr(),
1099+
sorted_input,
1100+
);
1101+
pinned_values
1102+
.into_iter()
1103+
.zip(errors.into_iter())
1104+
.map(|(v, e)| {
1105+
if e.is_null() {
1106+
if v.is_null() {
1107+
Ok(None)
1108+
} else {
1109+
Ok(Some(DBPinnableSlice::from_c(v)))
1110+
}
1111+
} else {
1112+
Err(Error::new(crate::ffi_util::error_message(e)))
1113+
}
1114+
})
1115+
.collect()
1116+
}
1117+
}
1118+
10251119
/// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
10261120
/// `true`. This function uses default `ReadOptions`.
10271121
pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {

tests/test_db.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,33 @@ fn multi_get_cf() {
12441244
}
12451245
}
12461246

1247+
#[test]
1248+
fn batched_multi_get_cf() {
1249+
let path = DBPath::new("_rust_rocksdb_batched_multi_get_cf");
1250+
1251+
{
1252+
let mut opts = Options::default();
1253+
opts.create_if_missing(true);
1254+
opts.create_missing_column_families(true);
1255+
let db = DB::open_cf(&opts, &path, &["cf0"]).unwrap();
1256+
1257+
let cf = db.cf_handle("cf0").unwrap();
1258+
db.put_cf(&cf, b"k1", b"v1").unwrap();
1259+
db.put_cf(&cf, b"k2", b"v2").unwrap();
1260+
1261+
let values = db
1262+
.batched_multi_get_cf(&cf, vec![b"k0", b"k1", b"k2"], true) // sorted_input
1263+
.into_iter()
1264+
.map(Result::unwrap)
1265+
.collect::<Vec<_>>();
1266+
assert_eq!(3, values.len());
1267+
assert!(values[0].is_none());
1268+
assert!(values[1].is_some());
1269+
assert_eq!(&(values[1].as_ref().unwrap())[0..2], b"v1");
1270+
assert_eq!(&(values[2].as_ref().unwrap())[0..2], b"v2");
1271+
}
1272+
}
1273+
12471274
#[test]
12481275
fn key_may_exist() {
12491276
let path = DBPath::new("_rust_key_may_exist");

0 commit comments

Comments
 (0)