Skip to content

Commit a98ed8a

Browse files
committed
refactor: add datablock limit
1 parent b76bef0 commit a98ed8a

File tree

15 files changed

+181
-56
lines changed

15 files changed

+181
-56
lines changed

src/common/base/src/runtime/runtime_tracker.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
use std::cell::RefCell;
4646
use std::future::Future;
4747
use std::pin::Pin;
48+
use std::sync::atomic::AtomicBool;
4849
use std::sync::atomic::AtomicUsize;
4950
use std::sync::Arc;
5051
use std::task::Context;
@@ -144,6 +145,8 @@ pub struct TrackingPayload {
144145
pub workload_group_resource: Option<Arc<WorkloadGroupResource>>,
145146
pub perf_enabled: bool,
146147
pub process_rows: AtomicUsize,
148+
// Indicate whether datablock is sliced and has remaining data in port
149+
pub has_remaining_data: AtomicBool,
147150
}
148151

149152
impl Clone for TrackingPayload {
@@ -161,6 +164,10 @@ impl Clone for TrackingPayload {
161164
process_rows: AtomicUsize::new(
162165
self.process_rows.load(std::sync::atomic::Ordering::SeqCst),
163166
),
167+
has_remaining_data: AtomicBool::new(
168+
self.has_remaining_data
169+
.load(std::sync::atomic::Ordering::SeqCst),
170+
),
164171
}
165172
}
166173
}
@@ -243,6 +250,7 @@ impl ThreadTracker {
243250
workload_group_resource: None,
244251
perf_enabled: false,
245252
process_rows: AtomicUsize::new(0),
253+
has_remaining_data: AtomicBool::new(false),
246254
}),
247255
}
248256
}
@@ -369,6 +377,18 @@ impl ThreadTracker {
369377
})
370378
.unwrap_or(0)
371379
}
380+
381+
pub fn has_remaining_data() -> bool {
382+
TRACKER
383+
.try_with(|tracker| {
384+
tracker
385+
.borrow()
386+
.payload
387+
.has_remaining_data
388+
.load(std::sync::atomic::Ordering::SeqCst)
389+
})
390+
.unwrap_or(false)
391+
}
372392
}
373393

