diff --git a/components/tracker/src/lib.rs b/components/tracker/src/lib.rs index b678120b92c..31a985629ac 100644 --- a/components/tracker/src/lib.rs +++ b/components/tracker/src/lib.rs @@ -57,6 +57,19 @@ impl Tracker { detail_v2.set_read_index_propose_wait_nanos(self.metrics.read_index_propose_wait_nanos); detail_v2.set_read_index_confirm_wait_nanos(self.metrics.read_index_confirm_wait_nanos); detail_v2.set_read_pool_schedule_wait_nanos(self.metrics.read_pool_schedule_wait_nanos); + + // NOTE: These stats are mainly for write operations. Read operations populate + // MVCC scan stats by `Statistics::write_scan_detail` before calling this + // function. So only fill them when unset to avoid clobbering existing values. + if detail_v2.get_total_versions() == 0 { + detail_v2.set_total_versions(self.metrics.mvcc_total_versions); + } + if detail_v2.get_processed_versions() == 0 { + detail_v2.set_processed_versions(self.metrics.mvcc_processed_versions); + } + if detail_v2.get_processed_versions_size() == 0 { + detail_v2.set_processed_versions_size(self.metrics.mvcc_processed_versions_size); + } } pub fn write_write_detail(&self, detail: &mut pb::WriteDetail) { @@ -173,6 +186,11 @@ pub struct RequestMetrics { pub scheduler_process_nanos: u64, pub scheduler_throttle_nanos: u64, + // MVCC scan stats for txn commands, to be written into `ScanDetailV2`. + pub mvcc_total_versions: u64, + pub mvcc_processed_versions: u64, + pub mvcc_processed_versions_size: u64, + pub future_process_nanos: u64, pub future_suspend_nanos: u64, @@ -203,3 +221,59 @@ pub struct RequestMetrics { // recorded outside the read_pool thread, accessed inside the read_pool thread for topsql usage pub grpc_req_size: u64, } + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use super::*; + + fn new_req_info() -> RequestInfo { + RequestInfo { + region_id: 0, + start_ts: 0, + task_id: 0, + resource_group_tag: vec![], + begin: Instant::now(), + request_type: RequestType::default(), + cid: 0, + is_external_req: false, + } + } + + #[test] + fn test_write_scan_detail_sets_mvcc_scan_stats_when_unset_and_is_idempotent() { + let mut tracker = Tracker::new(new_req_info()); + tracker.metrics.mvcc_processed_versions = 3; + tracker.metrics.mvcc_total_versions = 5; + tracker.metrics.mvcc_processed_versions_size = 7; + + let mut detail_v2 = pb::ScanDetailV2::default(); + + tracker.write_scan_detail(&mut detail_v2); + tracker.write_scan_detail(&mut detail_v2); + + assert_eq!(detail_v2.get_processed_versions(), 3); + assert_eq!(detail_v2.get_total_versions(), 5); + assert_eq!(detail_v2.get_processed_versions_size(), 7); + } + + #[test] + fn test_write_scan_detail_does_not_override_existing_mvcc_scan_stats() { + let mut tracker = Tracker::new(new_req_info()); + tracker.metrics.mvcc_processed_versions = 3; + tracker.metrics.mvcc_total_versions = 5; + tracker.metrics.mvcc_processed_versions_size = 7; + + let mut detail_v2 = pb::ScanDetailV2::default(); + detail_v2.set_processed_versions(11); + detail_v2.set_total_versions(13); + detail_v2.set_processed_versions_size(17); + + tracker.write_scan_detail(&mut detail_v2); + + assert_eq!(detail_v2.get_processed_versions(), 11); + assert_eq!(detail_v2.get_total_versions(), 13); + assert_eq!(detail_v2.get_processed_versions_size(), 17); + } +} diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index b8177634787..788cae12279 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -1888,6 +1888,27 @@ impl TxnScheduler { let deadline = task.cmd().deadline(); let write_result = Self::handle_task(self.clone(), snapshot, task, sched_details).await; + // Feed MVCC scan stats into the per-request tracker before any callback/early + // response can be triggered, so the gRPC layer can include them in + // `ExecDetailsV2.scan_detail_v2`. + let mvcc_total_versions = sched_details.stat.write.total_op_count() as u64; + let mvcc_processed_versions = sched_details.stat.write.processed_keys as u64; + let mvcc_processed_versions_size = sched_details.stat.processed_size as u64; + GLOBAL_TRACKERS.with_tracker(tracker_token, |tracker| { + tracker.metrics.mvcc_total_versions = tracker + .metrics + .mvcc_total_versions + .saturating_add(mvcc_total_versions); + tracker.metrics.mvcc_processed_versions = tracker + .metrics + .mvcc_processed_versions + .saturating_add(mvcc_processed_versions); + tracker.metrics.mvcc_processed_versions_size = tracker + .metrics + .mvcc_processed_versions_size + .saturating_add(mvcc_processed_versions_size); + }); + let mut write_result = match deadline .check() .map_err(StorageError::from) diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index d1850df1956..544ff4d0f8a 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -2685,11 +2685,11 @@ fn test_commands_write_detail() { pessimistic_lock_req.set_primary_lock(k.clone()); pessimistic_lock_req.set_lock_ttl(3000); let pessimistic_lock_resp = client.kv_pessimistic_lock(&pessimistic_lock_req).unwrap(); - check_scan_detail( - pessimistic_lock_resp - .get_exec_details_v2() - .get_scan_detail_v2(), - ); + let pessimistic_lock_scan_detail = pessimistic_lock_resp + .get_exec_details_v2() + .get_scan_detail_v2(); + check_scan_detail(pessimistic_lock_scan_detail); + assert!(pessimistic_lock_scan_detail.get_total_versions() > 0); check_write_detail( pessimistic_lock_resp .get_exec_details_v2() @@ -2739,8 +2739,8 @@ fn test_commands_write_detail() { ); let mut check_txn_status_req = CheckTxnStatusRequest::default(); - check_txn_status_req.set_context(ctx); - check_txn_status_req.set_primary_key(k); + check_txn_status_req.set_context(ctx.clone()); + check_txn_status_req.set_primary_key(k.clone()); check_txn_status_req.set_lock_ts(20); check_txn_status_req.set_rollback_if_not_exist(true); let check_txn_status_resp = client.kv_check_txn_status(&check_txn_status_req).unwrap(); @@ -2756,6 +2756,26 @@ fn test_commands_write_detail() { .get_process_nanos() > 0 ); + + // Verify MVCC scan stats are carried into exec details for txn write RPCs. + // Use an optimistic prewrite (for_update_ts == 0) after the key has a committed + // version, so it will check write CF and produce non-zero `total_versions`. + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.set_key(k.clone()); + mutation.set_value(b"value2".to_vec()); + let mut optimistic_prewrite_req = PrewriteRequest::default(); + optimistic_prewrite_req.set_mutations(vec![mutation].into()); + optimistic_prewrite_req.set_context(ctx); + optimistic_prewrite_req.set_primary_lock(k); + optimistic_prewrite_req.set_start_version(40); + optimistic_prewrite_req.set_lock_ttl(3000); + let optimistic_prewrite_resp = client.kv_prewrite(&optimistic_prewrite_req).unwrap(); + let optimistic_prewrite_scan_detail = optimistic_prewrite_resp + .get_exec_details_v2() + .get_scan_detail_v2(); + check_scan_detail(optimistic_prewrite_scan_detail); + assert!(optimistic_prewrite_scan_detail.get_total_versions() > 0); } #[test_case(test_raftstore::must_new_cluster_and_kv_client)]