Skip to content

Commit 1c650e8

Browse files
authored
feat(meta): optimize expired key cleanup with time-range filtering (#18441)
Update state-machine-api from v0.1.0 to v0.1.1 and improve expired key cleanup efficiency:
- Add `cleanup_start_timestamp` tracking to avoid re-scanning processed tombstone entries
- Simplify expired key removal logic by eliminating redundant checks
1 parent 378c6cc commit 1c650e8

File tree

6 files changed

+54
-24
lines changed

6 files changed

+54
-24
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ socket2 = "0.5.3"
503503
span-map = { version = "0.2.0" }
504504
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] }
505505
state = "0.6.0"
506-
state-machine-api = { version = "0.1.0" }
506+
state-machine-api = { version = "0.1.1" }
507507
stream-more = "0.1.3"
508508
strength_reduce = "0.2.4"
509509
stringslice = "0.2.0"
@@ -666,7 +666,7 @@ openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-al
666666
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "d82aa6d" }
667667
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
668668
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
669-
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.1.0" }
669+
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.1.1" }
670670
sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1" }
671671
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
672672
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }

src/meta/raft-store/src/applier/mod.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,11 @@ where SM: StateMachineApi<SysData> + 'static
660660
let mut to_clean = vec![];
661661
let mut strm = self.sm.list_expire_index(log_time_ms).await?;
662662

663+
// Save the log time for next cleaning.
664+
// Avoid listing tombstone records.
665+
self.sm
666+
.set_cleanup_start_timestamp(Duration::from_millis(log_time_ms));
667+
663668
{
664669
let mut strm = std::pin::pin!(strm);
665670
while let Some((expire_key, key)) = strm.try_next().await? {
@@ -672,19 +677,9 @@ where SM: StateMachineApi<SysData> + 'static
672677
}
673678

674679
for (expire_key, key) in to_clean {
675-
let curr = self.sm.get_maybe_expired_kv(&key).await?;
676-
677-
if let Some(seq_v) = &curr {
678-
assert_eq!(expire_key.seq, seq_v.seq);
679-
info!("clean expired: {}, {}", key, expire_key);
680-
681-
self.upsert_kv(&UpsertKV::delete(key.clone())).await?;
682-
} else {
683-
unreachable!(
684-
"trying to remove un-cleanable: {}, {}, kv-entry: {:?}",
685-
key, expire_key, curr
686-
);
687-
}
680+
let upsert = UpsertKV::delete(key);
681+
self.upsert_kv(&upsert).await?;
682+
info!("clean expired: {} {expire_key}", upsert.key);
688683
}
689684

690685
Ok(())

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::fmt;
1616
use std::fmt::Formatter;
1717
use std::io;
18+
use std::time::Duration;
1819

1920
use databend_common_meta_types::raft_types::Entry;
2021
use databend_common_meta_types::raft_types::StorageError;
@@ -40,6 +41,9 @@ type OnChange = Box<dyn Fn((String, Option<SeqV>, Option<SeqV>)) + Send + Sync>;
4041
pub struct SMV003 {
4142
levels: LeveledMap,
4243

44+
/// Since when to start cleaning expired keys.
45+
cleanup_start_time_ms: Duration,
46+
4347
/// Callback when a change is applied to state machine
4448
pub(crate) on_change_applied: Option<OnChange>,
4549
}
@@ -77,6 +81,14 @@ impl StateMachineApi<SysData> for SMV003 {
7781
&mut self.levels
7882
}
7983

84+
fn cleanup_start_timestamp(&self) -> Duration {
85+
self.cleanup_start_time_ms
86+
}
87+
88+
fn set_cleanup_start_timestamp(&mut self, timestamp: Duration) {
89+
self.cleanup_start_time_ms = timestamp;
90+
}
91+
8092
fn sys_data_mut(&mut self) -> &mut SysData {
8193
self.levels.sys_data_mut()
8294
}

src/meta/raft-store/src/state_machine_api_ext.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use display_more::DisplayUnixTimeStampExt;
2727
use futures_util::StreamExt;
2828
use futures_util::TryStreamExt;
2929
use log::debug;
30+
use log::info;
3031
use log::warn;
3132
use map_api::map_api::MapApi;
3233
use map_api::map_api_ro::MapApiRO;
@@ -192,12 +193,36 @@ pub trait StateMachineApiExt: StateMachineApi<SysData> {
192193
&self,
193194
curr_time_ms: u64,
194195
) -> Result<IOResultStream<(ExpireKey, String)>, io::Error> {
196+
// Since the last saved cleanup timestamp
197+
let start_ms = self.cleanup_start_timestamp().as_millis() as u64;
198+
let start = ExpireKey::new(start_ms, 0);
199+
195200
// curr_time > expire_at => expired
196201
let end = ExpireKey::new(curr_time_ms, 0);
197202

198-
let strm = self.expire_map().range(..end).await?;
203+
let msg = {
204+
let start = Duration::from_millis(start_ms);
205+
let end = Duration::from_millis(curr_time_ms);
206+
let msg = format!(
207+
"list_expire_index: [{}, {}); interval: {:?}",
208+
start.display_unix_timestamp_short(),
209+
end.display_unix_timestamp_short(),
210+
end.saturating_sub(start)
211+
);
212+
213+
info!("{}", msg);
214+
215+
msg
216+
};
217+
218+
if start >= end {
219+
// No expired entries
220+
return Ok(futures_util::stream::empty().boxed());
221+
}
222+
223+
let strm = self.expire_map().range(start..end).await?;
199224

200-
let strm = add_cooperative_yielding(strm, format!("list_expire_index up to {end}"))
225+
let strm = add_cooperative_yielding(strm, msg)
201226
// Return only non-deleted records
202227
.try_filter_map(|(k, seq_marked)| {
203228
let expire_entry = seq_marked.into_data().map(|v| (k, v));

src/meta/raft-store/src/utils.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@ where
3131
T: Send + 'static,
3232
{
3333
stream.enumerate().then(move |(index, item)| {
34-
// Yield control every 100 items to prevent blocking other tasks
35-
let to_yield = if index % 100 == 0 {
36-
if index % 5000 == 0 {
37-
info!("{stream_name} yield control to allow other tasks to run: index={index}");
38-
}
34+
// Yield control every n items to prevent blocking other tasks
35+
let to_yield = if index > 0 && index % 5000 == 0 {
36+
info!("{stream_name} yield control to allow other tasks to run: index={index}");
3937
true
4038
} else {
4139
false

0 commit comments

Comments
(0)