Skip to content

Commit 72d5c82

Browse files
authored
refactor(meta): add state machine lock guards with debugging (#18534)
Replace direct RwLock access with tracked guards that log lock acquisition timing and purpose for better debugging visibility. Key changes: - Add new lock_guards.rs with WriteGuard and ReadGuard wrappers - Replace state_machine.read/write() calls with get_state_machine_read/write() - Add timing instrumentation with purpose tracking for all lock acquisitions - Update all meta service components to use new guard pattern
1 parent d8415ee commit 72d5c82

File tree

13 files changed

+262
-52
lines changed

13 files changed

+262
-52
lines changed

src/meta/control/src/import.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ async fn init_new_cluster(
201201
let mut sto = RaftStore::open(&raft_config).await?;
202202

203203
let last_applied = {
204-
let sm2 = sto.get_state_machine().await;
204+
let sm2 = sto.get_state_machine_write("get-last-applied").await;
205205
*sm2.sys_data_ref().last_applied_ref()
206206
};
207207

@@ -213,7 +213,9 @@ async fn init_new_cluster(
213213

214214
// Update snapshot: Replace nodes set and membership config.
215215
{
216-
let mut sm2 = sto.get_state_machine().await;
216+
let mut sm2 = sto
217+
.get_state_machine_write("init_new_cluster-get-last-membership")
218+
.await;
217219

218220
*sm2.sys_data_mut().nodes_mut() = nodes.clone();
219221

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,10 @@ impl MetaService for MetaServiceImpl {
457457
// This approach prevents race conditions and guarantees that no events will be
458458
// delivered out of order to the watcher.
459459
let stream = {
460-
let sm = &mn.raft_store.state_machine;
461-
let sm = sm.write().await;
462-
463-
info!("enter sm write lock for watch {}", watch);
460+
let sm = mn
461+
.raft_store
462+
.get_state_machine_read("new-watch-stream")
463+
.await;
464464

465465
let sender = mn.new_watch_sender(watch, tx.clone())?;
466466
let sender_str = sender.to_string();

src/meta/service/src/api/http/v1/features.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ pub async fn set(
114114

115115
pub async fn features_state(meta_node: &Arc<MetaNode>) -> FeatureResponse {
116116
let enabled = {
117-
let mut sm = meta_node.raft_store.state_machine.write().await;
117+
let mut sm = meta_node
118+
.raft_store
119+
.get_state_machine_write("get-state-machine-features")
120+
.await;
118121
let x = sm.sys_data_mut().features();
119122
x.iter().cloned().collect()
120123
};

src/meta/service/src/meta_service/meta_leader.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ use std::time::Duration;
1717
use std::time::SystemTime;
1818

1919
use anyerror::AnyError;
20-
use databend_common_base::base::tokio::sync::RwLockReadGuard;
2120
use databend_common_meta_client::MetaGrpcReadReq;
2221
use databend_common_meta_kvapi::kvapi::KVApi;
23-
use databend_common_meta_raft_store::sm_v003::SMV003;
2422
use databend_common_meta_sled_store::openraft::ChangeMembers;
2523
use databend_common_meta_stoerr::MetaStorageError;
2624
use databend_common_meta_types::node::Node;
@@ -92,17 +90,35 @@ impl Handler<ForwardRequestBody> for MetaLeader<'_> {
9290
}
9391

9492
ForwardRequestBody::GetKV(req) => {
95-
let sm = self.get_state_machine().await;
93+
let sm = self
94+
.sto
95+
.get_state_machine_read(format!(
96+
"MetaLeader::handle(ForwardRequestBody: {:?})",
97+
req
98+
))
99+
.await;
96100
let res = sm.kv_api().get_kv(&req.key).await.unwrap();
97101
Ok(ForwardResponse::GetKV(res))
98102
}
99103
ForwardRequestBody::MGetKV(req) => {
100-
let sm = self.get_state_machine().await;
104+
let sm = self
105+
.sto
106+
.get_state_machine_read(format!(
107+
"MetaLeader::handle(ForwardRequestBody: {:?})",
108+
req
109+
))
110+
.await;
101111
let res = sm.kv_api().mget_kv(&req.keys).await.unwrap();
102112
Ok(ForwardResponse::MGetKV(res))
103113
}
104114
ForwardRequestBody::ListKV(req) => {
105-
let sm = self.get_state_machine().await;
115+
let sm = self
116+
.sto
117+
.get_state_machine_read(format!(
118+
"MetaLeader::handle(ForwardRequestBody: {:?})",
119+
req
120+
))
121+
.await;
106122
let res = sm.kv_api().list_kv_collect(&req.prefix).await.unwrap();
107123
Ok(ForwardResponse::ListKV(res))
108124
}
@@ -119,7 +135,10 @@ impl Handler<MetaGrpcReadReq> for MetaLeader<'_> {
119135
) -> Result<BoxStream<StreamItem>, MetaOperationError> {
120136
debug!(req :? =(&req); "handle(MetaGrpcReadReq)");
121137

122-
let sm = self.get_state_machine().await;
138+
let sm = self
139+
.sto
140+
.get_state_machine_read(format!("MetaLeader::handle(MetaGrpcReadReq: {:?})", req))
141+
.await;
123142
let kv_api = sm.kv_api();
124143

125144
match req.body {
@@ -290,7 +309,7 @@ impl<'a> MetaLeader<'a> {
290309
/// A cluster must have at least one node in it.
291310
async fn can_leave(&self, id: NodeId) -> Result<Result<(), String>, MetaStorageError> {
292311
let membership = {
293-
let sm = self.get_state_machine().await;
312+
let sm = self.sto.get_state_machine_read("can_leave").await;
294313
sm.sys_data_ref().last_membership_ref().membership().clone()
295314
};
296315
info!("check can_leave: id: {}, membership: {:?}", id, membership);
@@ -306,10 +325,6 @@ impl<'a> MetaLeader<'a> {
306325

307326
Ok(Ok(()))
308327
}
309-
310-
async fn get_state_machine(&self) -> RwLockReadGuard<'_, SMV003> {
311-
self.sto.state_machine.read().await
312-
}
313328
}
314329

315330
fn since_epoch() -> Duration {

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl MetaNodeBuilder {
180180
move |change| h.send_change(change)
181181
};
182182

183-
sto.get_state_machine()
183+
sto.get_state_machine_write("set_on_change_applied-hook")
184184
.await
185185
.set_on_change_applied(Box::new(on_change_applied));
186186

@@ -775,7 +775,10 @@ impl MetaNode {
775775
/// Only when the membership is committed, this node can be sure it is in a cluster.
776776
async fn is_in_cluster(&self) -> Result<Result<String, String>, MetaStorageError> {
777777
let membership = {
778-
let sm = self.raft_store.get_state_machine().await;
778+
let sm = self
779+
.raft_store
780+
.get_state_machine_write("is_in_cluster-get-membership")
781+
.await;
779782
sm.sys_data_ref().last_membership_ref().membership().clone()
780783
};
781784
info!("is_in_cluster: membership: {:?}", membership);
@@ -879,7 +882,7 @@ impl MetaNode {
879882
pub async fn get_node(&self, node_id: &NodeId) -> Option<Node> {
880883
// inconsistent get: from local state machine
881884

882-
let sm = self.raft_store.state_machine.read().await;
885+
let sm = self.raft_store.get_state_machine_read("get-node").await;
883886
let n = sm.sys_data_ref().nodes_ref().get(node_id).cloned();
884887
n
885888
}
@@ -888,7 +891,7 @@ impl MetaNode {
888891
pub async fn get_nodes(&self) -> Vec<Node> {
889892
// inconsistent get: from local state machine
890893

891-
let sm = self.raft_store.state_machine.read().await;
894+
let sm = self.raft_store.get_state_machine_read("get-nodes").await;
892895
let nodes = sm
893896
.sys_data_ref()
894897
.nodes_ref()
@@ -976,7 +979,7 @@ impl MetaNode {
976979
}
977980

978981
pub(crate) async fn get_last_seq(&self) -> u64 {
979-
let sm = self.raft_store.state_machine.read().await;
982+
let sm = self.raft_store.get_state_machine_read("get-last-seq").await;
980983
sm.sys_data_ref().curr_seq()
981984
}
982985

@@ -985,7 +988,10 @@ impl MetaNode {
985988
// Maybe stale get: from local state machine
986989

987990
let nodes = {
988-
let sm = self.raft_store.state_machine.read().await;
991+
let sm = self
992+
.raft_store
993+
.get_state_machine_read("get-grpc-advertise-addrs")
994+
.await;
989995
sm.sys_data_ref()
990996
.nodes_ref()
991997
.values()
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
use std::ops::Deref;
17+
use std::ops::DerefMut;
18+
use std::time::Instant;
19+
20+
use log::info;
21+
use tokio::sync::RwLockReadGuard;
22+
use tokio::sync::RwLockWriteGuard;
23+
24+
#[derive(Debug)]
25+
pub struct WriteGuard<'a, T: ?Sized> {
26+
acquired: Instant,
27+
purpose: String,
28+
inner: RwLockWriteGuard<'a, T>,
29+
}
30+
31+
impl<'a, T> WriteGuard<'a, T> {
32+
pub fn new(purpose: impl ToString, inner: RwLockWriteGuard<'a, T>) -> Self {
33+
Self {
34+
acquired: Instant::now(),
35+
purpose: purpose.to_string(),
36+
inner,
37+
}
38+
}
39+
}
40+
41+
impl<T> Deref for WriteGuard<'_, T> {
42+
type Target = T;
43+
44+
fn deref(&self) -> &Self::Target {
45+
self.inner.deref()
46+
}
47+
}
48+
49+
impl<T> DerefMut for WriteGuard<'_, T> {
50+
fn deref_mut(&mut self) -> &mut Self::Target {
51+
self.inner.deref_mut()
52+
}
53+
}
54+
55+
impl<T> fmt::Display for WriteGuard<'_, T>
56+
where T: fmt::Display
57+
{
58+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59+
write!(
60+
f,
61+
"StateMachineLock-Write-Guard({}): {}",
62+
self.purpose, self.inner
63+
)
64+
}
65+
}
66+
67+
impl<T: ?Sized> Drop for WriteGuard<'_, T> {
68+
fn drop(&mut self) {
69+
let elapsed = self.acquired.elapsed();
70+
info!(
71+
"StateMachineLock-Write-Guard({}) released after {:?}",
72+
self.purpose, elapsed
73+
);
74+
}
75+
}
76+
77+
#[derive(Debug)]
78+
pub struct ReadGuard<'a, T: ?Sized> {
79+
acquired: Instant,
80+
purpose: String,
81+
inner: RwLockReadGuard<'a, T>,
82+
}
83+
84+
impl<'a, T> ReadGuard<'a, T> {
85+
pub fn new(purpose: impl ToString, inner: RwLockReadGuard<'a, T>) -> Self {
86+
Self {
87+
acquired: Instant::now(),
88+
purpose: purpose.to_string(),
89+
inner,
90+
}
91+
}
92+
}
93+
94+
impl<T> Deref for ReadGuard<'_, T> {
95+
type Target = T;
96+
97+
fn deref(&self) -> &Self::Target {
98+
self.inner.deref()
99+
}
100+
}
101+
102+
impl<T> fmt::Display for ReadGuard<'_, T>
103+
where T: fmt::Display
104+
{
105+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106+
write!(
107+
f,
108+
"StateMachineLock-Read-Guard({}): {}",
109+
self.purpose, self.inner
110+
)
111+
}
112+
}
113+
114+
impl<T: ?Sized> Drop for ReadGuard<'_, T> {
115+
fn drop(&mut self) {
116+
let elapsed = self.acquired.elapsed();
117+
info!(
118+
"StateMachineLock-Read-Guard({}) released after {:?}",
119+
self.purpose, elapsed
120+
);
121+
}
122+
}

src/meta/service/src/store/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ mod raft_state_machine_impl;
1818
mod store;
1919
mod store_inner;
2020

21+
pub mod lock_guards;
22+
2123
pub use store::RaftStore;
2224
pub use store_inner::RaftStoreInner;

src/meta/service/src/store/raft_state_machine_impl.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ impl RaftStateMachine<TypeConfig> for RaftStore {
4444
type SnapshotBuilder = RaftStore;
4545

4646
async fn applied_state(&mut self) -> Result<(Option<LogId>, StoredMembership), StorageError> {
47-
let sm = self.state_machine.read().await;
47+
let sm = self
48+
.get_state_machine_read("RaftStateMachine::applied_state")
49+
.await;
4850
let last_applied = *sm.sys_data_ref().last_applied_ref();
4951
let last_membership = sm.sys_data_ref().last_membership_ref().clone();
5052

@@ -62,7 +64,9 @@ impl RaftStateMachine<TypeConfig> for RaftStore {
6264
I: IntoIterator<Item = Entry> + OptionalSend,
6365
I::IntoIter: OptionalSend,
6466
{
65-
let mut sm = self.state_machine.write().await;
67+
let mut sm = self
68+
.get_state_machine_write("RaftStateMachine::apply")
69+
.await;
6670
let res = sm.apply_entries(entries).await?;
6771

6872
Ok(res)
@@ -127,7 +131,9 @@ impl RaftStateMachine<TypeConfig> for RaftStore {
127131
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot>, StorageError> {
128132
info!(id = self.id; "get snapshot start");
129133

130-
let r = self.state_machine.read().await;
134+
let r = self
135+
.get_state_machine_read("RaftStateMachine::get_current_snapshot")
136+
.await;
131137
let db = r.levels().persisted().cloned();
132138

133139
let snapshot = db.map(|x| Snapshot {

0 commit comments

Comments
 (0)