Skip to content

Commit b1f122c

Browse files
committed
refactor(executor): add max rows/bytes limit when scheduling executor tasks
1 parent 1ace76d commit b1f122c

File tree

17 files changed

+390
-25
lines changed

17 files changed

+390
-25
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/// An indicator to limit the size of datablocks transferred between processors.
#[derive(Debug, Copy, Clone)]
pub struct DataBlockLimit {
    /// The maximum number of rows allowed in a data block
    pub max_rows: usize,
    /// The maximum size in bytes allowed for a data block
    pub max_bytes: usize,
}

impl DataBlockLimit {
    /// Build a limit from row/byte caps.
    ///
    /// A cap of 0 means "no limit" and is normalized to `usize::MAX`, so the
    /// rest of the code never needs to special-case it.
    pub fn new(mut max_rows: usize, mut max_bytes: usize) -> Self {
        if max_rows == 0 {
            max_rows = usize::MAX;
        }
        if max_bytes == 0 {
            max_bytes = usize::MAX;
        }

        Self { max_rows, max_bytes }
    }

    /// Calculate the number of rows to take based on both row and byte limits.
    ///
    /// The byte limit is applied through the *average* row width
    /// (`total_bytes / total_rows`), so the byte-based cut is an estimate, not
    /// an exact byte count. When `total_rows > 0` the result is always at
    /// least 1, guaranteeing forward progress even if a single row exceeds
    /// `max_bytes`. Returns 0 for an empty block.
    pub fn rows_to_take(&self, total_rows: usize, total_bytes: usize) -> usize {
        // Defensive guard: the previous implementation only debug_assert'ed
        // `total_rows > 0` and then divided by it, which panics with a
        // divide-by-zero in release builds when handed an empty block.
        if total_rows == 0 {
            return 0;
        }

        // Rows permitted by the row limit.
        let rows_by_limit = self.max_rows.min(total_rows);

        // Rows permitted by the byte limit, estimated from average row width.
        let average_bytes_per_row = total_bytes / total_rows;
        let rows_by_bytes = if average_bytes_per_row > 0 {
            // TODO: the floor of 1 row may not be suitable for every caller.
            (self.max_bytes / average_bytes_per_row).max(1)
        } else {
            // Rows are narrower than one byte on average; the byte limit
            // cannot meaningfully constrain us (and avoids division issues),
            // so only the row limit applies.
            total_rows
        };

        // The stricter of the two limits wins.
        rows_by_limit.min(rows_by_bytes)
    }
}
60+
61+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rows_to_take_basic() {
        let limit = DataBlockLimit::new(1000, 1_000_000);

        // Table of (total_rows, total_bytes, expected_rows).
        let cases = [
            // Neither limit exceeded: take everything.
            (500, 500_000, 500),
            // Row limit exceeded: capped at max_rows.
            (2000, 500_000, 1000),
            // Byte limit exceeded:
            // 2_000_000 B / 1000 rows = 2000 B/row; 1_000_000 / 2000 = 500 rows.
            (1000, 2_000_000, 500),
            // Both exceeded, byte limit is the stricter one:
            // 4_000_000 B / 2000 rows = 2000 B/row; 1_000_000 / 2000 = 500 rows.
            (2000, 4_000_000, 500),
            // Both exceeded, row limit is the stricter one:
            // 1_500_000 B / 2000 rows = 750 B/row; 1_000_000 / 750 = 1333 rows,
            // but max_rows caps us at 1000.
            (2000, 1_500_000, 1000),
        ];

        for (total_rows, total_bytes, expected) in cases {
            assert_eq!(limit.rows_to_take(total_rows, total_bytes), expected);
        }
    }
}

