Skip to content

Commit 602a040

Browse files
authored
refactor(raft-store): update trait interfaces and restructure leveled map (#18719)
* refactor(raft-store): update trait interfaces and restructure leveled map Upgrade map-api to 0.4.0 * M Cargo.lock * A src/meta/raft-store/src/leveled_store/leveled_map/impl_scoped_seq_bounded_get.rs * M src/meta/raft-store/src/leveled_store/leveled_map/impl_scoped_seq_bounded_get.rs * M src/meta/types/src/snapshot_db.rs
1 parent 36fd488 commit 602a040

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2896
-1182
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ logforth = { git = "https://github.com/datafuse-extras/logforth", branch = "main
376376
'fastrace',
377377
] }
378378
lz4 = "1.24.0"
379-
map-api = { version = "0.3.1" }
379+
map-api = { version = "0.4.1" }
380380
maplit = "1.0.2"
381381
match-template = "0.0.1"
382382
md-5 = "0.10.5"
@@ -498,7 +498,7 @@ socket2 = "0.5.3"
498498
span-map = { version = "0.2.0" }
499499
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] }
500500
state = "0.6.0"
501-
state-machine-api = { version = "0.2.1" }
501+
state-machine-api = { version = "0.3.1" }
502502
stream-more = "0.1.3"
503503
strength_reduce = "0.2.4"
504504
stringslice = "0.2.0"
@@ -657,12 +657,12 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
657657
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
658658
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
659659
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
660-
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.3.1" }
660+
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.1" }
661661
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }
662662
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "d82aa6d" }
663663
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "16e433a" }
664664
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
665-
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.2.1" }
665+
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.3.1" }
666666
sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1" }
667667
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
668668
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }

src/meta/raft-store/src/applier/applier_data/impl_expire_scoped_view_readonly.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/meta/raft-store/src/applier/applier_data/impl_user_scoped_view_readonly.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

src/meta/raft-store/src/applier/applier_data/mod.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,10 @@ use std::sync::Arc;
1616
use std::sync::Mutex;
1717
use std::time::Duration;
1818

19-
use map_api::mvcc;
20-
2119
use crate::leveled_store::leveled_map::applier_acquirer::WriterPermit;
22-
use crate::leveled_store::leveled_map::leveled_map_data::LeveledMapData;
23-
use crate::leveled_store::types::Key;
24-
use crate::leveled_store::types::Namespace;
25-
use crate::leveled_store::types::Value;
20+
use crate::leveled_store::view::StateMachineView;
2621
use crate::sm_v003::OnChange;
2722

