Skip to content

Commit da7285f

Browse files
authored
refactor: avoid using RWlock, use semaphore instead (#18601)
* feat(meta-service): implement MVCC state machine architecture with controlled data access

  NON-BREAKING CHANGE: State machine data access patterns changed from direct mutable references to controlled access with closures

  - Upgrade map-api to 0.3.0 and state-machine-api to 0.2.1 for MVCC support
  - Replace sys_data_mut() direct access with with_sys_data() closures
  - Add ApplierData and StateMachineView for MVCC operations
  - Implement ViewReadonly and ScopedView traits for user and expire namespaces
  - Refactor all state modifications to use controlled access patterns
  - Update applier to serialize state machine operations with permits
  - Migrate from direct field access to functional data manipulation

* chore: fix state-machine-api version
* chore: fix tests
* chore: fix lint
* chore: fix lint
* chore: move out namespace, key, value
* chore: remove unused MapApi impl
* chore: fix test
* refactor: avoid using RWlock, use semaphore instead
1 parent 294396e commit da7285f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1843
-1070
lines changed

Cargo.lock

Lines changed: 8 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ logforth = { git = "https://github.com/datafuse-extras/logforth", branch = "main
374374
'fastrace',
375375
] }
376376
lz4 = "1.24.0"
377-
map-api = { version = "0.2.5" }
377+
map-api = { version = "0.3.1" }
378378
maplit = "1.0.2"
379379
match-template = "0.0.1"
380380
md-5 = "0.10.5"
@@ -470,7 +470,7 @@ rustyline = "14"
470470
scroll = "0.12.0"
471471
self_cell = "1.2.0"
472472
semver = "1.0.14"
473-
seq-marked = { version = "0.3.1", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
473+
seq-marked = { version = "0.3.3", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
474474
serde = { version = "1.0.164", features = ["derive", "rc"] }
475475
serde_derive = "1"
476476
serde_ignored = "0.1.10"
@@ -496,7 +496,7 @@ socket2 = "0.5.3"
496496
span-map = { version = "0.2.0" }
497497
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] }
498498
state = "0.6.0"
499-
state-machine-api = { version = "0.1.1" }
499+
state-machine-api = { version = "0.2.1" }
500500
stream-more = "0.1.3"
501501
strength_reduce = "0.2.4"
502502
stringslice = "0.2.0"
@@ -656,12 +656,12 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
656656
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
657657
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
658658
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
659-
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.7" }
659+
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.3.1" }
660660
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }
661661
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "d82aa6d" }
662662
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
663663
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
664-
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.1.1" }
664+
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.2.1" }
665665
sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1" }
666666
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
667667
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }

