Skip to content

Commit 810a557

Browse files
authored
refactor(meta-service): merge Marked and SeqMarked into one (#18350)
* refactor(meta-service): merge `Marked` and `SeqMarked` into one These two structures provide similar functionality and should be just one. Internally, `SeqMarked` is a type for value with sequence number and tombstone mark. Since this commit it is defined in a shared crate `seqmarked` and reused by all depending crates: - `rotbl`: the snapshot storage impl; - `map-api`: abstract map like API; - databend raft-store impl * M Cargo.lock * M Cargo.lock * M src/meta/raft-store/Cargo.toml * M Cargo.toml * M src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs * M src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs
1 parent ad37f97 commit 810a557

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1096
-634
lines changed

Cargo.lock

Lines changed: 10 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ logforth = { git = "https://github.com/datafuse-extras/logforth", branch = "glob
381381
'fastrace',
382382
] }
383383
lz4 = "1.24.0"
384-
map-api = { version = "0.2.3" }
384+
map-api = { version = "0.2.5" }
385385
maplit = "1.0.2"
386386
match-template = "0.0.1"
387387
md-5 = "0.10.5"
@@ -466,7 +466,7 @@ reqwest-hickory-resolver = "0.2"
466466
ringbuffer = "0.14.2"
467467
rmp-serde = "1.1.1"
468468
roaring = { version = "^0.10", features = ["serde"] }
469-
rotbl = { version = "0.2.3", features = [] }
469+
rotbl = { version = "0.2.6", features = [] }
470470
rust_decimal = "1.26"
471471
rustix = "0.38.37"
472472
rustls = { version = "0.23.27", features = ["ring", "tls12"], default-features = false }
@@ -476,6 +476,7 @@ rustyline = "14"
476476
scroll = "0.12.0"
477477
self_cell = "1.2.0"
478478
semver = "1.0.14"
479+
seq-marked = { version = "0.3.1", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
479480
serde = { version = "1.0.164", features = ["derive", "rc"] }
480481
serde_derive = "1"
481482
serde_ignored = "0.1.10"
@@ -657,7 +658,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
657658
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
658659
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
659660
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
660-
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.3" }
661+
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.6" }
661662
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
662663
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }
663664
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "d82aa6d" }

src/meta/api/src/kv_pb_api/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ where
8686
let x: Result<_, PbDecodeError> = try {
8787
let p: T::PB = prost::Message::decode(buf.as_ref())?;
8888
let v: T = FromToProto::from_pb(p)?;
89-
SeqV::with_meta(seqv.seq, seqv.meta, v)
89+
SeqV::new_with_meta(seqv.seq, seqv.meta, v)
9090
};
9191

9292
x.map_err(|e| e.with_context(context()))

