Skip to content

Commit ff96040

Browse files
authored
refactor(meta-service): replace Vec with BTreeMap in ImmutableLevels (#18608)
* refactor(meta-service): replace Vec with BTreeMap in ImmutableLevels Replace `Vec<Immutable>` with `BTreeMap<LevelIndex, Immutable>` in `ImmutableLevels` to improve level management and ordering guarantees. Key changes: - Use `LevelIndex` as map key for O(1) lookups and automatic ordering - Add level index validation in insert operations - Update compaction logic to work with indexed levels - Rename methods for clarity: `new` -> `new_form_iter`, `push` -> `insert` - Add `remove_levels_upto` method for efficient level cleanup - Update all tests and dependent code * chore: fix export bug
1 parent 8718060 commit ff96040

File tree

9 files changed

+152
-66
lines changed

9 files changed

+152
-66
lines changed

src/meta/raft-store/src/leveled_store/immutable_levels.rs

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::io;
1617
use std::ops::RangeBounds;
1718

@@ -22,52 +23,82 @@ use seq_marked::SeqMarked;
2223

2324
use crate::leveled_store::immutable::Immutable;
2425
use crate::leveled_store::level::Level;
26+
use crate::leveled_store::level_index::LevelIndex;
2527
use crate::leveled_store::map_api::KVResultStream;
2628
use crate::leveled_store::map_api::MapKey;
2729

2830
/// A readonly leveled map that owns the data.
2931
#[derive(Debug, Default, Clone)]
3032
pub struct ImmutableLevels {
31-
/// From oldest to newest, i.e., levels[0] is the oldest
32-
levels: Vec<Immutable>,
33+
/// From oldest to newest, i.e., first is the oldest
34+
immutables: BTreeMap<LevelIndex, Immutable>,
3335
}
3436

3537
impl ImmutableLevels {
36-
pub(crate) fn new(levels: impl IntoIterator<Item = Immutable>) -> Self {
38+
pub(crate) fn new_form_iter(immutables: impl IntoIterator<Item = Immutable>) -> Self {
3739
Self {
38-
levels: levels.into_iter().collect(),
40+
immutables: immutables
41+
.into_iter()
42+
.map(|immu| (*immu.level_index(), immu))
43+
.collect(),
3944
}
4045
}
4146

47+
pub(crate) fn indexes(&self) -> impl Iterator<Item = LevelIndex> + use<'_> {
48+
self.immutables.keys().cloned()
49+
}
50+
4251
/// Return an iterator of all Arc of levels from newest to oldest.
4352
pub(crate) fn iter_immutable_levels(&self) -> impl Iterator<Item = &Immutable> {
44-
self.levels.iter().rev()
53+
self.immutables.values().rev()
4554
}
4655

4756
/// Return an iterator of all levels from newest to oldest.
4857
pub(crate) fn iter_levels(&self) -> impl Iterator<Item = &Level> {
49-
self.levels.iter().map(|x| x.as_ref()).rev()
58+
self.immutables.values().map(|x| x.as_ref()).rev()
5059
}
5160

5261
pub(crate) fn newest(&self) -> Option<&Immutable> {
53-
self.levels.last()
62+
self.immutables.values().next_back()
5463
}
5564

56-
pub(crate) fn push(&mut self, level: Immutable) {
57-
self.levels.push(level);
65+
pub(crate) fn newest_level_index(&self) -> Option<LevelIndex> {
66+
self.newest().map(|x| *x.level_index())
5867
}
5968

60-
pub(crate) fn len(&self) -> usize {
61-
self.levels.len()
69+
pub(crate) fn insert(&mut self, level: Immutable) {
70+
let key = *level.level_index();
71+
72+
let last_key = self.newest_level_index().unwrap_or_default();
73+
74+
// The newly added must have greater data index or there are no data added.
75+
assert!(
76+
key > last_key || (key == LevelIndex::default() && key == last_key),
77+
"new level to insert {:?} must have greater index than the newest level {:?}",
78+
key,
79+
last_key
80+
);
81+
82+
self.immutables.insert(key, level);
6283
}
6384

64-
pub(crate) fn levels(&self) -> &Vec<Immutable> {
65-
&self.levels
85+
/// Remove all levels up to and including the given level index.
86+
pub(crate) fn remove_levels_upto(&mut self, level_index: LevelIndex) {
87+
assert!(
88+
self.immutables.contains_key(&level_index),
89+
"level_index to remove {:?} must exist",
90+
level_index
91+
);
92+
93+
let mut left = self.immutables.split_off(&level_index);
94+
// split_off() also returns the given key, remove it.
95+
left.pop_first();
96+
self.immutables = left;
6697
}
6798

6899
#[allow(dead_code)]
69-
pub(crate) fn levels_mut(&mut self) -> &mut Vec<Immutable> {
70-
&mut self.levels
100+
pub(crate) fn levels_mut(&mut self) -> &mut BTreeMap<LevelIndex, Immutable> {
101+
&mut self.immutables
71102
}
72103
}
73104

@@ -89,3 +120,28 @@ where
89120
compacted_range::<_, _, Level, _, Level>(range, None, levels, []).await
90121
}
91122
}
123+
124+
#[cfg(test)]
125+
mod tests {
126+
use std::sync::Arc;
127+
128+
use crate::leveled_store::immutable::Immutable;
129+
use crate::leveled_store::immutable_levels::ImmutableLevels;
130+
use crate::leveled_store::level::Level;
131+
132+
#[test]
133+
fn test_remove_levels_upto() {
134+
let mut immutables = ImmutableLevels::new_form_iter(vec![
135+
Immutable::new(Arc::new(Level::default())),
136+
Immutable::new(Arc::new(Level::default())),
137+
Immutable::new(Arc::new(Level::default())),
138+
]);
139+
140+
let index = immutables.indexes().collect::<Vec<_>>()[1];
141+
let left_index = immutables.indexes().collect::<Vec<_>>()[2];
142+
143+
immutables.remove_levels_upto(index);
144+
145+
assert_eq!(immutables.indexes().collect::<Vec<_>>(), vec![left_index]);
146+
}
147+
}

src/meta/raft-store/src/leveled_store/level_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
/// Represents a unique identifier for a level.
1616
///
1717
/// The magnitude of the index indicates the relative freshness of the level.
18-
#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd, Ord)]
18+
#[derive(Clone, Debug, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
1919
pub struct LevelIndex {
2020
/// The internal sequence number of the level.
2121
internal_seq: u64,

src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::utils::add_cooperative_yielding;
4141
/// The data to compact.
4242
///
4343
/// Including several in-memory immutable levels and an optional persisted db.
44+
#[derive(Debug)]
4445
pub(crate) struct CompactingData {
4546
pub(crate) immutable_levels: Arc<ImmutableLevels>,
4647
pub(crate) persisted: Option<Arc<DB>>,
@@ -81,7 +82,10 @@ impl CompactingData {
8182
let table = mvcc::Table::from_stream(strm).await?;
8283
data.replace_kv(table);
8384

84-
self.immutable_levels = Arc::new(ImmutableLevels::new([Immutable::new_from_level(data)]));
85+
self.immutable_levels =
86+
Arc::new(ImmutableLevels::new_form_iter([Immutable::new_from_level(
87+
data,
88+
)]));
8589
Ok(())
8690
}
8791

src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,16 @@ use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit;
2828
/// Compactor is responsible for compacting the immutable levels and db.
2929
///
3030
/// Only one Compactor can be running at a time.
31+
#[derive(Debug)]
3132
pub struct Compactor {
3233
/// Acquired permit for this compactor.
3334
///
3435
/// This is used to ensure that only one compactor can run at a time.
3536
pub(crate) _permit: CompactorPermit,
3637

3738
pub(crate) compacting_data: CompactingData,
38-
// pub(crate) immutable_levels: Arc<ImmutableLevels>,
39-
// pub(crate) persisted: Option<Arc<DB>>,
4039
/// Remember the newest level included in this compactor.
41-
pub(super) since: Option<LevelIndex>,
40+
pub(super) upto: Option<LevelIndex>,
4241
}
4342

4443
impl Compactor {

src/meta/raft-store/src/leveled_store/leveled_map/compactor_acquirer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl CompactorAcquirer {
3535
}
3636
}
3737

38+
#[derive(Debug)]
3839
pub struct CompactorPermit {
3940
_permit: OwnedSemaphorePermit,
4041
}

src/meta/raft-store/src/leveled_store/leveled_map/map_api_impl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ where
3838
K: MapKeyDecode,
3939
SeqMarked<K::V>: ValueConvert<SeqMarked>,
4040
Level: GetTable<K, K::V>,
41+
<K as MapKey>::V: fmt::Debug,
4142
for<'a> MapView<'a>: MapApiRO<K>,
4243
{
4344
async fn get(&self, key: &K) -> Result<SeqMarked<K::V>, io::Error> {
@@ -58,6 +59,7 @@ where
5859
K: MapKeyDecode,
5960
SeqMarked<K::V>: ValueConvert<SeqMarked>,
6061
Level: GetTable<K, K::V>,
62+
<K as MapKey>::V: fmt::Debug,
6163
for<'a> MapView<'a>: MapApiRO<K>,
6264
{
6365
async fn get(&self, key: &K) -> Result<SeqMarked<K::V>, io::Error> {

src/meta/raft-store/src/leveled_store/leveled_map/mod.rs

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::fmt;
1617
use std::io;
1718
use std::ops::RangeBounds;
1819
use std::sync::Arc;
@@ -185,6 +186,7 @@ impl LeveledMapData {
185186
K: MapKeyDecode,
186187
SeqMarked<K::V>: ValueConvert<SeqMarked>,
187188
Level: GetTable<K, K::V>,
189+
<K as MapKey>::V: fmt::Debug,
188190
{
189191
// TODO: test it
190192

@@ -207,18 +209,32 @@ impl LeveledMapData {
207209
);
208210
}
209211

210-
let strm = futures::stream::iter(vec).map(Ok).boxed();
212+
let strm = futures::stream::iter(vec)
213+
// .map(|x| {
214+
// debug!("range-item from writable: {:?}", x);
215+
// x
216+
// })
217+
.map(Ok)
218+
.boxed();
211219
kmerge = kmerge.merge(strm);
212220

213221
// Immutable levels
214222

215223
let immutable_levels = self.immutable_levels();
216224

217225
for level in immutable_levels.iter_levels() {
226+
let index = level.with_sys_data(|s| s.curr_seq());
227+
let _ = index;
218228
let it = level.get_table().range(range.clone(), upto_seq);
219229

220230
let vec = it.map(|(k, v)| (k.clone(), v.cloned())).collect::<Vec<_>>();
221-
let strm = futures::stream::iter(vec).map(Ok).boxed();
231+
let strm = futures::stream::iter(vec)
232+
// .map(move |x| {
233+
// debug!("range-item from immutable seq={}: {:?}", index, x);
234+
// x
235+
// })
236+
.map(Ok)
237+
.boxed();
222238
kmerge = kmerge.merge(strm);
223239
}
224240

@@ -227,6 +243,10 @@ impl LeveledMapData {
227243
if let Some(db) = self.persisted() {
228244
let map_view = MapView(&db);
229245
let strm = map_view.range(range.clone()).await?;
246+
// let strm = strm.map(|x| {
247+
// debug!("range-item from db: {:?}", x);
248+
// x
249+
// });
230250
kmerge = kmerge.merge(strm);
231251
};
232252

@@ -309,8 +329,7 @@ impl LeveledMap {
309329
/// Return the [`LevelIndex`] of the newest **immutable** data
310330
pub(crate) fn immutable_level_index(&self) -> Option<LevelIndex> {
311331
let immutables = self.data.immutable_levels();
312-
let newest = immutables.newest()?;
313-
Some(*newest.level_index())
332+
immutables.newest_level_index()
314333
}
315334

316335
/// Freeze the current writable level and create a new empty writable level.
@@ -326,23 +345,24 @@ impl LeveledMap {
326345
}
327346

328347
pub fn do_freeze_writable(&self) -> Arc<ImmutableLevels> {
329-
let new_immutable = {
330-
let mut writable = self.data.writable();
331-
let new_writable = writable.new_level();
348+
let mut writable = self.data.writable();
349+
let mut immutables = self.data.immutable_levels.lock().unwrap();
332350

333-
std::mem::replace(&mut *writable, new_writable)
334-
};
351+
let new_writable = writable.new_level();
352+
let new_immutable = std::mem::replace(&mut *writable, new_writable);
335353

336-
let mut immutable_levels = self.data.immutable_levels().as_ref().clone();
354+
let mut immutable_levels = immutables.as_ref().clone();
355+
immutable_levels.insert(Immutable::new_from_level(new_immutable));
337356

338-
immutable_levels.push(Immutable::new_from_level(new_immutable));
357+
*immutables = Arc::new(immutable_levels);
339358

340-
{
341-
let mut immutables = self.data.immutable_levels.lock().unwrap();
342-
*immutables = Arc::new(immutable_levels)
343-
}
359+
info!(
360+
"do_freeze_writable: after writable: {:?}, immutables: {:?}",
361+
writable,
362+
immutables.indexes().collect::<Vec<_>>()
363+
);
344364

345-
self.data.immutable_levels()
365+
immutables.clone()
346366
}
347367

348368
pub fn persisted(&self) -> Option<Arc<DB>> {
@@ -385,38 +405,33 @@ impl LeveledMap {
385405
/// **Important**: Do not drop the compactor within this function when called
386406
/// under a state machine lock, as dropping may take ~250ms.
387407
pub fn replace_with_compacted(&self, compactor: &mut Compactor, db: DB) {
388-
let len = compactor.immutable_levels().len();
389-
390-
// Get the level index of the newest level.
391-
let corresponding_index = compactor
392-
.immutable_levels()
393-
.levels()
394-
.get(len - 1)
395-
.map(|l| l.level_index())
396-
.copied();
397-
398-
assert_eq!(
399-
compactor.since, corresponding_index,
400-
"unexpected change to sm during compaction"
408+
info!(
409+
"replace_with_compacted: compacted upto {:?} immutable levels; my levels: {:?}; compacted levels: {:?}",
410+
compactor.upto,
411+
self.data.immutable_levels().indexes().collect::<Vec<_>>(),
412+
compactor.compacting_data.immutable_levels.indexes().collect::<Vec<_>>(),
401413
);
402414

403-
let my_immutables = self.data.immutable_levels();
404-
let mut levels = my_immutables.levels().clone();
415+
let mut immutables = self.data.immutable_levels().as_ref().clone();
405416

406-
// Remove the levels already included in `persisted`
407-
let uncompacted = levels.split_off(len);
417+
// If there is immutable levels compacted, remove them.
418+
if let Some(upto) = compactor.upto {
419+
immutables.remove_levels_upto(upto);
420+
}
408421

409-
let uncompacted_immutables = ImmutableLevels::new(uncompacted);
410-
411-
// replace the immutables
412-
self.data
413-
.with_immutable_levels(|x| *x = Arc::new(uncompacted_immutables));
422+
// NOTE: Replace data from bottom to top.
423+
// replace the db first, db contains more data.
424+
// Otherwise, there is a chance some data is removed from immutables and the new db containing this data is not inserted.
414425

415426
// replace the persisted
416427
self.data.with_persisted(|p| {
417428
*p = Some(Arc::new(db));
418429
});
419430

431+
// replace the immutables
432+
self.data
433+
.with_immutable_levels(|x| *x = Arc::new(immutables));
434+
420435
info!("replace_with_compacted: finished replacing the db");
421436
}
422437

@@ -437,10 +452,12 @@ impl LeveledMap {
437452
}
438453

439454
pub fn new_compactor(&self, permit: CompactorPermit) -> Compactor {
455+
let level_index = self.immutable_level_index();
456+
440457
Compactor {
441458
_permit: permit,
442459
compacting_data: CompactingData::new(self.immutable_levels(), self.persisted()),
443-
since: self.immutable_level_index(),
460+
upto: level_index,
444461
}
445462
}
446463
}

0 commit comments

Comments
 (0)