Skip to content

Commit 0ec1b91

Browse files
committed
shard DB
1 parent 3897cf4 commit 0ec1b91

File tree

2 files changed

+125
-70
lines changed

2 files changed

+125
-70
lines changed

core/src/common/engines/parity_backend.rs

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use parking_lot::Mutex;
77
use ruc::*;
88
use std::{
99
borrow::Cow,
10+
fs,
1011
ops::{Bound, RangeBounds},
12+
path::Path,
1113
sync::{
1214
LazyLock,
1315
atomic::{AtomicUsize, Ordering},
@@ -18,22 +20,32 @@ use std::{
1820
// The last COLID is preserved for the meta storage,
1921
// so the max value should be `u8::MAX - 1`
2022
const DATA_SET_NUM: u8 = 2;
23+
const SHARD_CNT: usize = 16;
2124

2225
const META_COLID: u8 = DATA_SET_NUM;
2326

2427
const META_KEY_MAX_KEYLEN: [u8; 1] = [u8::MAX];
2528
const META_KEY_PREFIX_ALLOCATOR: [u8; 1] = [u8::MIN];
2629
const META_KEY_NULL: [u8; 0] = [0; 0];
2730

28-
static HDR: LazyLock<DB> = LazyLock::new(|| paritydb_open().unwrap());
29-
3031
pub struct ParityEngine {
3132
hdr: &'static DB,
33+
shards: Vec<&'static DB>,
3234
prefix_allocator: PreAllocator,
3335
max_keylen: AtomicUsize,
3436
}
3537

3638
impl ParityEngine {
39+
#[inline(always)]
40+
fn get_shard_idx(&self, prefix: PreBytes) -> usize {
41+
(prefix[0] as usize) % SHARD_CNT
42+
}
43+
44+
#[inline(always)]
45+
fn get_db(&self, prefix: PreBytes) -> &'static DB {
46+
self.shards[self.get_shard_idx(prefix)]
47+
}
48+
3749
#[inline(always)]
3850
fn get_max_keylen(&self) -> usize {
3951
self.max_keylen.load(Ordering::Relaxed)
@@ -70,7 +82,22 @@ impl ParityEngine {
7082

7183
impl Engine for ParityEngine {
7284
fn new() -> Result<Self> {
73-
let hdr = &HDR;
85+
let base_dir = vsdb_get_base_dir();
86+
// avoid setting again on an opened DB
87+
omit!(vsdb_set_base_dir(&base_dir));
88+
89+
let mut shards = Vec::with_capacity(SHARD_CNT);
90+
91+
// Ensure base dir exists
92+
fs::create_dir_all(&base_dir).c(d!())?;
93+
94+
for i in 0..SHARD_CNT {
95+
let dir = base_dir.join(format!("shard_{}", i));
96+
let db = paritydb_open_shard(&dir)?;
97+
shards.push(Box::leak(Box::new(db)) as &'static DB);
98+
}
99+
100+
let hdr = shards[0];
74101

75102
let (prefix_allocator, initial_value) = PreAllocator::init();
76103

@@ -103,6 +130,7 @@ impl Engine for ParityEngine {
103130

104131
Ok(ParityEngine {
105132
hdr,
133+
shards,
106134
prefix_allocator,
107135
// length of the raw key, exclude the hdr prefix
108136
max_keylen,
@@ -143,12 +171,13 @@ impl Engine for ParityEngine {
143171
fn flush(&self) {}
144172

145173
fn iter(&self, hdr_prefix: PreBytes) -> ParityIter {
174+
let db = self.get_db(hdr_prefix);
146175
let area_idx = self.area_idx(hdr_prefix);
147176

148-
let mut inner = self.hdr.iter(area_idx as u8).unwrap();
177+
let mut inner = db.iter(area_idx as u8).unwrap();
149178
inner.seek(&hdr_prefix).unwrap();
150179

151-
let mut inner_rev = self.hdr.iter(area_idx as u8).unwrap();
180+
let mut inner_rev = db.iter(area_idx as u8).unwrap();
152181
inner_rev
153182
.seek(&self.get_upper_bound_value(hdr_prefix))
154183
.unwrap();
@@ -166,9 +195,10 @@ impl Engine for ParityEngine {
166195
hdr_prefix: PreBytes,
167196
bounds: R,
168197
) -> ParityIter {
198+
let db = self.get_db(hdr_prefix);
169199
let area_idx = self.area_idx(hdr_prefix);
170200

171-
let mut inner = self.hdr.iter(area_idx as u8).unwrap();
201+
let mut inner = db.iter(area_idx as u8).unwrap();
172202
let mut b_lo = hdr_prefix.to_vec();
173203
let l = match bounds.start_bound() {
174204
Bound::Included(lo) => {
@@ -188,7 +218,7 @@ impl Engine for ParityEngine {
188218
}
189219
};
190220

191-
let mut inner_rev = self.hdr.iter(area_idx as u8).unwrap();
221+
let mut inner_rev = db.iter(area_idx as u8).unwrap();
192222
let mut b_hi = hdr_prefix.to_vec();
193223
let h = match bounds.end_bound() {
194224
Bound::Included(hi) => {
@@ -228,11 +258,12 @@ impl Engine for ParityEngine {
228258
}
229259

230260
fn get(&self, hdr_prefix: PreBytes, key: &[u8]) -> Option<RawValue> {
261+
let db = self.get_db(hdr_prefix);
231262
let area_idx = self.area_idx(hdr_prefix);
232263

233264
let mut k = hdr_prefix.to_vec();
234265
k.extend_from_slice(key);
235-
self.hdr.get(area_idx as u8, &k).unwrap()
266+
db.get(area_idx as u8, &k).unwrap()
236267
}
237268

238269
fn insert(
@@ -241,6 +272,7 @@ impl Engine for ParityEngine {
241272
key: &[u8],
242273
value: &[u8],
243274
) -> Option<RawValue> {
275+
let db = self.get_db(hdr_prefix);
244276
let area_idx = self.area_idx(hdr_prefix);
245277

246278
let mut k = hdr_prefix.to_vec();
@@ -250,20 +282,20 @@ impl Engine for ParityEngine {
250282
self.set_max_key_len(key.len());
251283
}
252284

253-
let old_v = self.hdr.get(area_idx as u8, &k).unwrap();
254-
self.hdr
255-
.commit([(area_idx as u8, k, Some(value.to_vec()))])
285+
let old_v = db.get(area_idx as u8, &k).unwrap();
286+
db.commit([(area_idx as u8, k, Some(value.to_vec()))])
256287
.unwrap();
257288
old_v
258289
}
259290

260291
fn remove(&self, hdr_prefix: PreBytes, key: &[u8]) -> Option<RawValue> {
292+
let db = self.get_db(hdr_prefix);
261293
let area_idx = self.area_idx(hdr_prefix);
262294

263295
let mut k = hdr_prefix.to_vec();
264296
k.extend_from_slice(key);
265-
let old_v = self.hdr.get(area_idx as u8, &k).unwrap();
266-
self.hdr.commit([(area_idx as u8, k, None)]).unwrap();
297+
let old_v = db.get(area_idx as u8, &k).unwrap();
298+
db.commit([(area_idx as u8, k, None)]).unwrap();
267299
old_v
268300
}
269301

@@ -384,13 +416,8 @@ impl PreAllocator {
384416
// }
385417
}
386418

387-
fn paritydb_open() -> Result<DB> {
388-
let dir = vsdb_get_base_dir();
389-
390-
// avoid setting again on an opened DB
391-
omit!(vsdb_set_base_dir(&dir));
392-
393-
let mut cfg = Options::with_columns(&dir, 1 + DATA_SET_NUM);
419+
fn paritydb_open_shard(dir: &Path) -> Result<DB> {
420+
let mut cfg = Options::with_columns(dir, 1 + DATA_SET_NUM);
394421
cfg.columns.iter_mut().for_each(|c| {
395422
c.btree_index = true;
396423
});

0 commit comments

Comments
 (0)