src/meta/api/src/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ where
8989
if let Some(pb_seqv) = resp.value {
9090
let seqv = SeqV::from(pb_seqv);
9191
let value = deserialize_struct::<K::ValueType>(&seqv.data)?;
92-
let seqv = SeqV::with_meta(seqv.seq, seqv.meta, value);
92+
let seqv = SeqV::new_with_meta(seqv.seq, seqv.meta, value);
9393
Ok((key, Some(seqv)))
9494
} else {
9595
Ok((key, None))
@@ -111,7 +111,7 @@ where K: kvapi::Key {
111111
if let Some(pb_seqv) = resp.value {
112112
let seqv = SeqV::from(pb_seqv);
113113
let id = deserialize_u64(&seqv.data)?;
114-
let seqv = SeqV::with_meta(seqv.seq, seqv.meta, id);
114+
let seqv = SeqV::new_with_meta(seqv.seq, seqv.meta, id);
115115
Ok((key, Some(seqv)))
116116
} else {
117117
Ok((key, None))

src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::time::Duration;
16+
use std::time::SystemTime;
1617

1718
use databend_common_meta_kvapi::kvapi;
1819
use databend_common_meta_types::protobuf as pb;
@@ -281,7 +282,7 @@ impl TestSuite {
281282
assert!(res.is_none(), "got expired");
282283
}
283284

284-
let now_sec = SeqV::<()>::now_sec();
285+
let now_sec = since_epoch_secs();
285286

286287
info!("--- expired entry act as if it does not exist, an ADD op should apply");
287288
{
@@ -293,7 +294,7 @@ impl TestSuite {
293294
.with(MetaSpec::new_expire(now_sec - 1)),
294295
)
295296
.await?;
296-
// dbg!("update expired k1", _res);
297+
// dbg!("update on expired k1", _res);
297298

298299
let _res = kv
299300
.upsert_kv(
@@ -305,8 +306,14 @@ impl TestSuite {
305306
// dbg!("update non expired k2", _res);
306307

307308
info!("--- mget should not return expired");
308-
let mut res = kv.mget_kv(&["k1".to_string(), "k2".to_string()]).await?;
309309
{
310+
// let got = kv.get_kv("k1").await?;
311+
// dbg!("k1", got);
312+
// let got = kv.get_kv("k2").await?;
313+
// dbg!("k2", got);
314+
315+
let mut res = kv.mget_kv(&["k1".to_string(), "k2".to_string()]).await?;
316+
// dbg!(&res);
310317
assert_eq!(res[0], None);
311318

312319
let v2 = res.remove(1).unwrap();
@@ -362,7 +369,7 @@ impl TestSuite {
362369

363370
info!("--- expired in milliseconds");
364371
{
365-
let now_sec = SeqV::<()>::now_sec();
372+
let now_sec = since_epoch_secs();
366373

367374
// expire time in milliseconds
368375
kv.upsert_kv(
@@ -451,7 +458,7 @@ impl TestSuite {
451458

452459
let test_key = "test_key_for_update_meta";
453460

454-
let now_sec = SeqV::<()>::now_sec();
461+
let now_sec = since_epoch_secs();
455462

456463
let r = kv.upsert_kv(UpsertKV::update(test_key, b"v1")).await?;
457464
assert_eq!(Some(SeqV::new(1, b("v1"))), r.result);
@@ -872,7 +879,7 @@ impl TestSuite {
872879
TxnOpResponse {
873880
response: Some(txn_op_response::Response::Get(TxnGetResponse {
874881
key: txn_key1.clone(),
875-
value: Some(pb::SeqV::from(SeqV::with_meta(
882+
value: Some(pb::SeqV::from(SeqV::new_with_meta(
876883
6,
877884
Some(KVMeta::default()),
878885
val1_new.clone(),
@@ -884,7 +891,7 @@ impl TestSuite {
884891
response: Some(txn_op_response::Response::Delete(TxnDeleteResponse {
885892
key: txn_key1.clone(),
886893
success: true,
887-
prev_value: Some(pb::SeqV::from(SeqV::with_meta(
894+
prev_value: Some(pb::SeqV::from(SeqV::new_with_meta(
888895
6,
889896
Some(KVMeta::default()),
890897
val1_new.clone(),
@@ -1427,7 +1434,7 @@ impl TestSuite {
14271434

14281435
info!("--- {}", func_path!());
14291436

1430-
let now_ms = SeqV::<()>::now_ms();
1437+
let now_ms = since_epoch_millis();
14311438

14321439
let txn = TxnRequest::new(vec![], vec![
14331440
TxnOp::put_sequential("k1/", "seq1", b("v1")).with_expires_at_ms(Some(now_ms + 1000)),
@@ -1906,3 +1913,17 @@ fn normalize_txn_response(vs: Vec<TxnOpResponse>) -> Vec<TxnOpResponse> {
19061913
fn b(x: impl ToString) -> Vec<u8> {
19071914
x.to_string().as_bytes().to_vec()
19081915
}
1916+
1917+
fn since_epoch_secs() -> u64 {
1918+
since_epoch().as_secs()
1919+
}
1920+
1921+
fn since_epoch_millis() -> u64 {
1922+
since_epoch().as_millis() as u64
1923+
}
1924+
1925+
fn since_epoch() -> Duration {
1926+
SystemTime::now()
1927+
.duration_since(SystemTime::UNIX_EPOCH)
1928+
.unwrap()
1929+
}

src/meta/raft-store/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ raft-log = { workspace = true }
4242
rmp-serde = { workspace = true }
4343
rotbl = { workspace = true }
4444
semver = { workspace = true }
45+
seq-marked = { workspace = true, features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
4546
serde = { workspace = true }
4647
serde_json = { workspace = true }
4748
stream-more = { workspace = true }

src/meta/raft-store/src/applier/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,7 @@ where SM: StateMachineApi + 'static
675675

676676
for (expire_key, key) in to_clean {
677677
let curr = self.sm.get_maybe_expired_kv(&key).await?;
678+
678679
if let Some(seq_v) = &curr {
679680
assert_eq!(expire_key.seq, seq_v.seq);
680681
info!("clean expired: {}, {}", key, expire_key);

src/meta/raft-store/src/leveled_store/db_exporter.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,14 @@ impl<'a> DBExporter<'a> {
102102

103103
// kv
104104

105-
let strm = MapView(self.db).str_map().range(..).await?;
106-
let kv_strm = strm.try_filter_map(|(str_k, marked)| {
105+
let strm = MapView(self.db).user_map().range(..).await?;
106+
let kv_strm = strm.try_filter_map(|(user_key, seq_marked)| {
107107
// Tombstone will be converted to None and be ignored.
108-
let seqv: Option<SeqV<_>> = marked.into();
109-
let ent = seqv.map(|value| SMEntry::GenericKV { key: str_k, value });
108+
let seqv: Option<SeqV<_>> = seq_marked.into();
109+
let ent = seqv.map(|value| SMEntry::GenericKV {
110+
key: user_key.to_string(),
111+
value,
112+
});
110113
future::ready(Ok(ent))
111114
});
112115

src/meta/raft-store/src/leveled_store/db_map_api_ro_impl.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::io;
1616
use std::ops::RangeBounds;
1717

1818
use databend_common_meta_types::snapshot_db::DB;
19-
use databend_common_meta_types::KVMeta;
2019
use futures_util::StreamExt;
2120
use map_api::map_api_ro::MapApiRO;
2221
use rotbl::v001::SeqMarked;
@@ -27,31 +26,30 @@ use crate::leveled_store::map_api::MapKeyDecode;
2726
use crate::leveled_store::map_api::MapKeyEncode;
2827
use crate::leveled_store::rotbl_codec::RotblCodec;
2928
use crate::leveled_store::value_convert::ValueConvert;
30-
use crate::marked::Marked;
3129

3230
/// A wrapper that implements the `MapApiRO` trait for the `DB`.
3331
#[derive(Debug, Clone)]
3432
pub struct MapView<'a>(pub &'a DB);
3533

3634
#[async_trait::async_trait]
37-
impl<K> MapApiRO<K, KVMeta> for MapView<'_>
35+
impl<K> MapApiRO<K> for MapView<'_>
3836
where
39-
K: MapKey<KVMeta>,
37+
K: MapKey,
4038
K: MapKeyEncode,
4139
K: MapKeyDecode,
42-
Marked<K::V>: ValueConvert<SeqMarked>,
40+
SeqMarked<K::V>: ValueConvert<SeqMarked>,
4341
{
44-
async fn get(&self, key: &K) -> Result<Marked<K::V>, io::Error> {
42+
async fn get(&self, key: &K) -> Result<SeqMarked<K::V>, io::Error> {
4543
let key = RotblCodec::encode_key(key)
4644
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
4745

4846
let res = self.0.rotbl.get(&key).await?;
4947

5048
let Some(seq_marked) = res else {
51-
return Ok(Marked::empty());
49+
return Ok(SeqMarked::new_not_found());
5250
};
5351

54-
let marked = Marked::<K::V>::conv_from(seq_marked)?;
52+
let marked = SeqMarked::<K::V>::conv_from(seq_marked)?;
5553
Ok(marked)
5654
}
5755

@@ -65,7 +63,7 @@ where
6563
let strm = strm.map(|res_item: Result<(String, SeqMarked), io::Error>| {
6664
let (str_k, seq_marked) = res_item?;
6765
let key = RotblCodec::decode_key(&str_k)?;
68-
let marked = Marked::conv_from(seq_marked)?;
66+
let marked = SeqMarked::conv_from(seq_marked)?;
6967
Ok((key, marked))
7068
});
7169

0 commit comments

Comments
 (0)