src/meta/control/src/import.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ async fn init_new_cluster(
201201
let mut sto = RaftStore::open(&raft_config).await?;
202202

203203
let last_applied = {
204-
let sm2 = sto.get_state_machine_write("get-last-applied").await;
205-
*sm2.sys_data_ref().last_applied_ref()
204+
let sm2 = sto.state_machine();
205+
*sm2.sys_data().last_applied_ref()
206206
};
207207

208208
let last_log_id = std::cmp::max(last_applied, max_log_id);
@@ -213,17 +213,15 @@ async fn init_new_cluster(
213213

214214
// Update snapshot: Replace nodes set and membership config.
215215
{
216-
let mut sm2 = sto
217-
.get_state_machine_write("init_new_cluster-get-last-membership")
218-
.await;
219-
220-
*sm2.sys_data_mut().nodes_mut() = nodes.clone();
216+
let sm2 = sto.state_machine();
221217

222218
// It must set membership to state machine because
223219
// the snapshot may contain more logs than the last_log_id.
224220
// In which case, logs will be purged upon startup.
225-
*sm2.sys_data_mut().last_membership_mut() =
226-
StoredMembership::new(last_applied, membership.clone());
221+
sm2.with_sys_data(|s| {
222+
*s.nodes_mut() = nodes.clone();
223+
*s.last_membership_mut() = StoredMembership::new(last_applied, membership.clone());
224+
});
227225
}
228226

229227
// Build snapshot to persist state machine.

src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,15 @@ impl TestSuite {
8888
self.kv_write_read(&builder.build().await).await?;
8989
self.kv_delete(&builder.build().await).await?;
9090
self.kv_update(&builder.build().await).await?;
91+
9192
self.kv_timeout(&builder.build().await).await?;
9293
self.kv_expire_sec_or_ms(&builder.build().await).await?;
9394
self.kv_upsert_with_ttl(&builder.build().await).await?;
95+
9496
self.kv_meta(&builder.build().await).await?;
9597
self.kv_list(&builder.build().await).await?;
9698
self.kv_mget(&builder.build().await).await?;
99+
97100
self.kv_txn_absent_seq_0(&builder.build().await).await?;
98101
self.kv_transaction(&builder.build().await).await?;
99102
self.kv_transaction_fetch_add_u64(&builder.build().await)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::ops::RangeBounds;
17+
use std::sync::Arc;
18+
use std::sync::Mutex;
19+
use std::time::Duration;
20+
21+
use futures_util::StreamExt;
22+
use futures_util::TryStreamExt;
23+
use map_api::mvcc;
24+
use map_api::mvcc::ViewReadonly;
25+
use map_api::IOResultStream;
26+
use seq_marked::InternalSeq;
27+
use seq_marked::SeqMarked;
28+
use state_machine_api::ExpireKey;
29+
use state_machine_api::MetaValue;
30+
use state_machine_api::UserKey;
31+
32+
use crate::leveled_store::leveled_map::applier_acquirer::WriterPermit;
33+
use crate::leveled_store::leveled_map::LeveledMapData;
34+
use crate::leveled_store::types::Key;
35+
use crate::leveled_store::types::Namespace;
36+
use crate::leveled_store::types::Value;
37+
use crate::sm_v003::OnChange;
38+
39+
pub(crate) type StateMachineView = mvcc::View<Namespace, Key, Value, Arc<LeveledMapData>>;
40+
41+
/// State handed to the applier: a writer permit plus the MVCC view it operates on.
///
/// The `ScopedViewReadonly` / `ScopedView` impls in this file expose the single `view`
/// as two typed maps: `UserKey -> MetaValue` and `ExpireKey -> String`.
pub(crate) struct ApplierData {
42+
/// Hold a unique permit to serialize all apply operations to the state machine.
43+
pub(crate) _permit: WriterPermit,
44+
45+
/// The MVCC view that every read and write of this applier goes through.
pub(crate) view: StateMachineView,
46+
47+
/// Since when to start cleaning expired keys.
48+
pub(crate) cleanup_start_time: Arc<Mutex<Duration>>,
49+
50+
/// Optional shared `OnChange` hook.
/// NOTE(review): presumably invoked after a change has been applied — confirm at the call site.
pub(crate) on_change_applied: Arc<Option<OnChange>>,
51+
}
52+
53+
/// Read-only access to the user namespace of the view, typed as `UserKey -> MetaValue`.
///
/// Every method delegates to `self.view` with `Namespace::User`, wrapping keys in
/// `Key::User` on the way in and unwrapping results with `into_user()` on the way out.
#[async_trait::async_trait]
54+
impl mvcc::ScopedViewReadonly<UserKey, MetaValue> for ApplierData {
55+
/// Returns the base sequence reported by the underlying view.
fn base_seq(&self) -> InternalSeq {
56+
self.view.base_seq()
57+
}
58+
59+
/// Looks up `key` in the user namespace.
///
/// # Errors
/// Propagates any `io::Error` returned by the underlying view.
async fn get(&self, key: UserKey) -> Result<SeqMarked<MetaValue>, io::Error> {
60+
let got = self.view.get(Namespace::User, Key::User(key)).await?;
61+
// Convert the generic `Value` held inside the `SeqMarked` back to `MetaValue`.
Ok(got.map(|x| x.into_user()))
62+
}
63+
64+
/// Returns a boxed stream of `(UserKey, SeqMarked<MetaValue>)` entries within `range`.
///
/// # Errors
/// Propagates any `io::Error` returned when opening the range on the underlying view.
async fn range<R>(
65+
&self,
66+
range: R,
67+
) -> Result<IOResultStream<(UserKey, SeqMarked<MetaValue>)>, io::Error>
68+
where
69+
R: RangeBounds<UserKey> + Send + Sync + Clone + 'static,
70+
{
71+
// Take owned copies of both bounds so they can be re-wrapped below.
let start = range.start_bound().cloned();
72+
let end = range.end_bound().cloned();
73+
74+
// Lift the `UserKey` bounds into the view's generic `Key` type.
let start = start.map(Key::User);
75+
let end = end.map(Key::User);
76+
77+
let strm = self.view.range(Namespace::User, (start, end)).await?;
78+
79+
// Map each successful item back to the typed key/value pair; errors pass through.
Ok(strm
80+
.map_ok(|(k, v)| (k.into_user(), v.map(|x| x.into_user())))
81+
.boxed())
82+
}
83+
}
84+
85+
/// Write access to the user namespace of the view, typed as `UserKey -> MetaValue`.
#[async_trait::async_trait]
86+
impl mvcc::ScopedView<UserKey, MetaValue> for ApplierData {
87+
/// Writes `value` for `key` in the user namespace and returns the `SeqMarked<()>`
/// produced by the underlying view.
///
/// NOTE(review): `None` presumably records a deletion — confirm against `mvcc::View::set`.
fn set(&mut self, key: UserKey, value: Option<MetaValue>) -> SeqMarked<()> {
88+
self.view
89+
.set(Namespace::User, Key::User(key), value.map(Value::User))
90+
}
91+
}
92+
93+
/// Read-only access to the expire namespace of the view, typed as `ExpireKey -> String`.
///
/// Every method delegates to `self.view` with `Namespace::Expire`, wrapping keys in
/// `Key::Expire` on the way in and unwrapping results with `into_expire()` on the way out.
#[async_trait::async_trait]
94+
impl mvcc::ScopedViewReadonly<ExpireKey, String> for ApplierData {
95+
/// Returns the base sequence reported by the underlying view.
fn base_seq(&self) -> InternalSeq {
96+
self.view.base_seq()
97+
}
98+
99+
/// Looks up `key` in the expire namespace.
///
/// # Errors
/// Propagates any `io::Error` returned by the underlying view.
async fn get(&self, key: ExpireKey) -> Result<SeqMarked<String>, io::Error> {
100+
let got = self.view.get(Namespace::Expire, Key::Expire(key)).await?;
101+
// Convert the generic `Value` held inside the `SeqMarked` back to `String`.
Ok(got.map(|x| x.into_expire()))
102+
}
103+
104+
/// Returns a boxed stream of `(ExpireKey, SeqMarked<String>)` entries within `range`.
///
/// # Errors
/// Propagates any `io::Error` returned when opening the range on the underlying view.
async fn range<R>(
105+
&self,
106+
range: R,
107+
) -> Result<IOResultStream<(ExpireKey, SeqMarked<String>)>, io::Error>
108+
where
109+
R: RangeBounds<ExpireKey> + Send + Sync + Clone + 'static,
110+
{
111+
// Take owned copies of both bounds so they can be re-wrapped below.
let start = range.start_bound().cloned();
112+
let end = range.end_bound().cloned();
113+
114+
// Lift the `ExpireKey` bounds into the view's generic `Key` type.
let start = start.map(Key::Expire);
115+
let end = end.map(Key::Expire);
116+
117+
let strm = self.view.range(Namespace::Expire, (start, end)).await?;
118+
119+
// Map each successful item back to the typed key/value pair; errors pass through.
Ok(strm
120+
.map_ok(|(k, v)| (k.into_expire(), v.map(|x| x.into_expire())))
121+
.boxed())
122+
}
123+
}
124+
125+
/// Write access to the expire namespace of the view, typed as `ExpireKey -> String`.
#[async_trait::async_trait]
126+
impl mvcc::ScopedView<ExpireKey, String> for ApplierData {
127+
/// Writes `value` for `key` in the expire namespace and returns the `SeqMarked<()>`
/// produced by the underlying view.
///
/// NOTE(review): `None` presumably records a deletion — confirm against `mvcc::View::set`.
fn set(&mut self, key: ExpireKey, value: Option<String>) -> SeqMarked<()> {
128+
self.view.set(
129+
Namespace::Expire,
130+
Key::Expire(key),
131+
value.map(Value::Expire),
132+
)
133+
}
134+
}

0 commit comments

Comments
 (0)