28-
mod impl_expire_scoped_view;
29-
mod impl_expire_scoped_view_readonly;
30-
mod impl_user_scoped_view;
31-
mod impl_user_scoped_view_readonly;
32-
33-
pub(crate) type StateMachineView = mvcc::View<Namespace, Key, Value, Arc<LeveledMapData>>;
34-
3523
pub(crate) struct ApplierData {
3624
/// Hold a unique permit to serialize all apply operations to the state machine.
3725
pub(crate) _permit: WriterPermit,

src/meta/raft-store/src/leveled_store/db_exporter.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ use databend_common_meta_types::SeqV;
2323
use futures_util::StreamExt;
2424
use futures_util::TryStreamExt;
2525
use log::info;
26-
use map_api::map_api_ro::MapApiRO;
26+
use map_api::mvcc::ScopedSeqBoundedRange;
2727
use map_api::IOResultStream;
28+
use state_machine_api::ExpireKey;
2829
use state_machine_api::ExpireValue;
30+
use state_machine_api::UserKey;
2931

3032
use crate::key_spaces::SMEntry;
31-
use crate::leveled_store::db_map_api_ro_impl::MapView;
32-
use crate::leveled_store::map_api::AsMap;
33+
use crate::leveled_store::db_impl_scoped_seq_bounded_read::ScopedSeqBoundedRead;
3334
use crate::state_machine::StateMachineMetaKey;
3435
use crate::state_machine::StateMachineMetaValue;
3536

@@ -92,7 +93,9 @@ impl<'a> DBExporter<'a> {
9293

9394
// expire index
9495

95-
let strm = MapView(self.db).as_expire_map().range(..).await?;
96+
let strm = ScopedSeqBoundedRead(self.db)
97+
.range(ExpireKey::default().., u64::MAX)
98+
.await?;
9699
let expire_strm = strm.try_filter_map(|(exp_k, marked)| {
97100
// Tombstone will be converted to None and be ignored.
98101
let exp_val = ExpireValue::from_marked(marked);
@@ -102,7 +105,9 @@ impl<'a> DBExporter<'a> {
102105

103106
// kv
104107

105-
let strm = MapView(self.db).as_user_map().range(..).await?;
108+
let strm = ScopedSeqBoundedRead(self.db)
109+
.range(UserKey::default().., u64::MAX)
110+
.await?;
106111
let kv_strm = strm.try_filter_map(|(user_key, seq_marked)| {
107112
// Tombstone will be converted to None and be ignored.
108113
let seqv: Option<SeqV<_>> = seq_marked.into();

src/meta/raft-store/src/leveled_store/db_map_api_ro_impl.rs renamed to src/meta/raft-store/src/leveled_store/db_impl_scoped_seq_bounded_read.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,40 @@
1313
// limitations under the License.
1414

1515
use std::io;
16+
use std::io::Error;
1617
use std::ops::RangeBounds;
1718

1819
use databend_common_meta_types::snapshot_db::DB;
1920
use futures_util::StreamExt;
20-
use map_api::map_api_ro::MapApiRO;
21+
use map_api::mvcc;
22+
use map_api::mvcc::ViewKey;
23+
use map_api::mvcc::ViewValue;
24+
use map_api::IOResultStream;
2125
use rotbl::v001::SeqMarked;
2226

23-
use crate::leveled_store::map_api::KVResultStream;
2427
use crate::leveled_store::map_api::MapKey;
2528
use crate::leveled_store::map_api::MapKeyDecode;
2629
use crate::leveled_store::map_api::MapKeyEncode;
2730
use crate::leveled_store::rotbl_codec::RotblCodec;
2831
use crate::leveled_store::value_convert::ValueConvert;
2932

30-
/// A wrapper that implements the `MapApiRO` trait for the `DB`.
33+
/// A wrapper that implements the `ScopedSnapshot*` trait for the `DB`.
3134
#[derive(Debug, Clone)]
32-
pub struct MapView<'a>(pub &'a DB);
35+
pub struct ScopedSeqBoundedRead<'a>(pub &'a DB);
3336

37+
// TODO: test
3438
#[async_trait::async_trait]
35-
impl<K> MapApiRO<K> for MapView<'_>
39+
impl<K> mvcc::ScopedSeqBoundedGet<K, K::V> for ScopedSeqBoundedRead<'_>
3640
where
3741
K: MapKey,
38-
K: MapKeyEncode,
39-
K: MapKeyDecode,
42+
K: ViewKey,
43+
K: MapKeyEncode + MapKeyDecode,
44+
K::V: ViewValue,
4045
SeqMarked<K::V>: ValueConvert<SeqMarked>,
4146
{
42-
async fn get(&self, key: &K) -> Result<SeqMarked<K::V>, io::Error> {
43-
let key = RotblCodec::encode_key(key)
47+
async fn get(&self, key: K, _snapshot_seq: u64) -> Result<SeqMarked<K::V>, io::Error> {
48+
// TODO: DB does not consider snapshot_seq
49+
let key = RotblCodec::encode_key(&key)
4450
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
4551

4652
let res = self.0.rotbl.get(&key).await?;
@@ -52,9 +58,28 @@ where
5258
let marked = SeqMarked::<K::V>::conv_from(seq_marked)?;
5359
Ok(marked)
5460
}
61+
}
62+
63+
// TODO: test
64+
#[async_trait::async_trait]
65+
impl<K> mvcc::ScopedSeqBoundedRange<K, K::V> for ScopedSeqBoundedRead<'_>
66+
where
67+
K: MapKey,
68+
K: ViewKey,
69+
K: MapKeyEncode + MapKeyDecode,
70+
K::V: ViewValue,
71+
SeqMarked<K::V>: ValueConvert<SeqMarked>,
72+
{
73+
async fn range<R>(
74+
&self,
75+
range: R,
76+
_snapshot_seq: u64,
77+
) -> Result<IOResultStream<(K, SeqMarked<K::V>)>, Error>
78+
where
79+
R: RangeBounds<K> + Send + Sync + Clone + 'static,
80+
{
81+
// TODO: DB does not consider snapshot_seq
5582

56-
async fn range<R>(&self, range: R) -> Result<KVResultStream<K>, io::Error>
57-
where R: RangeBounds<K> + Clone + Send + Sync + 'static {
5883
let rng = RotblCodec::encode_range(&range)
5984
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
6085

0 commit comments

Comments
 (0)