Skip to content

Commit e366f7e

Browse files
authored
feat(meta-service): add metrics for stream access operations (#18210)
Previously, stream-based operations like `mget` and `list` were only counted as single RPCs in our metrics system, despite potentially sending large volumes of data. For example, a single `list` RPC fetching a large directory would appear the same as a simple operation in our metrics. This commit adds granular metrics to track the actual number of items sent in stream operations with the following counters: ``` metasrv_meta_network_stream_get_item_sent_total metasrv_meta_network_stream_mget_item_sent_total metasrv_meta_network_stream_list_item_sent_total ```
1 parent 89cb39a commit e366f7e

File tree

4 files changed

+79
-11
lines changed

4 files changed

+79
-11
lines changed

src/meta/client/src/grpc_action.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ impl MetaGrpcReadReq {
152152

153153
Ok(raft_request)
154154
}
155+
156+
pub fn type_name(&self) -> &'static str {
157+
match self {
158+
MetaGrpcReadReq::GetKV(_) => "get",
159+
MetaGrpcReadReq::MGetKV(_) => "mget",
160+
MetaGrpcReadReq::ListKV(_) => "list",
161+
}
162+
}
155163
}
156164

157165
impl RequestFor for GetKVReq {

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use databend_common_meta_types::LogEntry;
5353
use databend_common_meta_types::TxnReply;
5454
use databend_common_meta_types::TxnRequest;
5555
use databend_common_metrics::count::Count;
56+
use databend_common_tracing::start_trace_for_remote_request;
5657
use fastrace::func_name;
5758
use fastrace::func_path;
5859
use fastrace::prelude::*;
@@ -160,10 +161,8 @@ impl MetaServiceImpl {
160161
#[fastrace::trace]
161162
async fn handle_kv_read_v1(
162163
&self,
163-
request: Request<RaftRequest>,
164+
req: MetaGrpcReadReq,
164165
) -> Result<(Option<Endpoint>, BoxStream<StreamItem>), Status> {
165-
let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?;
166-
167166
debug!("{}: Received ReadRequest: {:?}", func_name!(), req);
168167

169168
let req = ForwardRequest::new(1, req);
@@ -296,8 +295,7 @@ impl MetaService for MetaServiceImpl {
296295
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
297296
let _guard = RequestInFlight::guard();
298297

299-
let root =
300-
databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
298+
let root = start_trace_for_remote_request(func_path!(), &request);
301299
let reply = self.handle_kv_api(request).in_span(root).await?;
302300

303301
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
@@ -316,12 +314,23 @@ impl MetaService for MetaServiceImpl {
316314
self.check_token(request.metadata())?;
317315

318316
let _guard = thread_tracking_guard(&request);
317+
318+
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
319+
320+
let root = start_trace_for_remote_request(func_path!(), &request);
321+
322+
let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?;
323+
let req_typ = req.type_name();
324+
319325
ThreadTracker::tracking_future(async move {
320-
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
321-
let root =
322-
databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
326+
let (endpoint, strm) = self.handle_kv_read_v1(req).in_span(root).await?;
323327

324-
let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?;
328+
let strm = strm
329+
.map(move |item| {
330+
network_metrics::incr_stream_sent_item(req_typ);
331+
item
332+
})
333+
.boxed();
325334

326335
let mut resp = Response::new(strm);
327336
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
@@ -343,8 +352,7 @@ impl MetaService for MetaServiceImpl {
343352
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
344353
let _guard = RequestInFlight::guard();
345354

346-
let root =
347-
databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
355+
let root = start_trace_for_remote_request(func_path!(), &request);
348356
let (endpoint, reply) = self.handle_txn(request).in_span(root).await?;
349357

350358
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);

src/meta/service/src/metrics/meta_metrics.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,7 @@ pub mod network_metrics {
690690
use std::time::Duration;
691691

692692
use databend_common_meta_types::protobuf::WatchResponse;
693+
use log::error;
693694
use prometheus_client::metrics::counter::Counter;
694695
use prometheus_client::metrics::gauge::Gauge;
695696
use prometheus_client::metrics::histogram::Histogram;
@@ -717,6 +718,15 @@ pub mod network_metrics {
717718

718719
/// Number of items sent when data changes in a watch stream.
719720
watch_change_item_sent: Counter,
721+
722+
/// Number of items sent in a stream get response.
723+
stream_get_item_sent: Counter,
724+
725+
/// Number of items sent in a stream mget response.
726+
stream_mget_item_sent: Counter,
727+
728+
/// Number of items sent in a stream list response.
729+
stream_list_item_sent: Counter,
720730
}
721731

722732
impl NetworkMetrics {
@@ -743,6 +753,10 @@ pub mod network_metrics {
743753

744754
watch_initialization_item_sent: Counter::default(),
745755
watch_change_item_sent: Counter::default(),
756+
757+
stream_get_item_sent: Counter::default(),
758+
stream_mget_item_sent: Counter::default(),
759+
stream_list_item_sent: Counter::default(),
746760
};
747761

748762
let mut registry = load_global_registry();
@@ -781,6 +795,22 @@ pub mod network_metrics {
781795
metrics.watch_change_item_sent.clone(),
782796
);
783797

798+
registry.register(
799+
key!("stream_get_item_sent"),
800+
"Number of items sent in a stream get response",
801+
metrics.stream_get_item_sent.clone(),
802+
);
803+
registry.register(
804+
key!("stream_mget_item_sent"),
805+
"Number of items sent in a stream mget response",
806+
metrics.stream_mget_item_sent.clone(),
807+
);
808+
registry.register(
809+
key!("stream_list_item_sent"),
810+
"Number of items sent in a stream list response",
811+
metrics.stream_list_item_sent.clone(),
812+
);
813+
784814
metrics
785815
}
786816
}
@@ -830,6 +860,23 @@ pub mod network_metrics {
830860
pub fn incr_watch_sent_change_item() {
831861
NETWORK_METRICS.watch_change_item_sent.inc();
832862
}
863+
864+
pub fn incr_stream_sent_item(typ: &'static str) {
865+
match typ {
866+
"get" => {
867+
NETWORK_METRICS.stream_get_item_sent.inc();
868+
}
869+
"mget" => {
870+
NETWORK_METRICS.stream_mget_item_sent.inc();
871+
}
872+
"list" => {
873+
NETWORK_METRICS.stream_list_item_sent.inc();
874+
}
875+
_ => {
876+
error!("Unknown stream item type: {}", typ);
877+
}
878+
}
879+
}
833880
}
834881

835882
/// RAII metrics counter of in-flight requests count and delay.

src/meta/service/tests/it/api/http/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,5 +285,10 @@ async fn test_metrics() -> anyhow::Result<()> {
285285
assert!(metric_keys.contains("metasrv_meta_network_watch_initialization_total"));
286286
assert!(metric_keys.contains("metasrv_meta_network_watch_change_total"));
287287

288+
// Stream metrics
289+
assert!(metric_keys.contains("metasrv_meta_network_stream_get_item_sent_total"));
290+
assert!(metric_keys.contains("metasrv_meta_network_stream_mget_item_sent_total"));
291+
assert!(metric_keys.contains("metasrv_meta_network_stream_list_item_sent_total"));
292+
288293
Ok(())
289294
}

0 commit comments

Comments
 (0)