src/common/base/src/runtime/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod backtrace;
1616
mod catch_unwind;
17+
mod datablock_limit;
1718
mod defer;
1819
pub mod error_info;
1920
mod executor_stats;
@@ -35,6 +36,7 @@ pub use backtrace::AsyncTaskItem;
3536
pub use catch_unwind::catch_unwind;
3637
pub use catch_unwind::drop_guard;
3738
pub use catch_unwind::CatchUnwindFuture;
39+
pub use datablock_limit::DataBlockLimit;
3840
pub use defer::defer;
3941
pub use executor_stats::ExecutorStats;
4042
pub use executor_stats::ExecutorStatsSlot;

src/common/base/src/runtime/runtime_tracker.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use concurrent_queue::ConcurrentQueue;
5454
use log::LevelFilter;
5555
use pin_project_lite::pin_project;
5656

57+
use crate::runtime::datablock_limit::DataBlockLimit;
5758
use crate::runtime::memory::GlobalStatBuffer;
5859
use crate::runtime::memory::MemStat;
5960
use crate::runtime::metrics::ScopedRegistry;
@@ -144,6 +145,7 @@ pub struct TrackingPayload {
144145
pub workload_group_resource: Option<Arc<WorkloadGroupResource>>,
145146
pub perf_enabled: bool,
146147
pub process_rows: AtomicUsize,
148+
pub datablock_limit: Option<Arc<DataBlockLimit>>,
147149
}
148150

149151
impl Clone for TrackingPayload {
@@ -161,6 +163,7 @@ impl Clone for TrackingPayload {
161163
process_rows: AtomicUsize::new(
162164
self.process_rows.load(std::sync::atomic::Ordering::SeqCst),
163165
),
166+
datablock_limit: self.datablock_limit.clone(),
164167
}
165168
}
166169
}
@@ -243,6 +246,7 @@ impl ThreadTracker {
243246
workload_group_resource: None,
244247
perf_enabled: false,
245248
process_rows: AtomicUsize::new(0),
249+
datablock_limit: None,
246250
}),
247251
}
248252
}
@@ -369,6 +373,20 @@ impl ThreadTracker {
369373
})
370374
.unwrap_or(0)
371375
}
376+
377+
    /// Return the datablock limit attached to the current thread's tracking
    /// payload, or `None` when the thread-local tracker is unavailable or no
    /// limit has been configured.
    pub fn datablock_limit() -> Option<&'static Arc<DataBlockLimit>> {
        TRACKER
            .try_with(|tracker| {
                tracker
                    .borrow()
                    .payload
                    .datablock_limit
                    .as_ref()
                    // SAFETY(review): this transmute extends a borrow of the
                    // thread-local payload's `Arc` to `'static`. That is only
                    // sound if `datablock_limit` is never replaced or dropped
                    // while a returned reference is alive — TODO confirm, or
                    // consider returning a cloned `Arc<DataBlockLimit>`
                    // (cheap refcount bump) to avoid the unsafe entirely.
                    .map(|v| unsafe { std::mem::transmute(v) })
            })
            // `try_with` fails if the thread-local was destroyed; treat that
            // the same as "no limit configured".
            .ok()
            .and_then(|x| x)
    }
372390
}
373391

374392
pin_project! {

src/query/expression/src/block.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,18 @@ impl DataBlock {
547547
}
548548
}
549549

