Skip to content

Commit e9262e8

Browse files
authored
refactor: compactor internal structure (#18738)
- Add DropCallback to call a callback when being dropped. - Remove `CompactingData`, use `LeveledMap` directly. - Refine `WriterPermit` and `CompactorPermit` logging. - When building snapshot, it should acquire both the writer and compactor permits, because it needs to modify both the writable and the `immutable` data.
1 parent c762a5a commit e9262e8

File tree

21 files changed

+291
-275
lines changed

21 files changed

+291
-275
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
/// Call a callback when dropped.
18+
pub struct DropCallback {
19+
callback: Option<Box<dyn FnOnce() + Send + 'static>>,
20+
}
21+
22+
impl fmt::Debug for DropCallback {
23+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24+
write!(f, "DropCallback")
25+
}
26+
}
27+
28+
impl DropCallback {
29+
pub fn new(callback: impl FnOnce() + Send + 'static) -> Self {
30+
DropCallback {
31+
callback: Some(Box::new(callback)),
32+
}
33+
}
34+
}
35+
36+
impl Drop for DropCallback {
37+
fn drop(&mut self) {
38+
if let Some(callback) = self.callback.take() {
39+
callback();
40+
}
41+
}
42+
}
43+
44+
#[cfg(test)]
45+
mod tests {
46+
use std::sync::atomic::AtomicBool;
47+
use std::sync::atomic::Ordering;
48+
use std::sync::Arc;
49+
50+
use super::*;
51+
52+
#[test]
53+
fn test_drop_callback() {
54+
let called = Arc::new(AtomicBool::new(false));
55+
let called_clone = called.clone();
56+
{
57+
let _drop_callback = DropCallback::new(move || {
58+
called_clone.store(true, Ordering::SeqCst);
59+
});
60+
assert!(!called.load(Ordering::SeqCst));
61+
}
62+
assert!(called.load(Ordering::SeqCst));
63+
}
64+
}

src/common/base/src/base/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod build_info;
1616
mod dma;
17+
mod drop_callback;
1718
mod net;
1819
mod ordered_float;
1920
mod profiling;
@@ -37,6 +38,7 @@ pub use dma::dma_write_file_vectored;
3738
pub use dma::Alignment;
3839
pub use dma::DmaAllocator;
3940
pub use dma::DmaWriteBuf;
41+
pub use drop_callback::DropCallback;
4042
pub use net::get_free_tcp_port;
4143
pub use net::get_free_udp_port;
4244
pub use ordered_float::OrderedFloat;

src/meta/raft-store/src/applier/applier_data/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use crate::sm_v003::OnChange;
2222

