Skip to content

Commit f5a216f

Browse files
authored
feat(meta-service): add expirable keys stat to metrics and cluster-status API (#18179)
This commit counts the user keys and the expirable keys when building a snapshot, and saves these two numbers in the snapshot metadata. These two values are: - reported to metrics (HTTP API `/v1/metrics`) under the keys: - `snapshot_primary_index_count` - `snapshot_expire_index_count` - and can be retrieved via the cluster-status HTTP API (`/v1/cluster/status`), under the key `snapshot_key_space_stat`, which is a `BTreeMap` of `String` to `u64`: ``` "snapshot_key_space_stat": { "exp-": 1, "kv--": 5 }, ``` It also refactors the `MetaKVApi` implementation into a standalone struct for better separation of concerns. This commit is non-breaking despite the on-disk format modification.
1 parent 1f2cd7f commit f5a216f

File tree

11 files changed

+579
-63
lines changed

11 files changed

+579
-63
lines changed

src/meta/raft-store/src/leveled_store/key_spaces_impl.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ use std::io::Error;
2121
use crate::leveled_store::map_api::MapKey;
2222
use crate::leveled_store::map_api::MapKeyDecode;
2323
use crate::leveled_store::map_api::MapKeyEncode;
24+
use crate::leveled_store::map_api::MapKeyPrefix;
2425
use crate::state_machine::ExpireKey;
2526

2627
impl MapKeyEncode for String {
27-
const PREFIX: &'static str = "kv--";
28+
const PREFIX: MapKeyPrefix = "kv--";
2829

2930
fn encode<W: fmt::Write>(&self, mut w: W) -> Result<(), fmt::Error> {
3031
w.write_str(self.as_str())
@@ -38,7 +39,7 @@ impl MapKeyDecode for String {
3839
}
3940

4041
impl MapKeyEncode for ExpireKey {
41-
const PREFIX: &'static str = "exp-";
42+
const PREFIX: MapKeyPrefix = "exp-";
4243

4344
fn encode<W: fmt::Write>(&self, mut w: W) -> Result<(), fmt::Error> {
4445
// max u64 len is 20: 18446744073709551615

src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ impl<'a> CompactingData<'a> {
8888
///
8989
/// It returns a small chunk of sys data that is always copied across levels,
9090
/// and a stream contains `kv` and `expire` entries.
91+
/// Each stream item is a 2-tuple of the key and the value with its seq.
9192
///
9293
/// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`]
9394
pub async fn compact(
@@ -109,15 +110,18 @@ impl<'a> CompactingData<'a> {
109110

110111
let strm = (*self.immutable_levels).expire_map().range(..).await?;
111112
let expire_strm = strm.map(|item: Result<(ExpireKey, Marked<String>), io::Error>| {
112-
let (k, v) = item?;
113-
RotblCodec::encode_key_seq_marked(&k, v).map_err(|e| with_context(e, &k))
113+
let (expire_key, marked_string) = item?;
114+
115+
RotblCodec::encode_key_seq_marked(&expire_key, marked_string)
116+
.map_err(|e| with_context(e, &expire_key))
114117
});
115118

116119
// kv: prefix: `kv--/`
117120

118121
let strm = (*self.immutable_levels).str_map().range(..).await?;
119122
let kv_strm = strm.map(|item: Result<(String, Marked), io::Error>| {
120123
let (k, v) = item?;
124+
121125
RotblCodec::encode_key_seq_marked(&k, v).map_err(|e| with_context(e, &k))
122126
});
123127

src/meta/raft-store/src/leveled_store/map_api.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,15 @@ pub use map_api::IOResultStream;
2929
use crate::marked::Marked;
3030
use crate::state_machine::ExpireKey;
3131

32+
pub type MapKeyPrefix = &'static str;
33+
3234
pub trait MapKeyEncode {
3335
/// PREFIX is the prefix of the key used to define key space in the on-disk storage.
34-
const PREFIX: &'static str;
36+
const PREFIX: MapKeyPrefix;
37+
38+
fn prefix(&self) -> MapKeyPrefix {
39+
Self::PREFIX
40+
}
3541

3642
fn encode<W: Write>(&self, w: W) -> Result<(), fmt::Error>;
3743
}

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::collections::BTreeSet;
1617
use std::net::Ipv4Addr;
1718
use std::sync::atomic::AtomicI32;
@@ -91,6 +92,7 @@ use crate::message::LeaveRequest;
9192
use crate::meta_service::errors::grpc_error_to_network_err;
9293
use crate::meta_service::forwarder::MetaForwarder;
9394
use crate::meta_service::meta_leader::MetaLeader;
95+
use crate::meta_service::meta_node_kv_api_impl::MetaKVApi;
9496
use crate::meta_service::meta_node_status::MetaNodeStatus;
9597
use crate::meta_service::watcher::DispatcherHandle;
9698
use crate::meta_service::watcher::WatchTypes;
@@ -466,6 +468,17 @@ impl MetaNode {
466468
// metrics about server storage
467469
server_metrics::set_raft_log_size(meta_node.get_raft_log_size().await);
468470
server_metrics::set_snapshot_key_count(meta_node.get_snapshot_key_count().await);
471+
{
472+
let stat = meta_node.get_snapshot_key_space_stat().await;
473+
474+
server_metrics::set_snapshot_primary_index_count(
475+
stat.get("kv--").copied().unwrap_or_default(),
476+
);
477+
478+
server_metrics::set_snapshot_expire_index_count(
479+
stat.get("exp-").copied().unwrap_or_default(),
480+
)
481+
}
469482

470483
last_leader = mm.current_leader;
471484
}
@@ -874,6 +887,10 @@ impl MetaNode {
874887
.unwrap_or_default()
875888
}
876889

890+
async fn get_snapshot_key_space_stat(&self) -> BTreeMap<String, u64> {
891+
self.raft_store.get_snapshot_key_space_stat().await
892+
}
893+
877894
pub async fn get_status(&self) -> Result<MetaNodeStatus, MetaError> {
878895
let voters = self
879896
.raft_store
@@ -892,6 +909,7 @@ impl MetaNode {
892909

893910
let raft_log_status = self.get_raft_log_stat().await.into();
894911
let snapshot_key_count = self.get_snapshot_key_count().await;
912+
let snapshot_key_space_stat = self.get_snapshot_key_space_stat().await;
895913

896914
let metrics = self.raft.metrics().borrow().clone();
897915

@@ -910,6 +928,7 @@ impl MetaNode {
910928
endpoint: endpoint.to_string(),
911929
raft_log: raft_log_status,
912930
snapshot_key_count,
931+
snapshot_key_space_stat,
913932
state: format!("{:?}", metrics.state),
914933
is_leader: metrics.state == openraft::ServerState::Leader,
915934
current_term: metrics.current_term,
@@ -1175,6 +1194,11 @@ impl MetaNode {
11751194

11761195
Ok(stream_sender)
11771196
}
1197+
1198+
/// Get a kvapi::KVApi implementation.
1199+
pub fn kv_api(&self) -> MetaKVApi {
1200+
MetaKVApi::new(self)
1201+
}
11781202
}
11791203

11801204
pub(crate) fn event_filter_from_filter_type(filter_type: FilterType) -> EventFilter {

src/meta/service/src/meta_service/meta_node_kv_api_impl.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,24 @@ use crate::message::ForwardRequest;
3434
use crate::meta_service::MetaNode;
3535
use crate::metrics::server_metrics;
3636

37-
/// Impl kvapi::KVApi for MetaNode.
38-
///
39-
/// Write through raft-log.
40-
/// Read through local state machine, which may not be consistent.
41-
/// E.g. Read is not guaranteed to see a write.
37+
/// A wrapper of MetaNode that implements kvapi::KVApi.
38+
pub struct MetaKVApi<'a> {
39+
inner: &'a MetaNode,
40+
}
41+
42+
impl<'a> MetaKVApi<'a> {
43+
pub fn new(inner: &'a MetaNode) -> Self {
44+
Self { inner }
45+
}
46+
}
47+
4248
#[async_trait]
43-
impl kvapi::KVApi for MetaNode {
49+
impl<'a> kvapi::KVApi for MetaKVApi<'a> {
4450
type Error = MetaAPIError;
4551

4652
async fn upsert_kv(&self, act: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
4753
let ent = LogEntry::new(Cmd::UpsertKV(act));
48-
let rst = self.write(ent).await?;
54+
let rst = self.inner.write(ent).await?;
4955

5056
match rst {
5157
AppliedState::KV(x) => Ok(x),
@@ -62,6 +68,7 @@ impl kvapi::KVApi for MetaNode {
6268
};
6369

6470
let res = self
71+
.inner
6572
.handle_forwardable_request(ForwardRequest::new(1, MetaGrpcReadReq::MGetKV(req)))
6673
.await;
6774

@@ -81,6 +88,7 @@ impl kvapi::KVApi for MetaNode {
8188
};
8289

8390
let res = self
91+
.inner
8492
.handle_forwardable_request(ForwardRequest::new(1, MetaGrpcReadReq::ListKV(req)))
8593
.await;
8694

@@ -98,7 +106,7 @@ impl kvapi::KVApi for MetaNode {
98106
info!("MetaNode::transaction(): {}", txn);
99107

100108
let ent = LogEntry::new(Cmd::Transaction(txn));
101-
let rst = self.write(ent).await?;
109+
let rst = self.inner.write(ent).await?;
102110

103111
match rst {
104112
AppliedState::TxnReply(x) => Ok(x),
@@ -108,3 +116,32 @@ impl kvapi::KVApi for MetaNode {
108116
}
109117
}
110118
}
119+
120+
/// Impl kvapi::KVApi for MetaNode.
121+
///
122+
/// Write through raft-log.
123+
/// Read through local state machine, which may not be consistent.
124+
/// E.g. Read is not guaranteed to see a write.
125+
#[async_trait]
126+
impl kvapi::KVApi for MetaNode {
127+
type Error = MetaAPIError;
128+
129+
async fn upsert_kv(&self, act: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
130+
self.kv_api().upsert_kv(act).await
131+
}
132+
133+
#[fastrace::trace]
134+
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
135+
self.kv_api().get_kv_stream(keys).await
136+
}
137+
138+
#[fastrace::trace]
139+
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
140+
self.kv_api().list_kv(prefix).await
141+
}
142+
143+
#[fastrace::trace]
144+
async fn transaction(&self, txn: TxnRequest) -> Result<TxnReply, Self::Error> {
145+
self.kv_api().transaction(txn).await
146+
}
147+
}

src/meta/service/src/meta_service/meta_node_status.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_meta_types::node::Node;
2020
use databend_common_meta_types::raft_types::LogId;
2121
use databend_common_meta_types::raft_types::NodeId;
2222

23-
#[derive(serde::Serialize)]
23+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
2424
pub struct MetaNodeStatus {
2525
pub id: NodeId,
2626

@@ -39,6 +39,11 @@ pub struct MetaNodeStatus {
3939
/// Total number of keys in current snapshot
4040
pub snapshot_key_count: u64,
4141

42+
/// The count of keys in each key space in the snapshot data.
43+
#[serde(default)]
44+
#[serde(skip_serializing_if = "BTreeMap::is_empty")]
45+
pub snapshot_key_space_stat: BTreeMap<String, u64>,
46+
4247
/// Server state, one of "Follower", "Learner", "Candidate", "Leader".
4348
pub state: String,
4449

src/meta/service/src/metrics/meta_metrics.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,18 @@ pub mod server_metrics {
5353
node_is_health: Gauge,
5454
leader_changes: Counter,
5555
applying_snapshot: Gauge,
56+
57+
/// The primary index is indexed by string key. Each primary index entry has an optional expire index key.
58+
///
59+
/// `snapshot_key_count = snapshot_primary_index_count + snapshot_expire_index_count`
5660
snapshot_key_count: Gauge,
5761

62+
/// `snapshot_key_count = snapshot_primary_index_count + snapshot_expire_index_count`
63+
snapshot_primary_index_count: Gauge,
64+
65+
/// `snapshot_key_count = snapshot_primary_index_count + snapshot_expire_index_count`
66+
snapshot_expire_index_count: Gauge,
67+
5868
raft_log_cache_items: Gauge,
5969
raft_log_cache_used_size: Gauge,
6070
raft_log_wal_open_chunk_size: Gauge,
@@ -83,6 +93,9 @@ pub mod server_metrics {
8393
leader_changes: Counter::default(),
8494
applying_snapshot: Gauge::default(),
8595
snapshot_key_count: Gauge::default(),
96+
snapshot_primary_index_count: Gauge::default(),
97+
snapshot_expire_index_count: Gauge::default(),
98+
8699
raft_log_cache_items: Gauge::default(),
87100
raft_log_cache_used_size: Gauge::default(),
88101
raft_log_wal_open_chunk_size: Gauge::default(),
@@ -128,6 +141,16 @@ pub mod server_metrics {
128141
"number of keys in the last snapshot",
129142
metrics.snapshot_key_count.clone(),
130143
);
144+
registry.register(
145+
key!("snapshot_primary_index_count"),
146+
"number of primary keys in the last snapshot",
147+
metrics.snapshot_primary_index_count.clone(),
148+
);
149+
registry.register(
150+
key!("snapshot_expire_index_count"),
151+
"number of expire index keys in the last snapshot",
152+
metrics.snapshot_expire_index_count.clone(),
153+
);
131154

132155
registry.register(
133156
key!("raft_log_cache_items"),
@@ -228,6 +251,12 @@ pub mod server_metrics {
228251
pub fn set_snapshot_key_count(n: u64) {
229252
SERVER_METRICS.snapshot_key_count.set(n as i64);
230253
}
254+
pub fn set_snapshot_primary_index_count(n: u64) {
255+
SERVER_METRICS.snapshot_primary_index_count.set(n as i64);
256+
}
257+
pub fn set_snapshot_expire_index_count(n: u64) {
258+
SERVER_METRICS.snapshot_expire_index_count.set(n as i64);
259+
}
231260

232261
pub fn set_raft_log_stat(st: RaftLogStat) {
233262
SERVER_METRICS

src/meta/service/src/store/store_inner.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::fs;
1617
use std::io;
1718
use std::io::ErrorKind;
@@ -192,7 +193,7 @@ impl RaftStoreInner {
192193
w.acquire_compactor().await
193194
};
194195

195-
let (sys_data, mut strm) = compactor
196+
let (mut sys_data, mut strm) = compactor
196197
.compact()
197198
.await
198199
.map_err(|e| StorageError::read_snapshot(None, &e))?;
@@ -224,6 +225,16 @@ impl RaftStoreInner {
224225
.await
225226
.map_err(|e| StorageError::read_snapshot(None, &e))?
226227
{
228+
// The first 4 chars of the encoded key identify the key space, e.g. "kv--" in "kv--/..." or "exp-" in "exp-/...".
229+
// Get the first 4 chars as key space.
230+
let prefix = &ent.0.as_str()[..4];
231+
let ks = sys_data.key_counts_mut();
232+
if let Some(count) = ks.get_mut(prefix) {
233+
*count += 1;
234+
} else {
235+
ks.insert(prefix.to_string(), 1);
236+
}
237+
227238
tx.send(WriteEntry::Data(ent))
228239
.await
229240
.map_err(|e| StorageError::write_snapshot(Some(signature.clone()), &e))?;
@@ -283,6 +294,21 @@ impl RaftStoreInner {
283294
Some(db.stat().key_num)
284295
}
285296

297+
/// Returns the count of keys in each key space from the snapshot data.
298+
///
299+
/// Key spaces include:
300+
/// - `"exp-"`: expire index data
301+
/// - `"kv--"`: key-value data
302+
///
303+
/// Returns an empty map if no snapshot exists.
304+
pub(crate) async fn get_snapshot_key_space_stat(&self) -> BTreeMap<String, u64> {
305+
let sm = self.state_machine.read().await;
306+
let Some(db) = sm.levels().persisted() else {
307+
return Default::default();
308+
};
309+
db.sys_data().key_counts().clone()
310+
}
311+
286312
/// Install a snapshot to build a state machine from it and replace the old state machine with the new one.
287313
#[fastrace::trace]
288314
pub async fn do_install_snapshot(&self, db: DB) -> Result<(), MetaStorageError> {

0 commit comments

Comments
 (0)