374394
pin_project! {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::atomic::AtomicU64;
16+
17+
/// DataBlock limit in rows and bytes
18+
pub struct BlockLimit {
19+
rows: AtomicU64,
20+
bytes: AtomicU64,
21+
}
22+
23+
impl BlockLimit {
24+
pub fn new(rows: u64, bytes: u64) -> Self {
25+
BlockLimit {
26+
rows: AtomicU64::new(rows),
27+
bytes: AtomicU64::new(bytes),
28+
}
29+
}
30+
}
31+
32+
impl Default for BlockLimit {
33+
fn default() -> Self {
34+
BlockLimit {
35+
rows: AtomicU64::new(u64::MAX),
36+
bytes: AtomicU64::new(u64::MAX),
37+
}
38+
}
39+
}

src/query/pipeline/core/src/processors/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
mod port;
1616
mod processor;
1717

18+
mod block_limit;
1819
mod duplicate_processor;
1920
mod port_trigger;
2021
mod profile;
2122
mod resize_processor;
2223
mod sequence_group;
2324
mod shuffle_processor;
2425

26+
pub use block_limit::BlockLimit;
2527
pub use duplicate_processor::DuplicateProcessor;
2628
pub use port::connect;
2729
pub use port::InputPort;

src/query/pipeline/core/src/processors/port.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use databend_common_base::runtime::TimeSeriesProfileName;
2525
use databend_common_exception::Result;
2626
use databend_common_expression::DataBlock;
2727

28+
use crate::processors::BlockLimit;
2829
use crate::processors::UpdateTrigger;
2930
use crate::unsafe_cell_wrap::UnSafeCellWrap;
3031

@@ -40,6 +41,7 @@ pub struct SharedData(pub Result<DataBlock>);
4041

4142
pub struct SharedStatus {
4243
data: AtomicPtr<SharedData>,
44+
block_limit: Arc<BlockLimit>,
4345
}
4446

4547
unsafe impl Send for SharedStatus {}
@@ -57,9 +59,10 @@ impl Drop for SharedStatus {
5759
}
5860

5961
impl SharedStatus {
60-
pub fn create() -> Arc<SharedStatus> {
62+
pub fn create(block_limit: Arc<BlockLimit>) -> Arc<SharedStatus> {
6163
Arc::new(SharedStatus {
6264
data: AtomicPtr::new(std::ptr::null_mut()),
65+
block_limit,
6366
})
6467
}
6568

@@ -134,7 +137,7 @@ pub struct InputPort {
134137
impl InputPort {
135138
pub fn create() -> Arc<InputPort> {
136139
Arc::new(InputPort {
137-
shared: UnSafeCellWrap::create(SharedStatus::create()),
140+
shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))),
138141
update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()),
139142
})
140143
}
@@ -227,7 +230,7 @@ impl OutputPort {
227230
pub fn create() -> Arc<OutputPort> {
228231
Arc::new(OutputPort {
229232
record_profile: UnSafeCellWrap::create(false),
230-
shared: UnSafeCellWrap::create(SharedStatus::create()),
233+
shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))),
231234
update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()),
232235
})
233236
}
@@ -318,8 +321,8 @@ impl OutputPort {
318321
/// Connect input and output ports.
319322
///
320323
/// # Safety
321-
pub unsafe fn connect(input: &InputPort, output: &OutputPort) {
322-
let shared_status = SharedStatus::create();
324+
pub unsafe fn connect(input: &InputPort, output: &OutputPort, block_limit: Arc<BlockLimit>) {
325+
let shared_status = SharedStatus::create(block_limit);
323326

324327
input.set_shared(shared_status.clone());
325328
output.set_shared(shared_status);

src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use databend_common_exception::Result;
1618
use databend_common_expression::types::Int32Type;
1719
use databend_common_expression::DataBlock;
1820
use databend_common_expression::FromData;
1921
use databend_common_pipeline_core::processors::connect;
22+
use databend_common_pipeline_core::processors::BlockLimit;
2023
use databend_common_pipeline_core::processors::DuplicateProcessor;
2124
use databend_common_pipeline_core::processors::Event;
2225
use databend_common_pipeline_core::processors::InputPort;
@@ -40,9 +43,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
4043
let downstream_input2 = InputPort::create();
4144

4245
unsafe {
43-
connect(&input, &upstream_output);
44-
connect(&downstream_input1, &output1);
45-
connect(&downstream_input2, &output2);
46+
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
47+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
48+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
4649
}
4750

4851
downstream_input1.set_need_data();
@@ -68,9 +71,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
6871
let downstream_input2 = InputPort::create();
6972

7073
unsafe {
71-
connect(&input, &upstream_output);
72-
connect(&downstream_input1, &output1);
73-
connect(&downstream_input2, &output2);
74+
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
75+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
76+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
7477
}
7578

7679
downstream_input1.finish();
@@ -94,9 +97,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
9497
let downstream_input2 = InputPort::create();
9598

9699
unsafe {
97-
connect(&input, &upstream_output);
98-
connect(&downstream_input1, &output1);
99-
connect(&downstream_input2, &output2);
100+
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
101+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
102+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
100103
}
101104

102105
downstream_input1.finish();
@@ -120,9 +123,9 @@ async fn test_duplicate_processor() -> Result<()> {
120123
let downstream_input2 = InputPort::create();
121124

122125
unsafe {
123-
connect(&input, &upstream_output);
124-
connect(&downstream_input1, &output1);
125-
connect(&downstream_input2, &output2);
126+
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
127+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
128+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
126129
}
127130

128131
downstream_input1.set_need_data();

src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_expression::local_block_meta_serde;
2222
use databend_common_expression::BlockMetaInfo;
2323
use databend_common_expression::DataBlock;
2424
use databend_common_pipeline_core::processors::connect;
25+
use databend_common_pipeline_core::processors::BlockLimit;
2526
use databend_common_pipeline_core::processors::InputPort;
2627
use databend_common_pipeline_core::processors::OutputPort;
2728

@@ -60,7 +61,7 @@ async fn test_port_drop() -> Result<()> {
6061
let input = InputPort::create();
6162
let output = OutputPort::create();
6263

63-
connect(&input, &output);
64+
connect(&input, &output, Arc::new(BlockLimit::default()));
6465
output.push_data(Ok(DataBlock::empty_with_meta(meta.clone_self())));
6566
assert_eq!(meta.ref_count(), 2);
6667
}
@@ -98,7 +99,7 @@ async fn test_input_and_output_port() -> Result<()> {
9899
let output = OutputPort::create();
99100
let barrier = Arc::new(Barrier::new(2));
100101

101-
connect(&input, &output);
102+
connect(&input, &output, Arc::new(BlockLimit::default()));
102103
let thread_1 = Thread::spawn(input_port(input, barrier.clone()));
103104
let thread_2 = Thread::spawn(output_port(output, barrier));
104105

@@ -114,7 +115,7 @@ async fn test_input_and_output_flags() -> Result<()> {
114115
let input = InputPort::create();
115116
let output = OutputPort::create();
116117

117-
connect(&input, &output);
118+
connect(&input, &output, Arc::new(BlockLimit::default()));
118119

119120
output.finish();
120121
assert!(input.is_finished());

src/query/pipeline/core/tests/it/pipelines/processors/resize.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use databend_common_exception::Result;
1818
use databend_common_expression::DataBlock;
1919
use databend_common_pipeline_core::processors::connect;
20+
use databend_common_pipeline_core::processors::BlockLimit;
2021
use databend_common_pipeline_core::processors::EventCause;
2122
use databend_common_pipeline_core::processors::InputPort;
2223
use databend_common_pipeline_core::processors::OutputPort;
@@ -61,7 +62,7 @@ fn connect_inputs(inputs: Vec<Arc<InputPort>>) -> Vec<Arc<OutputPort>> {
6162
unsafe {
6263
for input in inputs {
6364
let output = OutputPort::create();
64-
connect(&input, &output);
65+
connect(&input, &output, Arc::new(BlockLimit::default()));
6566
outputs.push(output);
6667
}
6768
}
@@ -75,7 +76,7 @@ fn connect_outputs(outputs: Vec<Arc<OutputPort>>) -> Vec<Arc<InputPort>> {
7576
unsafe {
7677
for output in outputs {
7778
let input = InputPort::create();
78-
connect(&input, &output);
79+
connect(&input, &output, Arc::new(BlockLimit::default()));
7980
inputs.push(input);
8081
}
8182
}

src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use databend_common_exception::Result;
1618
use databend_common_expression::types::Int32Type;
1719
use databend_common_expression::DataBlock;
1820
use databend_common_expression::FromData;
1921
use databend_common_pipeline_core::processors::connect;
22+
use databend_common_pipeline_core::processors::BlockLimit;
2023
use databend_common_pipeline_core::processors::Event;
2124
use databend_common_pipeline_core::processors::EventCause;
2225
use databend_common_pipeline_core::processors::InputPort;
@@ -43,10 +46,10 @@ async fn test_shuffle_output_finish() -> Result<()> {
4346
let downstream_input2 = InputPort::create();
4447

4548
unsafe {
46-
connect(&input1, &upstream_output1);
47-
connect(&input2, &upstream_output2);
48-
connect(&downstream_input1, &output1);
49-
connect(&downstream_input2, &output2);
49+
connect(&input1, &upstream_output1, Arc::new(BlockLimit::default()));
50+
connect(&input2, &upstream_output2, Arc::new(BlockLimit::default()));
51+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
52+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
5053
}
5154

5255
downstream_input1.finish();
@@ -106,14 +109,14 @@ async fn test_shuffle_processor() -> Result<()> {
106109
let downstream_input4 = InputPort::create();
107110

108111
unsafe {
109-
connect(&input1, &upstream_output1);
110-
connect(&input2, &upstream_output2);
111-
connect(&input3, &upstream_output3);
112-
connect(&input4, &upstream_output4);
113-
connect(&downstream_input1, &output1);
114-
connect(&downstream_input2, &output2);
115-
connect(&downstream_input3, &output3);
116-
connect(&downstream_input4, &output4);
112+
connect(&input1, &upstream_output1, Arc::new(BlockLimit::default()));
113+
connect(&input2, &upstream_output2, Arc::new(BlockLimit::default()));
114+
connect(&input3, &upstream_output3, Arc::new(BlockLimit::default()));
115+
connect(&input4, &upstream_output4, Arc::new(BlockLimit::default()));
116+
connect(&downstream_input1, &output1, Arc::new(BlockLimit::default()));
117+
connect(&downstream_input2, &output2, Arc::new(BlockLimit::default()));
118+
connect(&downstream_input3, &output3, Arc::new(BlockLimit::default()));
119+
connect(&downstream_input4, &output4, Arc::new(BlockLimit::default()));
117120
}
118121

119122
let col1 = Int32Type::from_data(vec![1]);

src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use async_trait::async_trait;
2020
use databend_common_exception::Result;
2121
use databend_common_expression::DataBlock;
2222
use databend_common_pipeline_core::processors::connect;
23+
use databend_common_pipeline_core::processors::BlockLimit;
2324
use databend_common_pipeline_core::processors::Event;
2425
use databend_common_pipeline_core::processors::InputPort;
2526
use databend_common_pipeline_core::processors::OutputPort;
@@ -73,8 +74,8 @@ async fn test_async_mpsc_sink() -> Result<()> {
7374
let upstream_output2 = OutputPort::create();
7475

7576
unsafe {
76-
connect(&input1, &upstream_output1);
77-
connect(&input2, &upstream_output2);
77+
connect(&input1, &upstream_output1, Arc::new(BlockLimit::default()));
78+
connect(&input2, &upstream_output2, Arc::new(BlockLimit::default()));
7879
}
7980

8081
upstream_output1.push_data(Ok(DataBlock::new(vec![], 1)));

src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
use databend_common_exception::Result;
2020
use databend_common_expression::DataBlock;
2121
use databend_common_pipeline_core::processors::connect;
22+
use databend_common_pipeline_core::processors::BlockLimit;
2223
use databend_common_pipeline_core::processors::Event;
2324
use databend_common_pipeline_core::processors::InputPort;
2425
use databend_common_pipeline_core::processors::OutputPort;
@@ -71,8 +72,8 @@ async fn test_sync_mpsc_sink() -> Result<()> {
7172
let upstream_output2 = OutputPort::create();
7273

7374
unsafe {
74-
connect(&input1, &upstream_output1);
75-
connect(&input2, &upstream_output2);
75+
connect(&input1, &upstream_output1, Arc::new(BlockLimit::default()));
76+
connect(&input2, &upstream_output2, Arc::new(BlockLimit::default()));
7677
}
7778

7879
upstream_output1.push_data(Ok(DataBlock::new(vec![], 1)));

0 commit comments

Comments
 (0)