Skip to content

Commit 5ed4495

Browse files
authored
feat(meta-service): add timing monitoring for Raft-Log IO (#18509)
Enhance `IODesc` with timing fields to track the phases of an IO operation:

- Add `start_time`, `flush_time`, `flush_done_time`, and `done_time` fields.
- Update the `Callback` constructors to accept an `IODesc` instead of a string context.
- Add slow-operation warnings for flush operations that take longer than 50ms.
- Improve logging by displaying detailed timing information.

This provides better observability when debugging raft-log performance and helps identify slow IO operations in production systems.
1 parent a02c999 commit 5ed4495

File tree

4 files changed

+146
-17
lines changed

4 files changed

+146
-17
lines changed

src/meta/raft-store/src/raft_log_v004/callback.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,56 +14,72 @@
1414

1515
use std::io;
1616
use std::sync::mpsc::SyncSender;
17+
use std::time::Duration;
1718

1819
use databend_common_meta_types::raft_types;
1920
use log::info;
2021
use log::warn;
2122
use tokio::sync::oneshot;
2223

2324
use crate::raft_log_v004::callback_data::CallbackData;
25+
use crate::raft_log_v004::IODesc;
2426

2527
/// The callback to be called when the IO is completed.
2628
///
2729
/// This is used as a wrapper of Openraft callback or used directly internally in RaftLog.
2830
pub struct Callback {
29-
context: String,
31+
/// Describes the IO operation this callback reports on, including its timing fields.
/// Takes the place of the plain string `context` removed above.
io_desc: IODesc,
3032
/// Where the IO result is delivered: one of the `CallbackData` variants
/// (`IOFlushed`, `Oneshot`, `SyncOneshot`) built by the constructors.
data: CallbackData,
3133
}
3234

3335
impl Callback {
34-
pub fn new_io_flushed(io_flushed: raft_types::IOFlushed, context: impl ToString) -> Self {
36+
/// Create a callback that reports the flush result through a `raft_types::IOFlushed`.
///
/// `io_desc` is taken by value and stamped with the flush time before it is stored,
/// so the flush is recorded as requested at the moment the callback is built.
pub fn new_io_flushed(io_flushed: raft_types::IOFlushed, mut io_desc: IODesc) -> Self {
37+
// Record when the flush request is issued; `send()` later records when it completes.
io_desc.set_flush_time();
38+
3539
Callback {
36-
context: context.to_string(),
40+
io_desc,
3741
data: CallbackData::IOFlushed(io_flushed),
3842
}
3943
}
4044

41-
pub fn new_oneshot(tx: oneshot::Sender<Result<(), io::Error>>, context: impl ToString) -> Self {
45+
/// Create a callback that reports the flush result through a tokio oneshot sender.
///
/// `io_desc` is taken by value and stamped with the flush time before it is stored.
pub fn new_oneshot(tx: oneshot::Sender<Result<(), io::Error>>, mut io_desc: IODesc) -> Self {
46+
// Record when the flush request is issued.
io_desc.set_flush_time();
47+
4248
Callback {
43-
context: context.to_string(),
49+
io_desc,
4450
data: CallbackData::Oneshot(tx),
4551
}
4652
}
4753

48-
pub fn new_sync_oneshot(tx: SyncSender<Result<(), io::Error>>, context: impl ToString) -> Self {
54+
/// Create a callback that reports the flush result through a `std::sync::mpsc::SyncSender`.
///
/// `io_desc` is taken by value and stamped with the flush time before it is stored.
pub fn new_sync_oneshot(tx: SyncSender<Result<(), io::Error>>, mut io_desc: IODesc) -> Self {
55+
// Record when the flush request is issued.
io_desc.set_flush_time();
56+
4957
Callback {
50-
context: context.to_string(),
58+
io_desc,
5159
data: CallbackData::SyncOneshot(tx),
5260
}
5361
}
5462
}
5563

5664
impl raft_log::Callback for Callback {
57-
fn send(self, res: Result<(), io::Error>) {
58-
info!("{}: Callback is called with: {:?}", self.context, res);
65+
fn send(mut self, res: Result<(), io::Error>) {
66+
self.io_desc.set_flush_done_time();
67+
68+
info!("{}: Callback is called with: {:?}", self.io_desc, res);
69+
70+
if let Some(duration) = self.io_desc.total_duration() {
71+
if duration > Duration::from_millis(50) {
72+
warn!("{}: Slow IO operation: {:?}", self.io_desc, res);
73+
}
74+
}
5975

6076
match self.data {
6177
CallbackData::Oneshot(tx) => {
6278
let send_res = tx.send(res);
6379
if send_res.is_err() {
6480
warn!(
6581
"{}: Callback failed to send Oneshot result back to caller",
66-
self.context
82+
self.io_desc
6783
);
6884
}
6985
}

src/meta/raft-store/src/raft_log_v004/io_desc.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,66 @@
1414

1515
use std::error::Error;
1616
use std::fmt;
17+
use std::time::Duration;
18+
use std::time::SystemTime;
1719

1820
use databend_common_meta_types::anyerror::AnyError;
1921
use databend_common_meta_types::raft_types::ErrorSubject;
2022
use databend_common_meta_types::raft_types::ErrorVerb;
2123
use databend_common_meta_types::raft_types::StorageError;
24+
use display_more::DisplayUnixTimeStampExt;
2225
use log::debug;
2326
use log::error;
2427

2528
use crate::raft_log_v004::io_phase::IOPhase;
2629

2730
/// Describe an IO operation.
31+
#[derive(Clone)]
2832
pub struct IODesc {
2933
pub subject: ErrorSubject,
3034
pub verb: ErrorVerb,
3135
pub ctx: String,
36+
37+
/// Unix timestamp when this IO operation started.
38+
pub start_time: Duration,
39+
40+
/// Unix timestamp when the flush request is sent.
41+
pub flush_time: Option<Duration>,
42+
43+
/// Unix timestamp when the flush request is completed.
44+
pub flush_done_time: Option<Duration>,
45+
46+
/// Unix timestamp when the IO operation is done.
47+
pub done_time: Option<Duration>,
3248
}
3349

3450
// Renders the descriptor as
// `IODesc(<ctx>): (<verb>-<subject>)[start: <timestamp>, flush: +<d>, flushed: +<d>, total: <d>]`.
// The `flush`, `flushed` and `total` segments appear only once the matching timestamp is set.
impl fmt::Display for IODesc {
3551
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36-
write!(f, "{}: ({}-{:?})", self.ctx, self.verb, self.subject)
52+
write!(
53+
f,
54+
"IODesc({}): ({}-{:?})",
55+
self.ctx, self.verb, self.subject
56+
)?;
57+
58+
let start_time = self.start_time;
59+
// The start is printed as an absolute timestamp; later phases are printed as offsets.
write!(f, "[start: {}", start_time.display_unix_timestamp_short())?;
60+
61+
if let Some(flush_time) = self.flush_time {
62+
// `flush` is relative to the start; `saturating_sub` yields zero instead of
// panicking when the later timestamp is smaller than the earlier one.
let elapsed = flush_time.saturating_sub(start_time);
63+
write!(f, ", flush: +{:?}", elapsed,)?;
64+
65+
// `flushed` is relative to `flush_time`, so it is only shown when `flush_time` is set.
if let Some(flushed_time) = self.flush_done_time {
66+
let elapsed = flushed_time.saturating_sub(flush_time);
67+
write!(f, ", flushed: +{:?}", elapsed,)?;
68+
}
69+
}
70+
71+
// `total` spans from the start to `done_time`, independent of the flush timestamps.
if let Some(done_time) = self.done_time {
72+
let elapsed = done_time.saturating_sub(self.start_time);
73+
write!(f, ", total: {:?}", elapsed)?;
74+
}
75+
76+
write!(f, "]")
3777
}
3878
}
3979

@@ -45,9 +85,40 @@ impl IODesc {
4585
subject,
4686
verb,
4787
ctx: s,
88+
start_time: Self::since_epoch(),
89+
flush_time: None,
90+
flush_done_time: None,
91+
done_time: None,
4892
}
4993
}
5094

95+
pub fn set_flush_time(&mut self) {
96+
self.flush_time = Some(Self::since_epoch());
97+
}
98+
99+
pub fn set_flush_done_time(&mut self) {
100+
self.flush_done_time = Some(Self::since_epoch());
101+
}
102+
103+
pub fn set_done_time(&mut self) {
104+
self.done_time = Some(Self::since_epoch());
105+
}
106+
107+
pub fn total_duration(&self) -> Option<Duration> {
108+
self.flush_done_time
109+
.map(|flushed_time| flushed_time.saturating_sub(self.start_time))
110+
}
111+
112+
/// Return the current wall-clock time as the duration elapsed since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, a zero duration is
/// returned instead of panicking: the value only feeds timing diagnostics, so a
/// misconfigured clock must not abort the IO operation being described.
fn since_epoch() -> Duration {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
}
117+
118+
pub fn unknown(ctx: impl ToString) -> Self {
119+
Self::start(ErrorSubject::None, ErrorVerb::Write, ctx)
120+
}
121+
51122
pub fn save_vote(ctx: impl ToString) -> Self {
52123
Self::start(ErrorSubject::Vote, ErrorVerb::Write, ctx)
53124
}
@@ -114,3 +185,40 @@ impl IODesc {
114185
self.to_storage_error(IOPhase::ReceiveFlushCallback, err)
115186
}
116187
}
188+
189+
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` must grow by one timing segment for each phase that has been recorded.
    #[test]
    fn test_display() {
        // Fixed start instant (microseconds since the epoch) so the output is deterministic.
        let start_us = 1754708580036171;
        // Timestamp `secs` whole seconds after the fixed start.
        let after = |secs: u64| Some(Duration::from_micros(start_us + secs * 1_000_000));

        let mut desc = IODesc::start(ErrorSubject::Logs, ErrorVerb::Write, "test");
        // The constructor must have stamped a real start time before we override it.
        assert!(desc.start_time.as_secs() > 0);
        desc.start_time = Duration::from_micros(start_us);

        assert_eq!(
            desc.to_string(),
            "IODesc(test): (Write-Logs)[start: 2025-08-09T03:03:00.036]"
        );

        desc.flush_time = after(1);
        assert_eq!(
            desc.to_string(),
            "IODesc(test): (Write-Logs)[start: 2025-08-09T03:03:00.036, flush: +1s]"
        );

        desc.flush_done_time = after(2);
        assert_eq!(
            desc.to_string(),
            "IODesc(test): (Write-Logs)[start: 2025-08-09T03:03:00.036, flush: +1s, flushed: +1s]"
        );

        desc.done_time = after(3);
        assert_eq!(
            desc.to_string(),
            "IODesc(test): (Write-Logs)[start: 2025-08-09T03:03:00.036, flush: +1s, flushed: +1s, total: 3s]"
        );
    }
}

src/meta/raft-store/src/raft_log_v004/util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ use raft_log::api::raft_log_writer::RaftLogWriter;
1818
use tokio::sync::oneshot;
1919

2020
use crate::raft_log_v004::callback::Callback;
21+
use crate::raft_log_v004::IODesc;
2122
use crate::raft_log_v004::RaftLogV004;
2223

2324
pub async fn blocking_flush(rl: &mut RaftLogV004) -> Result<(), io::Error> {
2425
let (tx, rx) = oneshot::channel();
25-
let callback = Callback::new_oneshot(tx, "blocking_flush");
26+
let callback = Callback::new_oneshot(tx, IODesc::unknown("blocking_flush(RaftLogV004)"));
2627

2728
rl.flush(callback)?;
2829
rx.await

src/meta/service/src/store/raft_log_storage_impl.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl RaftLogReader<TypeConfig> for RaftStore {
103103
) -> Result<Vec<Entry>, StorageError> {
104104
let (start, end) = range_boundary(range);
105105

106-
let io = IODesc::read_logs(format!(
106+
let mut io = IODesc::read_logs(format!(
107107
"RaftStore(id={})::try_get_log_entries([{},{})",
108108
self.id, start, end
109109
));
@@ -119,7 +119,8 @@ impl RaftLogReader<TypeConfig> for RaftStore {
119119
.collect::<Result<Vec<_>, _>>()
120120
.map_err(|e| io.err_submit(e))?;
121121

122-
debug!("{}", io.ok_done());
122+
io.set_done_time();
123+
info!("{}", io.ok_done());
123124
Ok(entries)
124125
}
125126

@@ -193,7 +194,7 @@ impl RaftLogStorage<TypeConfig> for RaftStore {
193194
let mut log = self.log.write().await;
194195

195196
log.save_vote(Cw(*vote)).map_err(|e| io.err_submit(e))?;
196-
log.flush(raft_log_v004::Callback::new_oneshot(tx, &io))
197+
log.flush(raft_log_v004::Callback::new_oneshot(tx, io.clone()))
197198
.map_err(|e| io.err_submit_flush(e))?;
198199
}
199200

@@ -230,8 +231,11 @@ impl RaftLogStorage<TypeConfig> for RaftStore {
230231

231232
debug!("{}", io.ok_submit());
232233

233-
log.flush(raft_log_v004::Callback::new_io_flushed(callback, &io))
234-
.map_err(|e| io.err_submit_flush(e))?;
234+
log.flush(raft_log_v004::Callback::new_io_flushed(
235+
callback,
236+
io.clone(),
237+
))
238+
.map_err(|e| io.err_submit_flush(e))?;
235239

236240
info!("{}", io.ok_submit_flush());
237241

0 commit comments

Comments
 (0)