Skip to content

Commit 00a0693

Browse files
dqhl76 and zhang2014 authored
feat(executor): add query execution stats (#18345)
* feat: add executor stats * feat: add process time * feat: add process rows * fix panic * add system table * add archive stats * save * rebase, solve conflict change table schema * fix test * ci: remove flaky test * temp, use for test * Revert "temp, use for test" This reverts commit 5696744. * refactor: apply review suggestions to remove percentage * fix ut --------- Co-authored-by: Winter Zhang <[email protected]>
1 parent ff7e81d commit 00a0693

File tree

23 files changed

+825
-78
lines changed

23 files changed

+825
-78
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod stats;
16+
17+
pub use stats::ExecutorStats;
18+
pub use stats::ExecutorStatsSlot;
19+
pub use stats::ExecutorStatsSnapshot;
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::atomic::AtomicU64;
16+
use std::sync::atomic::Ordering;
17+
use std::time::SystemTime;
18+
19+
use crate::runtime::ThreadTracker;
20+
21+
/// Number of slots kept per metric; a second maps to slot `second % RING_BUFFER_SIZE`.
const RING_BUFFER_SIZE: usize = 10;
/// Bit offset of the timestamp inside a packed slot word (the upper half of the `u64`).
const TS_SHIFT: u32 = u32::BITS;
/// Mask selecting the value stored in the lower 32 bits of a packed slot word.
const VAL_MASK: u64 = u32::MAX as u64;

/// Number of microseconds in one second.
const MICROS_PER_SEC: u64 = 1_000_000;
26+
27+
/// Point-in-time copy of the executor statistics ring buffers.
///
/// Each vector holds one `(timestamp, value)` pair per slot, in slot order.
/// The timestamp is the Unix second the slot currently describes and the value
/// is what has been accumulated for that second.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecutorStatsSnapshot {
    /// `(timestamp, accumulated process time)` pairs, one per slot.
    pub process_time: Vec<(u32, u32)>,
    /// `(timestamp, accumulated processed rows)` pairs, one per slot.
    pub process_rows: Vec<(u32, u32)>,
}
33+
34+
/// Packs a timestamp (u32) and a value (u32) into a u64.
35+
#[inline]
36+
fn pack(timestamp: u32, value: u32) -> u64 {
37+
(timestamp as u64) << TS_SHIFT | (value as u64)
38+
}
39+
40+
/// Unpacks an u64 into a timestamp (u32) and a value (u32).
41+
#[inline]
42+
fn unpack(packed: u64) -> (u32, u32) {
43+
((packed >> TS_SHIFT) as u32, (packed & VAL_MASK) as u32)
44+
}
45+
46+
/// A slot for storing executor statistics for a specific time window (1 second).
47+
///
48+
/// It uses a single AtomicU64 to store both a Unix timestamp and a value.
49+
/// - The upper 32 bits store the timestamp (seconds since Unix epoch).
50+
/// - The lower 32 bits store the accumulated value (e.g., rows, duration in micros).
51+
#[derive(Default)]
52+
pub struct ExecutorStatsSlot(AtomicU64);
53+
54+
impl ExecutorStatsSlot {
55+
/// Creates a new empty ExecutorStatsSlot.
56+
pub fn new() -> Self {
57+
Self::default()
58+
}
59+
60+
/// Records a metric value using the provided timestamp.
61+
pub fn record_metric(&self, timestamp: usize, value: usize) {
62+
// Convert to u32, clamping if necessary
63+
let timestamp_u32 = timestamp as u32;
64+
let value_u32 = if value > u32::MAX as usize {
65+
u32::MAX
66+
} else {
67+
value as u32
68+
};
69+
self.add(timestamp_u32, value_u32);
70+
}
71+
72+
/// Adds a value to the slot for the given timestamp.
73+
///
74+
/// This operation is thread-safe and uses a lock-free CAS loop.
75+
/// If the time window has expired, the value is reset before adding.
76+
pub fn add(&self, timestamp: u32, value_to_add: u32) {
77+
let mut current_packed = self.0.load(Ordering::SeqCst);
78+
loop {
79+
let (current_ts, current_val) = unpack(current_packed);
80+
let new_packed = if current_ts == timestamp {
81+
pack(current_ts, current_val.saturating_add(value_to_add))
82+
} else {
83+
pack(timestamp, value_to_add)
84+
};
85+
match self.0.compare_exchange_weak(
86+
current_packed,
87+
new_packed,
88+
Ordering::SeqCst,
89+
Ordering::SeqCst,
90+
) {
91+
Ok(_) => return,
92+
Err(expected) => {
93+
current_packed = expected;
94+
}
95+
}
96+
}
97+
}
98+
99+
/// Gets the timestamp and value
100+
pub fn get(&self) -> (u32, u32) {
101+
let packed = self.0.load(Ordering::Acquire);
102+
unpack(packed)
103+
}
104+
}
105+
106+
// A ring-buffer thread-free implementation for storing scheduling profile
107+
pub struct ExecutorStats {
108+
pub process_time: [ExecutorStatsSlot; RING_BUFFER_SIZE],
109+
pub process_rows: [ExecutorStatsSlot; RING_BUFFER_SIZE],
110+
}
111+
112+
impl ExecutorStats {
113+
pub fn new() -> Self {
114+
let process_time = std::array::from_fn(|_| ExecutorStatsSlot::new());
115+
let process_rows = std::array::from_fn(|_| ExecutorStatsSlot::new());
116+
ExecutorStats {
117+
process_time,
118+
process_rows,
119+
}
120+
}
121+
122+
pub fn record_process(&self, begin: SystemTime, elapsed_micros: usize, rows: usize) {
123+
let begin_micros = begin
124+
.duration_since(SystemTime::UNIX_EPOCH)
125+
.unwrap()
126+
.as_micros() as u64;
127+
128+
let end_micros = begin_micros + elapsed_micros as u64;
129+
130+
let begin_timestamp_secs = begin_micros / MICROS_PER_SEC;
131+
let end_timestamp_secs = end_micros / MICROS_PER_SEC;
132+
133+
if begin_timestamp_secs == end_timestamp_secs {
134+
// Single second case - record all in one slot
135+
let slot_idx = (begin_timestamp_secs % RING_BUFFER_SIZE as u64) as usize;
136+
self.process_time[slot_idx]
137+
.record_metric(begin_timestamp_secs as usize, elapsed_micros);
138+
self.process_rows[slot_idx].record_metric(begin_timestamp_secs as usize, rows);
139+
} else {
140+
// Cross-second case - distribute proportionally
141+
let total_duration_micros = elapsed_micros as u64;
142+
143+
for current_sec in begin_timestamp_secs..=end_timestamp_secs {
144+
let slot_idx = (current_sec % RING_BUFFER_SIZE as u64) as usize;
145+
146+
let sec_start_micros = if current_sec == begin_timestamp_secs {
147+
begin_micros % MICROS_PER_SEC
148+
} else {
149+
0
150+
};
151+
152+
let sec_end_micros = if current_sec == end_timestamp_secs {
153+
end_micros % MICROS_PER_SEC
154+
} else {
155+
MICROS_PER_SEC
156+
};
157+
158+
let sec_duration_micros = sec_end_micros - sec_start_micros;
159+
let proportion = sec_duration_micros as f64 / total_duration_micros as f64;
160+
161+
let allocated_micros = (elapsed_micros as f64 * proportion) as usize;
162+
let allocated_rows = (rows as f64 * proportion) as usize;
163+
164+
if allocated_micros > 0 {
165+
self.process_time[slot_idx]
166+
.record_metric(current_sec as usize, allocated_micros);
167+
}
168+
if allocated_rows > 0 {
169+
self.process_rows[slot_idx].record_metric(current_sec as usize, allocated_rows);
170+
}
171+
}
172+
}
173+
}
174+
175+
pub fn record_thread_tracker(rows: usize) {
176+
ThreadTracker::with(|x| {
177+
x.borrow()
178+
.payload
179+
.process_rows
180+
.store(rows, Ordering::SeqCst)
181+
});
182+
}
183+
184+
pub fn dump_snapshot(&self) -> ExecutorStatsSnapshot {
185+
let process_time_snapshot = self.process_time.iter().map(|slot| slot.get()).collect();
186+
let process_rows_snapshot = self.process_rows.iter().map(|slot| slot.get()).collect();
187+
188+
ExecutorStatsSnapshot {
189+
process_time: process_time_snapshot,
190+
process_rows: process_rows_snapshot,
191+
}
192+
}
193+
}
194+
195+
impl Default for ExecutorStats {
196+
fn default() -> Self {
197+
Self::new()
198+
}
199+
}

