diff --git a/src/query/pipeline/core/src/processors/block_limit.rs b/src/query/pipeline/core/src/processors/block_limit.rs new file mode 100644 index 0000000000000..1fb5d1ca2b488 --- /dev/null +++ b/src/query/pipeline/core/src/processors/block_limit.rs @@ -0,0 +1,57 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +// This is used to avoid the row limit becoming too small after limiting by bytes. +// TODO: this magic 10 may need to be revisited in the future. +const MIN_ROWS_BY_BYTES: usize = 10; + +/// DataBlock limit in rows and bytes +pub struct BlockLimit { + rows: AtomicUsize, + bytes: AtomicUsize, +} + +impl BlockLimit { + pub fn new(rows: usize, bytes: usize) -> Self { + BlockLimit { + rows: AtomicUsize::new(rows), + bytes: AtomicUsize::new(bytes), + } + } + + /// Calculate the number of rows to take based on both row and byte limits + pub fn calculate_limit_rows(&self, total_rows: usize, total_bytes: usize) -> usize { + if total_rows == 0 { + return 0; + } + // max with 1 used to avoid division by zero + let average_bytes_per_row = (total_bytes / total_rows).max(1); + let rows_by_bytes = + (self.bytes.load(Ordering::Relaxed) / average_bytes_per_row).max(MIN_ROWS_BY_BYTES); + let rows_limit = self.rows.load(Ordering::Relaxed); + rows_limit.min(rows_by_bytes) + } +} + +impl Default for BlockLimit { + fn default() -> Self { + BlockLimit { + rows: AtomicUsize::new(usize::MAX), + bytes: AtomicUsize::new(usize::MAX), + } + } +} diff --git a/src/query/pipeline/core/src/processors/mod.rs b/src/query/pipeline/core/src/processors/mod.rs index b9468d09fb984..70fe9b90aaac2 100644 --- a/src/query/pipeline/core/src/processors/mod.rs +++ b/src/query/pipeline/core/src/processors/mod.rs @@ -15,6 +15,7 @@ mod port; mod processor; +mod block_limit; mod duplicate_processor; mod port_trigger; mod profile; @@ -22,6 +23,7 @@ mod resize_processor; mod sequence_group; mod shuffle_processor; +pub use block_limit::BlockLimit; pub use duplicate_processor::DuplicateProcessor; pub use port::connect; pub use port::InputPort; diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index fa257bfa5d207..f69ffe9486ded 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
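Before the port.rs changes below, it may help to see how the new BlockLimit combines its two bounds. A minimal sketch, not part of the patch, with invented numbers; it only assumes the crate is available as databend_common_pipeline_core:

use databend_common_pipeline_core::processors::BlockLimit;

fn main() {
    // Limits on this edge: at most 65_536 rows and 1 MiB per pulled block.
    let limit = BlockLimit::new(65_536, 1_048_576);

    // Incoming block: 100_000 rows occupying 4 MiB.
    // average_bytes_per_row = 4_194_304 / 100_000 = 41   (integer division, min 1)
    // rows_by_bytes         = 1_048_576 / 41      = 25_575 (floored, min MIN_ROWS_BY_BYTES)
    // result                = min(65_536, 25_575) = 25_575
    assert_eq!(limit.calculate_limit_rows(100_000, 4_194_304), 25_575);

    // A zero-row block always yields 0; the caller only slices when the
    // block's own row count exceeds the returned limit.
    assert_eq!(limit.calculate_limit_rows(0, 0), 0);
}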
+use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicPtr; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -25,6 +26,7 @@ use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use crate::processors::BlockLimit; use crate::processors::UpdateTrigger; use crate::unsafe_cell_wrap::UnSafeCellWrap; @@ -40,6 +42,10 @@ pub struct SharedData(pub Result); pub struct SharedStatus { data: AtomicPtr, + block_limit: Arc, + // This flag is used to indicate if a slice operation + // has occurred on the data block + slice_occurred: AtomicBool, } unsafe impl Send for SharedStatus {} @@ -57,9 +63,11 @@ impl Drop for SharedStatus { } impl SharedStatus { - pub fn create() -> Arc { + pub fn create(block_limit: Arc) -> Arc { Arc::new(SharedStatus { data: AtomicPtr::new(std::ptr::null_mut()), + block_limit, + slice_occurred: AtomicBool::new(false), }) } @@ -124,6 +132,11 @@ impl SharedStatus { pub fn get_flags(&self) -> usize { self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK } + + #[inline(always)] + pub fn get_block_limit(&self) -> &Arc { + &self.block_limit + } } pub struct InputPort { @@ -134,7 +147,7 @@ pub struct InputPort { impl InputPort { pub fn create() -> Arc { Arc::new(InputPort { - shared: UnSafeCellWrap::create(SharedStatus::create()), + shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))), update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()), }) } @@ -169,6 +182,7 @@ impl InputPort { unsafe { let flags = self.shared.set_flags(NEED_DATA, NEED_DATA); if flags & NEED_DATA == 0 { + // info!("[input_port] trigger input port set need data"); UpdateTrigger::update_input(&self.update_trigger); } } @@ -184,24 +198,63 @@ impl InputPort { (self.shared.get_flags() & HAS_DATA) != 0 } - #[inline(always)] pub fn pull_data(&self) -> Option> { unsafe { + // info!("[input_port] trigger input port pull data"); UpdateTrigger::update_input(&self.update_trigger); - let unset_flags = HAS_DATA | NEED_DATA; - match self.shared.swap(std::ptr::null_mut(), 0, unset_flags) { - address if address.is_null() => None, - address => { - let block = (*Box::from_raw(address)).0; - if let Ok(data_block) = block.as_ref() { - ExecutorStats::record_thread_tracker(data_block.num_rows()); - } - Some(block) + + // First, swap out the data without unsetting flags to prevent race conditions + let address = self.shared.swap(std::ptr::null_mut(), 0, 0); + + if address.is_null() { + // No data available, now safe to unset flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + return None; + } + + let shared_data = Box::from_raw(address); + match shared_data.0 { + Ok(data_block) => self.process_data_block(data_block), + Err(e) => { + // Error case, unset both flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + Some(Err(e)) } } } } + fn process_data_block(&self, data_block: DataBlock) -> Option> { + let block_limit = self.shared.get_block_limit(); + let limit_rows = + block_limit.calculate_limit_rows(data_block.num_rows(), data_block.memory_size()); + + if data_block.num_rows() > limit_rows && limit_rows > 0 { + // info!( + // "[input_port] pull data with slice, limit/all: {}/{}", + // limit_rows, + // data_block.num_rows() + // ); + // Need to split the block + let taken_block = data_block.slice(0..limit_rows); + let remaining_block = data_block.slice(limit_rows..data_block.num_rows()); + + let remaining_data = Box::new(SharedData(Ok(remaining_block))); + 
self.shared.swap(Box::into_raw(remaining_data), 0, 0); + self.shared.slice_occurred.store(true, Ordering::Relaxed); + ExecutorStats::record_thread_tracker(taken_block.num_rows()); + Some(Ok(taken_block)) + } else { + // info!("[input_port] pull data all: {}", data_block.num_rows()); + // No need to split, take the whole block + // Unset both HAS_DATA and NEED_DATA flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + + ExecutorStats::record_thread_tracker(data_block.num_rows()); + Some(Ok(data_block)) + } + } + /// # Safety /// /// Method is thread unsafe and require thread safe call @@ -215,6 +268,14 @@ impl InputPort { pub unsafe fn set_trigger(&self, update_trigger: *mut UpdateTrigger) { self.update_trigger.set_value(update_trigger) } + + pub fn slice_occurred(&self) -> bool { + self.shared.slice_occurred.load(Ordering::Relaxed) + } + + pub fn reset_slice_occurred(&self) { + self.shared.slice_occurred.store(false, Ordering::Relaxed); + } } pub struct OutputPort { @@ -227,7 +288,7 @@ impl OutputPort { pub fn create() -> Arc { Arc::new(OutputPort { record_profile: UnSafeCellWrap::create(false), - shared: UnSafeCellWrap::create(SharedStatus::create()), + shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))), update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()), }) } @@ -235,6 +296,7 @@ impl OutputPort { #[inline(always)] pub fn push_data(&self, data: Result) { unsafe { + // info!("[output_port] trigger output port push_data"); UpdateTrigger::update_output(&self.update_trigger); if let Ok(data_block) = &data { @@ -318,8 +380,8 @@ impl OutputPort { /// Connect input and output ports. /// /// # Safety -pub unsafe fn connect(input: &InputPort, output: &OutputPort) { - let shared_status = SharedStatus::create(); +pub unsafe fn connect(input: &InputPort, output: &OutputPort, block_limit: Arc) { + let shared_status = SharedStatus::create(block_limit); input.set_shared(shared_status.clone()); output.set_shared(shared_status); diff --git a/src/query/pipeline/core/src/processors/port_trigger.rs b/src/query/pipeline/core/src/processors/port_trigger.rs index 7a02cfae185cb..5aed03ca14b0e 100644 --- a/src/query/pipeline/core/src/processors/port_trigger.rs +++ b/src/query/pipeline/core/src/processors/port_trigger.rs @@ -33,6 +33,7 @@ unsafe impl Send for UpdateList {} unsafe impl Sync for UpdateList {} +#[derive(Debug, Clone)] pub enum DirectedEdge { Source(EdgeIndex), Target(EdgeIndex), diff --git a/src/query/pipeline/core/src/processors/resize_processor.rs b/src/query/pipeline/core/src/processors/resize_processor.rs index 9bfff2f58183c..a4edcdd1a1f12 100644 --- a/src/query/pipeline/core/src/processors/resize_processor.rs +++ b/src/query/pipeline/core/src/processors/resize_processor.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::sync::Arc; use databend_common_exception::Result; +use log::info; use crate::pipe::PipeItem; use crate::processors::Event; @@ -110,6 +111,10 @@ impl Processor for ResizeProcessor { } } } + info!( + "[resize] event_with_cause: {:?}, input port {:?} output port {:?}", + &cause, &self.waiting_inputs, &self.waiting_outputs + ); while !self.waiting_outputs.is_empty() && !self.waiting_inputs.is_empty() { let output_index = self.waiting_outputs.pop_front().unwrap(); @@ -159,6 +164,10 @@ impl Processor for ResizeProcessor { return Ok(Event::Finished); } + info!( + "[resize] event_with_cause: {:?}, input port {:?} output port {:?}", + &cause, &self.waiting_inputs, &self.waiting_outputs + ); match self.waiting_outputs.is_empty() { 
true => Ok(Event::NeedConsume), diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs index 9e29ad5c4657a..1857005c2bff7 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::types::Int32Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::DuplicateProcessor; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; @@ -40,9 +43,17 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.set_need_data(); @@ -68,9 +79,17 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -94,9 +113,17 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -120,9 +147,17 @@ async fn test_duplicate_processor() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.set_need_data(); diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs index 07b2ba1b1c2f1..566ce7a081cca 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs @@ -19,9 +19,12 @@ use databend_common_base::runtime::Thread; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use 
databend_common_expression::local_block_meta_serde; +use databend_common_expression::types::Int32Type; use databend_common_expression::BlockMetaInfo; use databend_common_expression::DataBlock; +use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -60,7 +63,7 @@ async fn test_port_drop() -> Result<()> { let input = InputPort::create(); let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); output.push_data(Ok(DataBlock::empty_with_meta(meta.clone_self()))); assert_eq!(meta.ref_count(), 2); } @@ -98,7 +101,7 @@ async fn test_input_and_output_port() -> Result<()> { let output = OutputPort::create(); let barrier = Arc::new(Barrier::new(2)); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); let thread_1 = Thread::spawn(input_port(input, barrier.clone())); let thread_2 = Thread::spawn(output_port(output, barrier)); @@ -114,7 +117,7 @@ async fn test_input_and_output_flags() -> Result<()> { let input = InputPort::create(); let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); output.finish(); assert!(input.is_finished()); @@ -127,3 +130,93 @@ async fn test_input_and_output_flags() -> Result<()> { // assert_eq!(!output.can_push()); Ok(()) } + +#[test] +fn test_block_limit_splitting() -> Result<()> { + unsafe { + let input = InputPort::create(); + let output = OutputPort::create(); + + // Create a BlockLimit with small row limit to trigger splitting + let block_limit = Arc::new(BlockLimit::new(10, 1024 * 1024)); // 10 rows max, 1MB bytes + connect(&input, &output, block_limit); + + // Create a block with 30 rows + let mut data = Vec::with_capacity(30); + for i in 0..30 { + data.push(i); + } + let col = Int32Type::from_data(data); + let block = DataBlock::new_from_columns(vec![col]); + + // Push the large block + output.push_data(Ok(block.clone())); + + // First pull should get 10 rows + input.set_need_data(); + assert!(input.has_data()); + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // After first pull, should still have data (remaining 20 rows) + assert!(input.has_data()); + + // Second pull should get another 10 rows + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // Should still have data (remaining 10 rows) + assert!(input.has_data()); + + // Third pull should get the last 10 rows + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // Now should have no data + assert!(!input.has_data()); + + // Trying to pull again should return None + let result = input.pull_data(); + assert!(result.is_none()); + } + + Ok(()) +} + +#[test] +fn test_block_limit_no_splitting() -> Result<()> { + unsafe { + let input = InputPort::create(); + let output = OutputPort::create(); + + // Create a BlockLimit with large limits (no splitting should occur) + let block_limit = Arc::new(BlockLimit::new(1000, 10 * 1024 * 1024)); // 1000 rows, 10MB + connect(&input, &output, block_limit); + + // Create a small block with 30 rows + let mut data = Vec::with_capacity(30); + for i in 0..30 { + data.push(i); + } + let col = Int32Type::from_data(data); + let block = 
DataBlock::new_from_columns(vec![col]); + + // Push the block + output.push_data(Ok(block.clone())); + + // Pull should get all 30 rows at once + input.set_need_data(); + assert!(input.has_data()); + let pulled_block = input.pull_data().unwrap().unwrap(); + assert_eq!(pulled_block.num_rows(), 30); + + // Should have no more data + assert!(!input.has_data()); + + // Trying to pull again should return None + let result = input.pull_data(); + assert!(result.is_none()); + } + + Ok(()) +} diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs b/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs index 0d2dea56e168d..46d08beba9794 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -61,7 +62,7 @@ fn connect_inputs(inputs: Vec>) -> Vec> { unsafe { for input in inputs { let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); outputs.push(output); } } @@ -75,7 +76,7 @@ fn connect_outputs(outputs: Vec>) -> Vec> { unsafe { for output in outputs { let input = InputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); inputs.push(input); } } diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs index 5f419913f1a6b..5973d2f65b2ab 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
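As the updated tests and call sites above show, connect now takes the per-edge limit as a third argument. A small sketch of the wiring, assuming the crate is available as a dependency; the helper name wire_with_limit is invented for illustration. BlockLimit::default() uses usize::MAX for both bounds, so pull_data never slices and existing call sites keep their old behaviour, while a finite limit caps each pull and leaves the remainder parked on the edge with HAS_DATA still set.

use std::sync::Arc;

use databend_common_pipeline_core::processors::connect;
use databend_common_pipeline_core::processors::BlockLimit;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;

// Hypothetical helper: connect an input/output pair with an explicit per-edge limit.
fn wire_with_limit(rows: usize, bytes: usize) -> (Arc<InputPort>, Arc<OutputPort>) {
    let input = InputPort::create();
    let output = OutputPort::create();
    // Safety: ports are wired before any concurrent use, as in the existing tests.
    unsafe { connect(&input, &output, Arc::new(BlockLimit::new(rows, bytes))) };
    (input, output)
}

fn main() {
    // At most 10 rows or 1 MiB per pull_data call on this edge.
    let (_input, _output) = wire_with_limit(10, 1024 * 1024);
}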
+use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::types::Int32Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::InputPort; @@ -43,10 +46,18 @@ async fn test_shuffle_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -106,14 +117,30 @@ async fn test_shuffle_processor() -> Result<()> { let downstream_input4 = InputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); - connect(&input3, &upstream_output3); - connect(&input4, &upstream_output4); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); - connect(&downstream_input3, &output3); - connect(&downstream_input4, &output4); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); + connect(&input3, &upstream_output3, Arc::new(BlockLimit::default())); + connect(&input4, &upstream_output4, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input3, + &output3, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input4, + &output4, + Arc::new(BlockLimit::default()), + ); } let col1 = Int32Type::from_data(vec![1]); diff --git a/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs b/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs index af8c32988fcef..a0cb1e36d2eac 100644 --- a/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs +++ b/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -73,8 +74,8 @@ async fn test_async_mpsc_sink() -> Result<()> { let upstream_output2 = OutputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); } upstream_output1.push_data(Ok(DataBlock::new(vec![], 1))); diff --git a/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs b/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs index 45ed96476c6e4..66db07c14dc78 100644 --- a/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs 
+++ b/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -71,8 +72,8 @@ async fn test_sync_mpsc_sink() -> Result<()> { let upstream_output2 = OutputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); } upstream_output1.push_data(Ok(DataBlock::new(vec![], 1))); diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs index 182c1c77c9f70..6a320b4e90a50 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs @@ -245,4 +245,8 @@ pub struct SortTaskMeta { } #[typetag::serde(name = "sort_task")] -impl BlockMetaInfo for SortTaskMeta {} +impl BlockMetaInfo for SortTaskMeta { + fn clone_self(&self) -> Box { + Box::new(*self) + } +} diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 5092c459a8f31..0202d2e978c7d 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -39,6 +39,7 @@ use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::PlanScope; use databend_common_pipeline_core::Pipeline; @@ -46,7 +47,7 @@ use databend_common_pipeline_core::PlanProfile; use databend_common_storages_system::QueryExecutionStatsQueue; use fastrace::prelude::*; use log::debug; -use log::trace; +use log::info; use log::warn; use parking_lot::Condvar; use parking_lot::Mutex; @@ -190,19 +191,24 @@ type StateLockGuard = ExecutingGraph; impl ExecutingGraph { pub fn create( mut pipeline: Pipeline, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result { let mut graph = StableGraph::new(); let mut time_series_profile_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); - Self::init_graph(&mut pipeline, &mut graph, &mut time_series_profile_builder); + Self::init_graph( + &mut pipeline, + &mut graph, + &mut time_series_profile_builder, + block_limit, + ); let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { graph, finished_nodes: AtomicUsize::new(0), - points: AtomicU64::new((DEFAULT_POINTS << 32) | init_epoch as u64), + points: AtomicU64::new((DEFAULT_POINTS << 32) | 1u64), max_points: AtomicU64::new(DEFAULT_POINTS), query_id, should_finish: AtomicBool::new(false), @@ -215,21 +221,26 @@ impl ExecutingGraph { pub fn from_pipelines( mut pipelines: Vec, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + 
block_limit: Arc, ) -> Result { let mut graph = StableGraph::new(); let mut time_series_profile_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); for pipeline in &mut pipelines { - Self::init_graph(pipeline, &mut graph, &mut time_series_profile_builder); + Self::init_graph( + pipeline, + &mut graph, + &mut time_series_profile_builder, + block_limit.clone(), + ); } let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { finished_nodes: AtomicUsize::new(0), graph, - points: AtomicU64::new((DEFAULT_POINTS << 32) | init_epoch as u64), + points: AtomicU64::new((DEFAULT_POINTS << 32) | 1u64), max_points: AtomicU64::new(DEFAULT_POINTS), query_id, should_finish: AtomicBool::new(false), @@ -244,6 +255,7 @@ impl ExecutingGraph { pipeline: &mut Pipeline, graph: &mut StableGraph, EdgeInfo>, time_series_profile_builder: &mut QueryTimeSeriesProfileBuilder, + block_limit: Arc, ) { let offset = graph.node_count(); for node in pipeline.graph.node_weights() { @@ -284,7 +296,6 @@ impl ExecutingGraph { mut_node.tracking_payload.time_series_profile = Some(query_time_series.clone()); } } - for edge in pipeline.graph.edge_indices() { let index = EdgeIndex::new(edge.index()); if let Some((source, target)) = pipeline.graph.edge_endpoints(index) { @@ -324,6 +335,7 @@ impl ExecutingGraph { connect( &graph[target_node].inputs_port[target_port], &graph[source_node].outputs_port[source_port], + block_limit.clone(), ); } } @@ -355,6 +367,9 @@ impl ExecutingGraph { schedule_queue: &mut ScheduleQueue, graph: &Arc, ) -> Result<()> { + // info!("[schedule] ------------new--------------"); + let _node = &locker.graph[index]; + // info!("[schedule] Node {:?} trigger schedule", node); let mut need_schedule_nodes = VecDeque::new(); let mut need_schedule_edges = VecDeque::new(); @@ -367,6 +382,7 @@ impl ExecutingGraph { if need_schedule_nodes.is_empty() { let edge = need_schedule_edges.pop_front().unwrap(); + // info!("Schedule edge: {:?}", edge); let target_index = DirectedEdge::get_target(&edge, &locker.graph)?; event_cause = match edge { @@ -382,6 +398,7 @@ impl ExecutingGraph { let node_state = node.state.lock().unwrap_or_else(PoisonError::into_inner); if matches!(*node_state, State::Idle) { + // info!("[schedule] add new Node: {:?}", node); state_guard_cache = Some(node_state); need_schedule_nodes.push_back(target_index); } else { @@ -391,6 +408,7 @@ impl ExecutingGraph { if let Some(schedule_index) = need_schedule_nodes.pop_front() { let node = &locker.graph[schedule_index]; + // info!("[schedule] Schedule node: {:?}", node); let (event, process_rows) = { let mut payload = node.tracking_payload.clone(); payload.process_rows = AtomicUsize::new(0); @@ -410,7 +428,7 @@ impl ExecutingGraph { } }?; - trace!( + info!( "node id: {:?}, name: {:?}, event: {:?}", node.processor.id(), node.processor.name(), @@ -443,8 +461,31 @@ impl ExecutingGraph { } }; - node.trigger(&mut need_schedule_edges); + let mut new_need_schedule_edges = VecDeque::new(); + node.trigger(&mut new_need_schedule_edges); + while let Some(edge) = new_need_schedule_edges.pop_back() { + if let DirectedEdge::Target(index) = edge { + let port_index = locker.graph.edge_weight(index).unwrap().input_index; + let port = node.inputs_port[port_index].as_ref(); + if port.slice_occurred() { + port.reset_slice_occurred(); + // info!( + // "[schedule!!] 
detect slice occurred on edge: {:?}", + // DirectedEdge::Source(index) + // ); + need_schedule_edges.push_front(DirectedEdge::Source(index)); + } + } + need_schedule_edges.push_front(edge); + } + *state_guard_cache.unwrap() = processor_state; + + // info!( + // "[schedule] node {} trigger edge: {:?}", + // node.processor.name(), + // need_schedule_edges + // ); } } @@ -688,24 +729,28 @@ pub struct RunningGraph(ExecutingGraph); impl RunningGraph { pub fn create( pipeline: Pipeline, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result> { let graph_state = - ExecutingGraph::create(pipeline, init_epoch, query_id, finish_condvar_notify)?; + ExecutingGraph::create(pipeline, query_id, finish_condvar_notify, block_limit)?; debug!("Create running graph:{:?}", graph_state); Ok(Arc::new(RunningGraph(graph_state))) } pub fn from_pipelines( pipelines: Vec, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result> { - let graph_state = - ExecutingGraph::from_pipelines(pipelines, init_epoch, query_id, finish_condvar_notify)?; + let graph_state = ExecutingGraph::from_pipelines( + pipelines, + query_id, + finish_condvar_notify, + block_limit, + )?; debug!("Create running graph:{:?}", graph_state); Ok(Arc::new(RunningGraph(graph_state))) } diff --git a/src/query/service/src/pipelines/executor/executor_settings.rs b/src/query/service/src/pipelines/executor/executor_settings.rs index 4eb0df638b1a2..0237c406de5bf 100644 --- a/src/query/service/src/pipelines/executor/executor_settings.rs +++ b/src/query/service/src/pipelines/executor/executor_settings.rs @@ -18,6 +18,7 @@ use std::time::Duration; use databend_common_catalog::table_context::TableContext; use databend_common_config::GlobalConfig; use databend_common_exception::Result; +use databend_common_pipeline_core::processors::BlockLimit; #[derive(Clone)] pub struct ExecutorSettings { @@ -26,6 +27,7 @@ pub struct ExecutorSettings { pub enable_queries_executor: bool, pub max_execute_time_in_seconds: Duration, pub executor_node_id: String, + pub block_limit: Arc, } impl ExecutorSettings { @@ -45,12 +47,20 @@ impl ExecutorSettings { config_enable_queries_executor }; + let max_block_rows = settings.get_max_block_size()?; + let max_block_bytes = settings.get_max_block_bytes()?; + let block_limit = Arc::new(BlockLimit::new( + max_block_rows as usize, + max_block_bytes as usize, + )); + Ok(ExecutorSettings { enable_queries_executor, query_id: Arc::new(query_id), max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds), max_threads, executor_node_id: ctx.get_cluster().local_id.clone(), + block_limit, }) } } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 8b2df0a3e10d7..f066866300e1e 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -75,9 +75,9 @@ impl PipelineExecutor { let graph = RunningGraph::create( pipeline, - 1, settings.query_id.clone(), Some(finish_condvar.clone()), + settings.block_limit.clone(), )?; Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper { @@ -130,9 +130,9 @@ impl PipelineExecutor { let graph = RunningGraph::from_pipelines( pipelines, - 1, settings.query_id.clone(), Some(finish_condvar.clone()), + settings.block_limit.clone(), )?; Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper { diff --git 
a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 9f8122eb19dd8..306a684394e8f 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -84,7 +84,12 @@ impl QueryPipelineExecutor { let mut on_finished_chain = pipeline.take_on_finished(); let lock_guards = pipeline.take_lock_guards(); - match RunningGraph::create(pipeline, 1, settings.query_id.clone(), None) { + match RunningGraph::create( + pipeline, + settings.query_id.clone(), + None, + settings.block_limit.clone(), + ) { Err(cause) => { let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); let _ = on_finished_chain.apply(info); @@ -149,7 +154,12 @@ impl QueryPipelineExecutor { .flat_map(|x| x.take_lock_guards()) .collect::>(); - match RunningGraph::from_pipelines(pipelines, 1, settings.query_id.clone(), None) { + match RunningGraph::from_pipelines( + pipelines, + settings.query_id.clone(), + None, + settings.block_limit.clone(), + ) { Err(cause) => { let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); let _ignore_res = finished_chain.apply(info); diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs index bdcd9c1237ec5..9755b7a999a75 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs @@ -296,6 +296,7 @@ mod tests { use databend_common_expression::types::Int32Type; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; + use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_transforms::sort::SimpleRowsAsc; use super::*; @@ -315,7 +316,7 @@ mod tests { let output = OutputPort::create(); let input = InputPort::create(); unsafe { - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); } let stream = BoundedInputStream { diff --git a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs index c997119fcba5d..e82f33f7e6d86 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs @@ -1364,6 +1364,7 @@ mod tests { use databend_common_expression::FromData; use databend_common_functions::aggregates::AggregateFunctionFactory; use databend_common_pipeline_core::processors::connect; + use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -2305,8 +2306,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); @@ -2380,8 +2381,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, 
Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); @@ -2477,8 +2478,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); diff --git a/src/query/service/tests/it/pipelines/executor/executor_graph.rs b/src/query/service/tests/it/pipelines/executor/executor_graph.rs index 2efe0cfbcf080..742f04e820e74 100644 --- a/src/query/service/tests/it/pipelines/executor/executor_graph.rs +++ b/src/query/service/tests/it/pipelines/executor/executor_graph.rs @@ -456,7 +456,12 @@ fn create_simple_pipeline(ctx: Arc) -> Result> { pipeline.add_pipe(create_transform_pipe(1)?); pipeline.add_pipe(sink_pipe); - RunningGraph::create(pipeline, 1, Arc::new("".to_string()), None) + RunningGraph::create( + pipeline, + Arc::new("".to_string()), + None, + Arc::new(Default::default()), + ) } fn create_parallel_simple_pipeline(ctx: Arc) -> Result> { @@ -468,7 +473,12 @@ fn create_parallel_simple_pipeline(ctx: Arc) -> Result) -> Result> { @@ -484,7 +494,12 @@ fn create_resize_pipeline(ctx: Arc) -> Result> { pipeline.try_resize(2)?; pipeline.add_pipe(sink_pipe); - RunningGraph::create(pipeline, 1, Arc::new("".to_string()), None) + RunningGraph::create( + pipeline, + Arc::new("".to_string()), + None, + Arc::new(Default::default()), + ) } fn create_source_pipe( @@ -562,6 +577,7 @@ async fn create_executor_with_simple_pipeline( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; QueryPipelineExecutor::create(pipeline, settings) } diff --git a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs index 89acb352d3025..45b5e1f161add 100644 --- a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs @@ -47,6 +47,7 @@ async fn test_always_call_on_finished() -> Result<()> { enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; { diff --git a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs index 77ef2bbe0616c..7680b4fe180ed 100644 --- a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs +++ b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs @@ -57,6 +57,7 @@ fn create_pipeline( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(pipeline, settings)?; Ok((executor, rx)) diff --git a/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs b/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs index 9b9b90f0484e5..23bbdc608a081 100644 --- a/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs +++ b/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs @@ -123,6 +123,7 @@ async fn apply_snapshot_pruning( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?; diff --git 
a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs index 7f89d6be3088d..68235ddb0dcd7 100644 --- a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs +++ b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs @@ -124,6 +124,7 @@ async fn apply_snapshot_pruning( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?; diff --git a/tests/sqllogictests/suites/query/join/left_outer.test b/tests/sqllogictests/suites/query/join/left_outer.test index 9bc1c406a224e..d7f4c98cfbf20 100644 --- a/tests/sqllogictests/suites/query/join/left_outer.test +++ b/tests/sqllogictests/suites/query/join/left_outer.test @@ -285,7 +285,7 @@ INSERT INTO t2 VALUES(2, 2), (2, 4), (2, 6), (2, 8), (2, 10); statement ok set max_block_size = 2; -query I +query I rowsort SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a; ---- 2 3 2 10
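On the service side, ExecutorSettings::try_create now builds one BlockLimit per query from the session's max_block_size and max_block_bytes, and RunningGraph::create/from_pipelines thread it through init_graph into every connect call (the former init_epoch argument is dropped and hard-coded to 1). A sketch of that derivation, not taken verbatim from the patch and with invented example values; the real numbers come from settings.get_max_block_size()? and settings.get_max_block_bytes()?:

use std::sync::Arc;

use databend_common_pipeline_core::processors::BlockLimit;

// Mirrors the conversion done in ExecutorSettings::try_create (sketch only).
fn block_limit_from_settings(max_block_rows: u64, max_block_bytes: u64) -> Arc<BlockLimit> {
    Arc::new(BlockLimit::new(
        max_block_rows as usize,
        max_block_bytes as usize,
    ))
}

fn main() {
    // Example values only, not the engine defaults.
    let limit = block_limit_from_settings(65_536, 64 * 1024 * 1024);
    // The returned limit is not clamped to the block's own row count; for a
    // 1_000-row / 8_000-byte block the row bound of 65_536 dominates, so the
    // caller would take the whole block without slicing.
    assert_eq!(limit.calculate_limit_rows(1_000, 8_000), 65_536);
}

The rowsort added to left_outer.test is presumably part of the same picture: once a block can be sliced at a port, the row order of the join output is no longer deterministic under a small max_block_size, so the expected result has to be order-insensitive.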