2323
pub(crate) struct ApplierData {
2424
/// Hold a unique permit to serialize all apply operations to the state machine.
25-
pub(crate) _permit: WriterPermit,
25+
///
26+
/// Wrapping it in a Mutex to make it `Sync` while the permit itself is only `Send`.
27+
pub(crate) _permit: Mutex<WriterPermit>,
2628

2729
pub(crate) view: StateMachineView,
2830

src/meta/raft-store/src/leveled_store/db_builder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ impl DBBuilder {
137137
lm: &mut LeveledMap,
138138
make_snapshot_id: impl FnOnce(&SysData) -> String + Send,
139139
) -> Result<DB, io::Error> {
140-
lm.testing_freeze_writable();
140+
lm.freeze_writable_without_permit();
141141

142-
let compacting_data = lm.new_compacting_data();
143-
let (sys_data, strm) = compacting_data.compact_into_stream().await?;
142+
let immutable_data = lm.immutable_data();
143+
let (sys_data, strm) = immutable_data.compact_into_stream().await?;
144144

145145
self.append_kv_stream(strm).await?;
146146

src/meta/raft-store/src/leveled_store/db_scoped_seq_bounded_read_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async fn test_db_scoped_seq_bounded_read() -> anyhow::Result<()> {
4141

4242
a.commit().await?;
4343

44-
sm.levels_mut().testing_freeze_writable();
44+
sm.levels_mut().freeze_writable_without_permit();
4545

4646
let mut a = sm.new_applier().await;
4747

src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs renamed to src/meta/raft-store/src/leveled_store/immutable_data/compact_into_stream.rs

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,80 +14,26 @@
1414

1515
use std::fmt;
1616
use std::io;
17-
use std::ops::Deref;
18-
use std::sync::Arc;
1917

20-
use databend_common_meta_types::snapshot_db::DB;
2118
use databend_common_meta_types::sys_data::SysData;
2219
use futures_util::future;
2320
use futures_util::StreamExt;
2421
use futures_util::TryStreamExt;
2522
use map_api::mvcc::ScopedSeqBoundedRange;
2623
use map_api::IOResultStream;
2724
use map_api::MapKV;
28-
use rotbl::v001::SeqMarked;
25+
use seq_marked::SeqMarked;
2926
use state_machine_api::ExpireKey;
3027
use state_machine_api::UserKey;
3128
use stream_more::KMerge;
3229
use stream_more::StreamMore;
3330

3431
use crate::leveled_store::immutable_data::ImmutableData;
35-
use crate::leveled_store::immutable_levels::ImmutableLevels;
3632
use crate::leveled_store::rotbl_codec::RotblCodec;
3733
use crate::leveled_store::util;
3834
use crate::utils::add_cooperative_yielding;
3935

40-
/// The data to compact.
41-
///
42-
/// Including several in-memory immutable levels and an optional persisted db.
43-
#[derive(Debug)]
44-
pub(crate) struct CompactingData {
45-
pub(crate) immutable: Arc<ImmutableData>,
46-
}
47-
48-
impl Deref for CompactingData {
49-
type Target = Arc<ImmutableData>;
50-
51-
fn deref(&self) -> &Self::Target {
52-
&self.immutable
53-
}
54-
}
55-
56-
impl CompactingData {
57-
pub fn new(immutable: Arc<ImmutableData>) -> Self {
58-
Self { immutable }
59-
}
60-
61-
// Testing only
62-
#[allow(dead_code)]
63-
pub(crate) fn new_from_levels_and_persisted(
64-
levels: ImmutableLevels,
65-
persisted: Option<DB>,
66-
) -> Self {
67-
let immutable = ImmutableData::new(levels, persisted);
68-
Self {
69-
immutable: Arc::new(immutable),
70-
}
71-
}
72-
73-
/// Compact in-memory immutable levels(excluding on disk db)
74-
/// into one level and keep tombstone record.
75-
///
76-
/// When compact mem levels, do not remove tombstone,
77-
/// because tombstones are still required when compacting with the underlying db.
78-
///
79-
/// This is only used for test
80-
pub async fn compact_immutable_in_place(&mut self) -> Result<(), io::Error> {
81-
// TODO: test: after compaction in place, the data should be the same, the base_seq and newest_seq should be the same.
82-
let immutable_levels = self.immutable.levels().clone();
83-
84-
let levels = immutable_levels.compact_all().await;
85-
let immutable = ImmutableData::new(levels, self.immutable.persisted().cloned());
86-
self.immutable = Arc::new(immutable);
87-
88-
Ok(())
89-
}
90-
36+
impl ImmutableData {
9137
/// Compacted all data into a stream.
9238
///
9339
/// Tombstones are removed because no more compact with lower levels.
@@ -97,6 +43,7 @@ impl CompactingData {
9743
/// The stream Item is 2 items tuple of key, and value with seq.
9844
///
9945
/// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`]
46+
// TODO: mvcc snapshot_seq
10047
pub async fn compact_into_stream(
10148
&self,
10249
) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> {
@@ -107,7 +54,7 @@ impl CompactingData {
10754
)
10855
}
10956

110-
let immutable_levels = self.immutable.levels();
57+
let immutable_levels = self.levels();
11158
let d = immutable_levels.newest().unwrap();
11259

11360
let sys_data = d.sys_data().clone();
@@ -141,7 +88,7 @@ impl CompactingData {
14188
let mut kmerge = KMerge::by(util::rotbl_by_key_seq);
14289
kmerge = kmerge.merge(strm);
14390

144-
if let Some(db) = self.immutable.persisted() {
91+
if let Some(db) = self.persisted() {
14592
let db_strm = db.inner_range();
14693
kmerge = kmerge.merge(db_strm);
14794
}

src/meta/raft-store/src/leveled_store/immutable_data/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use crate::leveled_store::map_api::MapKeyEncode;
3737
use crate::leveled_store::value_convert::ValueConvert;
3838
use crate::leveled_store::ScopedSeqBoundedRead;
3939

40+
mod compact_into_stream;
41+
4042
#[derive(Debug, Default, Clone)]
4143
pub struct ImmutableData {
4244
/// The last sequence of the immutable data.

src/meta/raft-store/src/leveled_store/immutable_levels/compact_all.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,24 @@ impl ImmutableLevels {
5959

6060
#[cfg(test)]
6161
mod tests {
62+
use std::ops::Deref;
63+
6264
use databend_common_meta_types::raft_types::Membership;
6365
use databend_common_meta_types::raft_types::StoredMembership;
6466
use futures_util::TryStreamExt;
6567
use map_api::mvcc::ScopedSeqBoundedRange;
6668
use openraft::testing::log_id;
6769
use seq_marked::SeqMarked;
6870
use state_machine_api::ExpireKey;
71+
use state_machine_api::KVMeta;
6972
use state_machine_api::UserKey;
7073

7174
use crate::sm_v003::compact_immutable_levels_test::build_3_levels;
75+
use crate::sm_v003::compact_immutable_levels_test::build_sm_with_expire;
76+
77+
fn s(x: impl ToString) -> String {
78+
x.to_string()
79+
}
7280

7381
fn b(x: impl ToString) -> Vec<u8> {
7482
x.to_string().as_bytes().to_vec()
@@ -82,7 +90,7 @@ mod tests {
8290
async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> {
8391
let lm = build_3_levels().await?;
8492

85-
lm.testing_freeze_writable();
93+
lm.freeze_writable_without_permit();
8694
let immutable_levels = lm.immutable_levels();
8795

8896
// Capture the original newest level's index before compaction
@@ -132,4 +140,55 @@ mod tests {
132140

133141
Ok(())
134142
}
143+
144+
#[tokio::test]
145+
async fn test_compact_expire_index() -> anyhow::Result<()> {
146+
let sm = build_sm_with_expire().await?;
147+
148+
let immutable_levels = {
149+
sm.leveled_map().freeze_writable_without_permit();
150+
let compactor = sm.acquire_compactor("").await;
151+
let immutable_levels = compactor.immutable_levels();
152+
immutable_levels.compact_all().await
153+
};
154+
155+
let d = immutable_levels.newest().unwrap().deref();
156+
157+
let got = d
158+
.range(UserKey::default().., u64::MAX)
159+
.await?
160+
.try_collect::<Vec<_>>()
161+
.await?;
162+
163+
assert_eq!(got, vec![
164+
//
165+
(
166+
user_key("a"),
167+
SeqMarked::new_normal(4, (Some(KVMeta::new_expires_at(15)), b("a1")))
168+
),
169+
(
170+
user_key("b"),
171+
SeqMarked::new_normal(2, (Some(KVMeta::new_expires_at(5)), b("b0")))
172+
),
173+
(
174+
user_key("c"),
175+
SeqMarked::new_normal(3, (Some(KVMeta::new_expires_at(20)), b("c0")))
176+
),
177+
]);
178+
179+
let got = d
180+
.range(ExpireKey::default().., u64::MAX)
181+
.await?
182+
.try_collect::<Vec<_>>()
183+
.await?;
184+
assert_eq!(got, vec![
185+
//
186+
(ExpireKey::new(5_000, 2), SeqMarked::new_normal(2, s("b"))),
187+
(ExpireKey::new(10_000, 1), SeqMarked::new_tombstone(4)),
188+
(ExpireKey::new(15_000, 4), SeqMarked::new_normal(4, s("a"))),
189+
(ExpireKey::new(20_000, 3), SeqMarked::new_normal(3, s("c"))),
190+
]);
191+
192+
Ok(())
193+
}
135194
}

src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use crate::sm_v003::SMV003;
2121
async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
2222
let lm = SMV003::default();
2323

24-
let _c = lm.acquire_compactor().await;
24+
let _c = lm.acquire_compactor("").await;
2525

2626
let (tx, rx) = oneshot::channel();
2727

2828
let _ = timeout(std::time::Duration::from_secs(1), async {
29-
let _got = lm.acquire_compactor().await;
29+
let _got = lm.acquire_compactor("").await;
3030
let _ = tx.send(true);
3131
})
3232
.await;
@@ -43,11 +43,11 @@ async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
4343
async fn test_blocking_wait_ok() -> anyhow::Result<()> {
4444
let lm = SMV003::default();
4545

46-
let _c = lm.acquire_compactor().await;
46+
let _c = lm.acquire_compactor("").await;
4747

4848
let (tx, rx) = oneshot::channel();
4949
databend_common_base::runtime::spawn(async move {
50-
let _got = lm.acquire_compactor().await;
50+
let _got = lm.acquire_compactor("").await;
5151
let _ = tx.send(true);
5252
});
5353

0 commit comments

Comments
 (0)