src/common/base/src/runtime/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ mod backtrace;
1616
mod catch_unwind;
1717
mod defer;
1818
pub mod error_info;
19+
mod executor_stats;
1920
mod global_runtime;
2021
mod memory;
2122
pub mod metrics;
@@ -35,6 +36,9 @@ pub use catch_unwind::catch_unwind;
3536
pub use catch_unwind::drop_guard;
3637
pub use catch_unwind::CatchUnwindFuture;
3738
pub use defer::defer;
39+
pub use executor_stats::ExecutorStats;
40+
pub use executor_stats::ExecutorStatsSlot;
41+
pub use executor_stats::ExecutorStatsSnapshot;
3842
pub use global_runtime::GlobalIORuntime;
3943
pub use global_runtime::GlobalQueryRuntime;
4044
pub use memory::set_alloc_error_hook;

src/common/base/src/runtime/runtime_tracker.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
use std::cell::RefCell;
4646
use std::future::Future;
4747
use std::pin::Pin;
48+
use std::sync::atomic::AtomicUsize;
4849
use std::sync::Arc;
4950
use std::task::Context;
5051
use std::task::Poll;
@@ -132,7 +133,6 @@ impl CaptureLogSettings {
132133
}
133134
}
134135

135-
#[derive(Clone)]
136136
pub struct TrackingPayload {
137137
pub query_id: Option<String>,
138138
pub profile: Option<Arc<Profile>>,
@@ -143,6 +143,26 @@ pub struct TrackingPayload {
143143
pub local_time_series_profile: Option<Arc<TimeSeriesProfiles>>,
144144
pub workload_group_resource: Option<Arc<WorkloadGroupResource>>,
145145
pub perf_enabled: bool,
146+
pub process_rows: AtomicUsize,
147+
}
148+
149+
impl Clone for TrackingPayload {
150+
fn clone(&self) -> Self {
151+
TrackingPayload {
152+
query_id: self.query_id.clone(),
153+
profile: self.profile.clone(),
154+
mem_stat: self.mem_stat.clone(),
155+
metrics: self.metrics.clone(),
156+
capture_log_settings: self.capture_log_settings.clone(),
157+
time_series_profile: self.time_series_profile.clone(),
158+
local_time_series_profile: self.local_time_series_profile.clone(),
159+
workload_group_resource: self.workload_group_resource.clone(),
160+
perf_enabled: self.perf_enabled,
161+
process_rows: AtomicUsize::new(
162+
self.process_rows.load(std::sync::atomic::Ordering::SeqCst),
163+
),
164+
}
165+
}
146166
}
147167

148168
pub struct TrackingGuard {
@@ -222,6 +242,7 @@ impl ThreadTracker {
222242
local_time_series_profile: None,
223243
workload_group_resource: None,
224244
perf_enabled: false,
245+
process_rows: AtomicUsize::new(0),
225246
}),
226247
}
227248
}
@@ -336,6 +357,18 @@ impl ThreadTracker {
336357
.ok()
337358
.and_then(|x| x)
338359
}
360+
361+
pub fn process_rows() -> usize {
362+
TRACKER
363+
.try_with(|tracker| {
364+
tracker
365+
.borrow()
366+
.payload
367+
.process_rows
368+
.load(std::sync::atomic::Ordering::SeqCst)
369+
})
370+
.unwrap_or(0)
371+
}
339372
}
340373

341374
pin_project! {

0 commit comments

Comments
 (0)