Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions components/tracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ impl Tracker {
detail_v2.set_read_index_propose_wait_nanos(self.metrics.read_index_propose_wait_nanos);
detail_v2.set_read_index_confirm_wait_nanos(self.metrics.read_index_confirm_wait_nanos);
detail_v2.set_read_pool_schedule_wait_nanos(self.metrics.read_pool_schedule_wait_nanos);

// NOTE: These stats are mainly for write operations. Read operations populate
// MVCC scan stats by `Statistics::write_scan_detail` before calling this
// function. So only fill them when unset to avoid clobbering existing values.
if detail_v2.get_total_versions() == 0 {
detail_v2.set_total_versions(self.metrics.mvcc_total_versions);
}
if detail_v2.get_processed_versions() == 0 {
detail_v2.set_processed_versions(self.metrics.mvcc_processed_versions);
}
if detail_v2.get_processed_versions_size() == 0 {
detail_v2.set_processed_versions_size(self.metrics.mvcc_processed_versions_size);
}
}

pub fn write_write_detail(&self, detail: &mut pb::WriteDetail) {
Expand Down Expand Up @@ -173,6 +186,11 @@ pub struct RequestMetrics {
pub scheduler_process_nanos: u64,
pub scheduler_throttle_nanos: u64,

// MVCC scan stats for txn commands, to be written into `ScanDetailV2`.
pub mvcc_total_versions: u64,
pub mvcc_processed_versions: u64,
pub mvcc_processed_versions_size: u64,

pub future_process_nanos: u64,
pub future_suspend_nanos: u64,

Expand Down Expand Up @@ -203,3 +221,59 @@ pub struct RequestMetrics {
// recorded outside the read_pool thread, accessed inside the read_pool thread for topsql usage
pub grpc_req_size: u64,
}

#[cfg(test)]
mod tests {
use std::time::Instant;

use super::*;

fn new_req_info() -> RequestInfo {
RequestInfo {
region_id: 0,
start_ts: 0,
task_id: 0,
resource_group_tag: vec![],
begin: Instant::now(),
request_type: RequestType::default(),
cid: 0,
is_external_req: false,
}
}

#[test]
fn test_write_scan_detail_sets_mvcc_scan_stats_when_unset_and_is_idempotent() {
let mut tracker = Tracker::new(new_req_info());
tracker.metrics.mvcc_processed_versions = 3;
tracker.metrics.mvcc_total_versions = 5;
tracker.metrics.mvcc_processed_versions_size = 7;

let mut detail_v2 = pb::ScanDetailV2::default();

tracker.write_scan_detail(&mut detail_v2);
tracker.write_scan_detail(&mut detail_v2);

assert_eq!(detail_v2.get_processed_versions(), 3);
assert_eq!(detail_v2.get_total_versions(), 5);
assert_eq!(detail_v2.get_processed_versions_size(), 7);
}

#[test]
fn test_write_scan_detail_does_not_override_existing_mvcc_scan_stats() {
let mut tracker = Tracker::new(new_req_info());
tracker.metrics.mvcc_processed_versions = 3;
tracker.metrics.mvcc_total_versions = 5;
tracker.metrics.mvcc_processed_versions_size = 7;

let mut detail_v2 = pb::ScanDetailV2::default();
detail_v2.set_processed_versions(11);
detail_v2.set_total_versions(13);
detail_v2.set_processed_versions_size(17);

tracker.write_scan_detail(&mut detail_v2);

assert_eq!(detail_v2.get_processed_versions(), 11);
assert_eq!(detail_v2.get_total_versions(), 13);
assert_eq!(detail_v2.get_processed_versions_size(), 17);
}
}
21 changes: 21 additions & 0 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,27 @@ impl<E: Engine, L: LockManager> TxnScheduler<E, L> {
let deadline = task.cmd().deadline();
let write_result = Self::handle_task(self.clone(), snapshot, task, sched_details).await;

// Feed MVCC scan stats into the per-request tracker before any callback/early
// response can be triggered, so the gRPC layer can include them in
// `ExecDetailsV2.scan_detail_v2`.
let mvcc_total_versions = sched_details.stat.write.total_op_count() as u64;
let mvcc_processed_versions = sched_details.stat.write.processed_keys as u64;
let mvcc_processed_versions_size = sched_details.stat.processed_size as u64;
GLOBAL_TRACKERS.with_tracker(tracker_token, |tracker| {
tracker.metrics.mvcc_total_versions = tracker
.metrics
.mvcc_total_versions
.saturating_add(mvcc_total_versions);
tracker.metrics.mvcc_processed_versions = tracker
.metrics
.mvcc_processed_versions
.saturating_add(mvcc_processed_versions);
tracker.metrics.mvcc_processed_versions_size = tracker
.metrics
.mvcc_processed_versions_size
.saturating_add(mvcc_processed_versions_size);
});

let mut write_result = match deadline
.check()
.map_err(StorageError::from)
Expand Down
34 changes: 27 additions & 7 deletions tests/integrations/server/kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2685,11 +2685,11 @@ fn test_commands_write_detail() {
pessimistic_lock_req.set_primary_lock(k.clone());
pessimistic_lock_req.set_lock_ttl(3000);
let pessimistic_lock_resp = client.kv_pessimistic_lock(&pessimistic_lock_req).unwrap();
check_scan_detail(
pessimistic_lock_resp
.get_exec_details_v2()
.get_scan_detail_v2(),
);
let pessimistic_lock_scan_detail = pessimistic_lock_resp
.get_exec_details_v2()
.get_scan_detail_v2();
check_scan_detail(pessimistic_lock_scan_detail);
assert!(pessimistic_lock_scan_detail.get_total_versions() > 0);
check_write_detail(
pessimistic_lock_resp
.get_exec_details_v2()
Expand Down Expand Up @@ -2739,8 +2739,8 @@ fn test_commands_write_detail() {
);

let mut check_txn_status_req = CheckTxnStatusRequest::default();
check_txn_status_req.set_context(ctx);
check_txn_status_req.set_primary_key(k);
check_txn_status_req.set_context(ctx.clone());
check_txn_status_req.set_primary_key(k.clone());
check_txn_status_req.set_lock_ts(20);
check_txn_status_req.set_rollback_if_not_exist(true);
let check_txn_status_resp = client.kv_check_txn_status(&check_txn_status_req).unwrap();
Expand All @@ -2756,6 +2756,26 @@ fn test_commands_write_detail() {
.get_process_nanos()
> 0
);

// Verify MVCC scan stats are carried into exec details for txn write RPCs.
// Use an optimistic prewrite (for_update_ts == 0) after the key has a committed
// version, so it will check write CF and produce non-zero `total_versions`.
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(k.clone());
mutation.set_value(b"value2".to_vec());
let mut optimistic_prewrite_req = PrewriteRequest::default();
optimistic_prewrite_req.set_mutations(vec![mutation].into());
optimistic_prewrite_req.set_context(ctx);
optimistic_prewrite_req.set_primary_lock(k);
optimistic_prewrite_req.set_start_version(40);
optimistic_prewrite_req.set_lock_ttl(3000);
let optimistic_prewrite_resp = client.kv_prewrite(&optimistic_prewrite_req).unwrap();
let optimistic_prewrite_scan_detail = optimistic_prewrite_resp
.get_exec_details_v2()
.get_scan_detail_v2();
check_scan_detail(optimistic_prewrite_scan_detail);
assert!(optimistic_prewrite_scan_detail.get_total_versions() > 0);
}

#[test_case(test_raftstore::must_new_cluster_and_kv_client)]
Expand Down