550+
pub fn split_at(&self, mid: usize) -> (Self, Self) {
551+
assert!(
552+
mid <= self.num_rows,
553+
"split point {} out of len {}",
554+
mid,
555+
self.num_rows
556+
);
557+
let first = self.slice(0..mid);
558+
let second = self.slice(mid..self.num_rows);
559+
(first, second)
560+
}
561+
550562
pub fn split_by_rows(&self, max_rows_per_block: usize) -> (Vec<Self>, Option<Self>) {
551563
let mut res = Vec::with_capacity(self.num_rows / max_rows_per_block);
552564
let mut offset = 0;

src/query/pipeline/core/src/processors/port.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ use std::sync::Arc;
1919
use databend_common_base::runtime::drop_guard;
2020
use databend_common_base::runtime::profile::Profile;
2121
use databend_common_base::runtime::profile::ProfileStatisticsName;
22-
use databend_common_base::runtime::ExecutorStats;
22+
use databend_common_base::runtime::DataBlockLimit;
2323
use databend_common_base::runtime::QueryTimeSeriesProfile;
24+
use databend_common_base::runtime::ThreadTracker;
2425
use databend_common_base::runtime::TimeSeriesProfileName;
2526
use databend_common_exception::Result;
2627
use databend_common_expression::DataBlock;
@@ -186,17 +187,57 @@ impl InputPort {
186187

187188
#[inline(always)]
188189
pub fn pull_data(&self) -> Option<Result<DataBlock>> {
190+
let datablock_limit = ThreadTracker::datablock_limit();
191+
if let Some(limit) = datablock_limit {
192+
self.pull_partial_data(limit)
193+
} else {
194+
self.pull_all_data()
195+
}
196+
}
197+
198+
    /// Pull entire available datablock without any limit.
    ///
    /// This method provides the original behavior with minimal overhead
    /// when no datablock limit is configured.
    #[inline(always)]
    pub fn pull_all_data(&self) -> Option<Result<DataBlock>> {
        unsafe {
            UpdateTrigger::update_input(&self.update_trigger);
            // Take the data pointer and clear HAS_DATA | NEED_DATA in a single
            // atomic swap, signalling the port is ready for the next block.
            let unset_flags = HAS_DATA | NEED_DATA;
            match self.shared.swap(std::ptr::null_mut(), 0, unset_flags) {
                address if address.is_null() => None,
                // SAFETY(review): a non-null address is presumably published
                // by the push side via `Box::into_raw(Box::new(SharedData(..)))`;
                // `Box::from_raw` reclaims ownership exactly once here —
                // confirm against the push implementation.
                address => Some((*Box::from_raw(address)).0),
            }
        }
    }
213+
214+
    /// Pull datablock with a size limit, preserving excess rows for subsequent pulls.
    ///
    /// At most `limit.rows_to_take(..)` rows are returned; any remainder is
    /// swapped back into the shared slot so the next pull retrieves it.
    pub fn pull_partial_data(&self, limit: &DataBlockLimit) -> Option<Result<DataBlock>> {
        unsafe {
            UpdateTrigger::update_input(&self.update_trigger);
            // retrieve data without changing flags to prevent concurrent modifications
            match self.shared.swap(std::ptr::null_mut(), 0, 0) {
                address if address.is_null() => None,
                address => {
                    // SAFETY(review): non-null address is presumably published
                    // by the push side via Box::into_raw; ownership is
                    // reclaimed exactly once here — confirm against push.
                    let data_block = (*Box::from_raw(address)).0;
                    // NOTE(review): the pre-refactor code called
                    // ExecutorStats::record_thread_tracker(num_rows) here;
                    // this commit drops it — confirm that is intentional.
                    if let Ok(data_block) = &data_block {
                        let total_rows = data_block.num_rows();
                        let total_bytes = data_block.memory_size();
                        let rows_to_take = limit.rows_to_take(total_rows, total_bytes);
                        // check if we need to split based on either rows or bytes limit
                        if rows_to_take < total_rows {
                            let (need, remain) = data_block.split_at(rows_to_take);
                            let remain_data = Box::into_raw(Box::new(SharedData(Ok(remain))));
                            // put back the remainder without changing flags:
                            // this keeps HAS_DATA set so a subsequent pull
                            // will retrieve the leftover rows
                            self.shared.swap(remain_data, 0, 0);
                            return Some(Ok(need));
                        }
                    }
                    // take entire datablock and unset the flags to signal ready for next loop
                    self.shared.set_flags(0, HAS_DATA | NEED_DATA);
                    Some(data_block)
                }
            }
        }
    }

0 commit comments

Comments
 (0)