Skip to content

Commit 3e86955

Browse files
authored
refactor: restructure leveled map data architecture (#18619)
- Move LeveledMapData to separate module with impl files - Split ApplierData impl blocks into dedicated files - Change immutable_levels.indexes() return type to Vec - Consolidate mutex access patterns inside LeveledMapDataInner - Remove redundant impl_leveled_map_data.rs module This restructuring improves code organization by separating concerns and makes the codebase more maintainable with clearer module boundaries and consistent access patterns.
1 parent ff96040 commit 3e86955

File tree

15 files changed

+559
-423
lines changed

15 files changed

+559
-423
lines changed

src/meta/raft-store/src/applier/applier_data.rs

Lines changed: 0 additions & 134 deletions
This file was deleted.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use map_api::mvcc;
16+
use seq_marked::SeqMarked;
17+
use state_machine_api::ExpireKey;
18+
19+
use crate::applier::applier_data::ApplierData;
20+
use crate::leveled_store::types::Key;
21+
use crate::leveled_store::types::Namespace;
22+
use crate::leveled_store::types::Value;
23+
24+
#[async_trait::async_trait]
25+
impl mvcc::ScopedView<ExpireKey, String> for ApplierData {
26+
fn set(&mut self, key: ExpireKey, value: Option<String>) -> SeqMarked<()> {
27+
self.view.set(
28+
Namespace::Expire,
29+
Key::Expire(key),
30+
value.map(Value::Expire),
31+
)
32+
}
33+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::ops::RangeBounds;
17+
18+
use futures_util::StreamExt;
19+
use futures_util::TryStreamExt;
20+
use map_api::mvcc;
21+
use map_api::mvcc::ViewReadonly;
22+
use map_api::IOResultStream;
23+
use seq_marked::InternalSeq;
24+
use seq_marked::SeqMarked;
25+
use state_machine_api::ExpireKey;
26+
27+
use crate::applier::applier_data::ApplierData;
28+
use crate::leveled_store::types::Key;
29+
use crate::leveled_store::types::Namespace;
30+
31+
#[async_trait::async_trait]
32+
impl mvcc::ScopedViewReadonly<ExpireKey, String> for ApplierData {
33+
fn base_seq(&self) -> InternalSeq {
34+
self.view.base_seq()
35+
}
36+
37+
async fn get(&self, key: ExpireKey) -> Result<SeqMarked<String>, io::Error> {
38+
let got = self.view.get(Namespace::Expire, Key::Expire(key)).await?;
39+
Ok(got.map(|x| x.into_expire()))
40+
}
41+
42+
async fn range<R>(
43+
&self,
44+
range: R,
45+
) -> Result<IOResultStream<(ExpireKey, SeqMarked<String>)>, io::Error>
46+
where
47+
R: RangeBounds<ExpireKey> + Send + Sync + Clone + 'static,
48+
{
49+
let start = range.start_bound().cloned();
50+
let end = range.end_bound().cloned();
51+
52+
let start = start.map(Key::Expire);
53+
let end = end.map(Key::Expire);
54+
55+
let strm = self.view.range(Namespace::Expire, (start, end)).await?;
56+
57+
Ok(strm
58+
.map_ok(|(k, v)| (k.into_expire(), v.map(|x| x.into_expire())))
59+
.boxed())
60+
}
61+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use map_api::mvcc;
16+
use seq_marked::SeqMarked;
17+
use state_machine_api::MetaValue;
18+
use state_machine_api::UserKey;
19+
20+
use crate::applier::applier_data::ApplierData;
21+
use crate::leveled_store::types::Key;
22+
use crate::leveled_store::types::Namespace;
23+
use crate::leveled_store::types::Value;
24+
25+
#[async_trait::async_trait]
26+
impl mvcc::ScopedView<UserKey, MetaValue> for ApplierData {
27+
fn set(&mut self, key: UserKey, value: Option<MetaValue>) -> SeqMarked<()> {
28+
self.view
29+
.set(Namespace::User, Key::User(key), value.map(Value::User))
30+
}
31+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::ops::RangeBounds;
17+
18+
use futures_util::StreamExt;
19+
use futures_util::TryStreamExt;
20+
use map_api::mvcc;
21+
use map_api::mvcc::ViewReadonly;
22+
use map_api::IOResultStream;
23+
use seq_marked::InternalSeq;
24+
use seq_marked::SeqMarked;
25+
use state_machine_api::MetaValue;
26+
use state_machine_api::UserKey;
27+
28+
use crate::applier::applier_data::ApplierData;
29+
use crate::leveled_store::types::Key;
30+
use crate::leveled_store::types::Namespace;
31+
32+
#[async_trait::async_trait]
33+
impl mvcc::ScopedViewReadonly<UserKey, MetaValue> for ApplierData {
34+
fn base_seq(&self) -> InternalSeq {
35+
self.view.base_seq()
36+
}
37+
38+
async fn get(&self, key: UserKey) -> Result<SeqMarked<MetaValue>, io::Error> {
39+
let got = self.view.get(Namespace::User, Key::User(key)).await?;
40+
Ok(got.map(|x| x.into_user()))
41+
}
42+
43+
async fn range<R>(
44+
&self,
45+
range: R,
46+
) -> Result<IOResultStream<(UserKey, SeqMarked<MetaValue>)>, io::Error>
47+
where
48+
R: RangeBounds<UserKey> + Send + Sync + Clone + 'static,
49+
{
50+
let start = range.start_bound().cloned();
51+
let end = range.end_bound().cloned();
52+
53+
let start = start.map(Key::User);
54+
let end = end.map(Key::User);
55+
56+
let strm = self.view.range(Namespace::User, (start, end)).await?;
57+
58+
Ok(strm
59+
.map_ok(|(k, v)| (k.into_user(), v.map(|x| x.into_user())))
60+
.boxed())
61+
}
62+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::sync::Mutex;
17+
use std::time::Duration;
18+
19+
use map_api::mvcc;
20+
21+
use crate::leveled_store::leveled_map::applier_acquirer::WriterPermit;
22+
use crate::leveled_store::leveled_map::leveled_map_data::LeveledMapData;
23+
use crate::leveled_store::types::Key;
24+
use crate::leveled_store::types::Namespace;
25+
use crate::leveled_store::types::Value;
26+
use crate::sm_v003::OnChange;
27+
28+
mod impl_expire_scoped_view;
29+
mod impl_expire_scoped_view_readonly;
30+
mod impl_user_scoped_view;
31+
mod impl_user_scoped_view_readonly;
32+
33+
pub(crate) type StateMachineView = mvcc::View<Namespace, Key, Value, Arc<LeveledMapData>>;
34+
35+
pub(crate) struct ApplierData {
36+
/// Hold a unique permit to serialize all apply operations to the state machine.
37+
pub(crate) _permit: WriterPermit,
38+
39+
pub(crate) view: StateMachineView,
40+
41+
/// Since when to start cleaning expired keys.
42+
pub(crate) cleanup_start_time: Arc<Mutex<Duration>>,
43+
44+
pub(crate) on_change_applied: Arc<Option<OnChange>>,
45+
}

src/meta/raft-store/src/applier/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ use state_machine_api::StateMachineApi;
6565
use crate::state_machine_api_ext::StateMachineApiExt;
6666

6767
pub(crate) mod applier_data;
68-
mod impl_leveled_map_data;
6968

7069
/// A helper that applies raft log `Entry` to the state machine.
7170
pub struct Applier<SM>

0 commit comments

Comments
 (0)