From a98ed8aaf185190a3bde3b32593d93dc7528c4f4 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 5 Aug 2025 15:23:33 +0800 Subject: [PATCH 1/7] refactor: add datablock limit --- .../base/src/runtime/runtime_tracker.rs | 20 ++++++++++ .../core/src/processors/block_limit.rs | 39 ++++++++++++++++++ src/query/pipeline/core/src/processors/mod.rs | 2 + .../pipeline/core/src/processors/port.rs | 13 +++--- .../it/pipelines/processors/duplicate.rs | 27 +++++++------ .../it/pipelines/processors/port_test.rs | 7 ++-- .../tests/it/pipelines/processors/resize.rs | 5 ++- .../tests/it/pipelines/processors/shuffle.rs | 27 +++++++------ .../sinks/tests/it/async_mpsc_sink.rs | 5 ++- .../pipeline/sinks/tests/it/sync_mpsc_sink.rs | 5 ++- .../src/pipelines/executor/executor_graph.rs | 40 ++++++++++++++----- .../pipelines/executor/executor_settings.rs | 7 ++++ .../pipelines/executor/pipeline_executor.rs | 4 +- .../executor/query_pipeline_executor.rs | 14 ++++++- .../it/pipelines/executor/executor_graph.rs | 22 ++++++++-- 15 files changed, 181 insertions(+), 56 deletions(-) create mode 100644 src/query/pipeline/core/src/processors/block_limit.rs diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 4a1f0d95f45a2..27f5fe29d44d1 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -45,6 +45,7 @@ use std::cell::RefCell; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Context; @@ -144,6 +145,8 @@ pub struct TrackingPayload { pub workload_group_resource: Option>, pub perf_enabled: bool, pub process_rows: AtomicUsize, + // Indicate whether datablock is sliced and has remaining data in port + pub has_remaining_data: AtomicBool, } impl Clone for TrackingPayload { @@ -161,6 +164,10 @@ impl Clone for TrackingPayload { process_rows: AtomicUsize::new( self.process_rows.load(std::sync::atomic::Ordering::SeqCst), ), + has_remaining_data: AtomicBool::new( + self.has_remaining_data + .load(std::sync::atomic::Ordering::SeqCst), + ), } } } @@ -243,6 +250,7 @@ impl ThreadTracker { workload_group_resource: None, perf_enabled: false, process_rows: AtomicUsize::new(0), + has_remaining_data: AtomicBool::new(false), }), } } @@ -369,6 +377,18 @@ impl ThreadTracker { }) .unwrap_or(0) } + + pub fn has_remaining_data() -> bool { + TRACKER + .try_with(|tracker| { + tracker + .borrow() + .payload + .has_remaining_data + .load(std::sync::atomic::Ordering::SeqCst) + }) + .unwrap_or(false) + } } pin_project! { diff --git a/src/query/pipeline/core/src/processors/block_limit.rs b/src/query/pipeline/core/src/processors/block_limit.rs new file mode 100644 index 0000000000000..706ea63db6e83 --- /dev/null +++ b/src/query/pipeline/core/src/processors/block_limit.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; + +/// DataBlock limit in rows and bytes +pub struct BlockLimit { + rows: AtomicU64, + bytes: AtomicU64, +} + +impl BlockLimit { + pub fn new(rows: u64, bytes: u64) -> Self { + BlockLimit { + rows: AtomicU64::new(rows), + bytes: AtomicU64::new(bytes), + } + } +} + +impl Default for BlockLimit { + fn default() -> Self { + BlockLimit { + rows: AtomicU64::new(u64::MAX), + bytes: AtomicU64::new(u64::MAX), + } + } +} diff --git a/src/query/pipeline/core/src/processors/mod.rs b/src/query/pipeline/core/src/processors/mod.rs index b9468d09fb984..70fe9b90aaac2 100644 --- a/src/query/pipeline/core/src/processors/mod.rs +++ b/src/query/pipeline/core/src/processors/mod.rs @@ -15,6 +15,7 @@ mod port; mod processor; +mod block_limit; mod duplicate_processor; mod port_trigger; mod profile; @@ -22,6 +23,7 @@ mod resize_processor; mod sequence_group; mod shuffle_processor; +pub use block_limit::BlockLimit; pub use duplicate_processor::DuplicateProcessor; pub use port::connect; pub use port::InputPort; diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index fa257bfa5d207..a912af0c7a30e 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -25,6 +25,7 @@ use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use crate::processors::BlockLimit; use crate::processors::UpdateTrigger; use crate::unsafe_cell_wrap::UnSafeCellWrap; @@ -40,6 +41,7 @@ pub struct SharedData(pub Result); pub struct SharedStatus { data: AtomicPtr, + block_limit: Arc, } unsafe impl Send for SharedStatus {} @@ -57,9 +59,10 @@ impl Drop for SharedStatus { } impl SharedStatus { - pub fn create() -> Arc { + pub fn create(block_limit: Arc) -> Arc { Arc::new(SharedStatus { data: AtomicPtr::new(std::ptr::null_mut()), + block_limit, }) } @@ -134,7 +137,7 @@ pub struct InputPort { impl InputPort { pub fn create() -> Arc { Arc::new(InputPort { - shared: UnSafeCellWrap::create(SharedStatus::create()), + shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))), update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()), }) } @@ -227,7 +230,7 @@ impl OutputPort { pub fn create() -> Arc { Arc::new(OutputPort { record_profile: UnSafeCellWrap::create(false), - shared: UnSafeCellWrap::create(SharedStatus::create()), + shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))), update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()), }) } @@ -318,8 +321,8 @@ impl OutputPort { /// Connect input and output ports. /// /// # Safety -pub unsafe fn connect(input: &InputPort, output: &OutputPort) { - let shared_status = SharedStatus::create(); +pub unsafe fn connect(input: &InputPort, output: &OutputPort, block_limit: Arc) { + let shared_status = SharedStatus::create(block_limit); input.set_shared(shared_status.clone()); output.set_shared(shared_status); diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs index 9e29ad5c4657a..bad72e6c28626 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::types::Int32Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::DuplicateProcessor; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; @@ -40,9 +43,9 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); } downstream_input1.set_need_data(); @@ -68,9 +71,9 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); } downstream_input1.finish(); @@ -94,9 +97,9 @@ async fn test_duplicate_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); } downstream_input1.finish(); @@ -120,9 +123,9 @@ async fn test_duplicate_processor() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input, &upstream_output); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); } downstream_input1.set_need_data(); diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs index 07b2ba1b1c2f1..9edd358082e82 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs @@ -22,6 +22,7 @@ use databend_common_expression::local_block_meta_serde; use databend_common_expression::BlockMetaInfo; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -60,7 +61,7 @@ async fn test_port_drop() -> Result<()> { let input = InputPort::create(); let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); output.push_data(Ok(DataBlock::empty_with_meta(meta.clone_self()))); assert_eq!(meta.ref_count(), 2); } @@ -98,7 +99,7 @@ async fn test_input_and_output_port() -> Result<()> { let output = OutputPort::create(); let barrier = Arc::new(Barrier::new(2)); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); let thread_1 = Thread::spawn(input_port(input, barrier.clone())); let thread_2 = Thread::spawn(output_port(output, barrier)); @@ -114,7 +115,7 @@ async fn test_input_and_output_flags() -> Result<()> { let input = InputPort::create(); let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); output.finish(); assert!(input.is_finished()); diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs b/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs index 0d2dea56e168d..46d08beba9794 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/resize.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -61,7 +62,7 @@ fn connect_inputs(inputs: Vec>) -> Vec> { unsafe { for input in inputs { let output = OutputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); outputs.push(output); } } @@ -75,7 +76,7 @@ fn connect_outputs(outputs: Vec>) -> Vec> { unsafe { for output in outputs { let input = InputPort::create(); - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); inputs.push(input); } } diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs index 5f419913f1a6b..13fa4efdcc68d 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::types::Int32Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::InputPort; @@ -43,10 +46,10 @@ async fn test_shuffle_output_finish() -> Result<()> { let downstream_input2 = InputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); } downstream_input1.finish(); @@ -106,14 +109,14 @@ async fn test_shuffle_processor() -> Result<()> { let downstream_input4 = InputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); - connect(&input3, &upstream_output3); - connect(&input4, &upstream_output4); - connect(&downstream_input1, &output1); - connect(&downstream_input2, &output2); - connect(&downstream_input3, &output3); - connect(&downstream_input4, &output4); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); + connect(&input3, &upstream_output3, Arc::new(BlockLimit::default())); + connect(&input4, &upstream_output4, Arc::new(BlockLimit::default())); + connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); + connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect(&downstream_input3, &output3, Arc::new(BlockLimit::default())); + connect(&downstream_input4, &output4, Arc::new(BlockLimit::default())); } let col1 = Int32Type::from_data(vec![1]); diff --git a/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs b/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs index af8c32988fcef..a0cb1e36d2eac 100644 --- a/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs +++ b/src/query/pipeline/sinks/tests/it/async_mpsc_sink.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -73,8 +74,8 @@ async fn test_async_mpsc_sink() -> Result<()> { let upstream_output2 = OutputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); } upstream_output1.push_data(Ok(DataBlock::new(vec![], 1))); diff --git a/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs b/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs index 45ed96476c6e4..66db07c14dc78 100644 --- a/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs +++ b/src/query/pipeline/sinks/tests/it/sync_mpsc_sink.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::connect; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -71,8 +72,8 @@ async fn test_sync_mpsc_sink() -> Result<()> { let upstream_output2 = OutputPort::create(); unsafe { - connect(&input1, &upstream_output1); - connect(&input2, &upstream_output2); + connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); + connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); } upstream_output1.push_data(Ok(DataBlock::new(vec![], 1))); diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 5092c459a8f31..71262562aea01 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -39,6 +39,7 @@ use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; +use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::PlanScope; use databend_common_pipeline_core::Pipeline; @@ -190,19 +191,24 @@ type StateLockGuard = ExecutingGraph; impl ExecutingGraph { pub fn create( mut pipeline: Pipeline, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result { let mut graph = StableGraph::new(); let mut time_series_profile_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); - Self::init_graph(&mut pipeline, &mut graph, &mut time_series_profile_builder); + Self::init_graph( + &mut pipeline, + &mut graph, + &mut time_series_profile_builder, + block_limit, + ); let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { graph, finished_nodes: AtomicUsize::new(0), - points: AtomicU64::new((DEFAULT_POINTS << 32) | init_epoch as u64), + points: AtomicU64::new((DEFAULT_POINTS << 32) | 1u64), max_points: AtomicU64::new(DEFAULT_POINTS), query_id, should_finish: AtomicBool::new(false), @@ -215,21 +221,26 @@ impl ExecutingGraph { pub fn from_pipelines( mut pipelines: Vec, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result { let mut graph = StableGraph::new(); let mut time_series_profile_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); for pipeline in &mut pipelines { - Self::init_graph(pipeline, &mut graph, &mut time_series_profile_builder); + Self::init_graph( + pipeline, + &mut graph, + &mut time_series_profile_builder, + block_limit.clone(), + ); } let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { finished_nodes: AtomicUsize::new(0), graph, - points: AtomicU64::new((DEFAULT_POINTS << 32) | init_epoch as u64), + points: AtomicU64::new((DEFAULT_POINTS << 32) | 1u64), max_points: AtomicU64::new(DEFAULT_POINTS), query_id, should_finish: AtomicBool::new(false), @@ -244,6 +255,7 @@ impl ExecutingGraph { pipeline: &mut Pipeline, graph: &mut StableGraph, EdgeInfo>, time_series_profile_builder: &mut QueryTimeSeriesProfileBuilder, + block_limit: Arc, ) { let offset = graph.node_count(); for node in pipeline.graph.node_weights() { @@ -324,6 +336,7 @@ impl ExecutingGraph { connect( &graph[target_node].inputs_port[target_port], &graph[source_node].outputs_port[source_port], + block_limit.clone(), ); } } @@ -394,6 +407,7 @@ impl ExecutingGraph { let (event, process_rows) = { let mut payload = node.tracking_payload.clone(); payload.process_rows = AtomicUsize::new(0); + payload.has_remaining_data = AtomicBool::new(false); let guard = ThreadTracker::tracking(payload); if state_guard_cache.is_none() { @@ -688,24 +702,28 @@ pub struct RunningGraph(ExecutingGraph); impl RunningGraph { pub fn create( pipeline: Pipeline, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result> { let graph_state = - ExecutingGraph::create(pipeline, init_epoch, query_id, finish_condvar_notify)?; + ExecutingGraph::create(pipeline, query_id, finish_condvar_notify, block_limit)?; debug!("Create running graph:{:?}", graph_state); Ok(Arc::new(RunningGraph(graph_state))) } pub fn from_pipelines( pipelines: Vec, - init_epoch: u32, query_id: Arc, finish_condvar_notify: Option, Condvar)>>, + block_limit: Arc, ) -> Result> { - let graph_state = - ExecutingGraph::from_pipelines(pipelines, init_epoch, query_id, finish_condvar_notify)?; + let graph_state = ExecutingGraph::from_pipelines( + pipelines, + query_id, + finish_condvar_notify, + block_limit, + )?; debug!("Create running graph:{:?}", graph_state); Ok(Arc::new(RunningGraph(graph_state))) } diff --git a/src/query/service/src/pipelines/executor/executor_settings.rs b/src/query/service/src/pipelines/executor/executor_settings.rs index 4eb0df638b1a2..d7f9cbfc00e20 100644 --- a/src/query/service/src/pipelines/executor/executor_settings.rs +++ b/src/query/service/src/pipelines/executor/executor_settings.rs @@ -18,6 +18,7 @@ use std::time::Duration; use databend_common_catalog::table_context::TableContext; use databend_common_config::GlobalConfig; use databend_common_exception::Result; +use databend_common_pipeline_core::processors::BlockLimit; #[derive(Clone)] pub struct ExecutorSettings { @@ -26,6 +27,7 @@ pub struct ExecutorSettings { pub enable_queries_executor: bool, pub max_execute_time_in_seconds: Duration, pub executor_node_id: String, + pub block_limit: Arc, } impl ExecutorSettings { @@ -45,12 +47,17 @@ impl ExecutorSettings { config_enable_queries_executor }; + let max_block_rows = settings.get_max_block_size()?; + let max_block_bytes = settings.get_max_block_bytes()?; + let block_limit = Arc::new(BlockLimit::new(max_block_rows, max_block_bytes)); + Ok(ExecutorSettings { enable_queries_executor, query_id: Arc::new(query_id), max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds), max_threads, executor_node_id: ctx.get_cluster().local_id.clone(), + block_limit, }) } } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 8b2df0a3e10d7..f066866300e1e 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -75,9 +75,9 @@ impl PipelineExecutor { let graph = RunningGraph::create( pipeline, - 1, settings.query_id.clone(), Some(finish_condvar.clone()), + settings.block_limit.clone(), )?; Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper { @@ -130,9 +130,9 @@ impl PipelineExecutor { let graph = RunningGraph::from_pipelines( pipelines, - 1, settings.query_id.clone(), Some(finish_condvar.clone()), + settings.block_limit.clone(), )?; Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper { diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 9f8122eb19dd8..306a684394e8f 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -84,7 +84,12 @@ impl QueryPipelineExecutor { let mut on_finished_chain = pipeline.take_on_finished(); let lock_guards = pipeline.take_lock_guards(); - match RunningGraph::create(pipeline, 1, settings.query_id.clone(), None) { + match RunningGraph::create( + pipeline, + settings.query_id.clone(), + None, + settings.block_limit.clone(), + ) { Err(cause) => { let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); let _ = on_finished_chain.apply(info); @@ -149,7 +154,12 @@ impl QueryPipelineExecutor { .flat_map(|x| x.take_lock_guards()) .collect::>(); - match RunningGraph::from_pipelines(pipelines, 1, settings.query_id.clone(), None) { + match RunningGraph::from_pipelines( + pipelines, + settings.query_id.clone(), + None, + settings.block_limit.clone(), + ) { Err(cause) => { let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); let _ignore_res = finished_chain.apply(info); diff --git a/src/query/service/tests/it/pipelines/executor/executor_graph.rs b/src/query/service/tests/it/pipelines/executor/executor_graph.rs index 2efe0cfbcf080..742f04e820e74 100644 --- a/src/query/service/tests/it/pipelines/executor/executor_graph.rs +++ b/src/query/service/tests/it/pipelines/executor/executor_graph.rs @@ -456,7 +456,12 @@ fn create_simple_pipeline(ctx: Arc) -> Result> { pipeline.add_pipe(create_transform_pipe(1)?); pipeline.add_pipe(sink_pipe); - RunningGraph::create(pipeline, 1, Arc::new("".to_string()), None) + RunningGraph::create( + pipeline, + Arc::new("".to_string()), + None, + Arc::new(Default::default()), + ) } fn create_parallel_simple_pipeline(ctx: Arc) -> Result> { @@ -468,7 +473,12 @@ fn create_parallel_simple_pipeline(ctx: Arc) -> Result) -> Result> { @@ -484,7 +494,12 @@ fn create_resize_pipeline(ctx: Arc) -> Result> { pipeline.try_resize(2)?; pipeline.add_pipe(sink_pipe); - RunningGraph::create(pipeline, 1, Arc::new("".to_string()), None) + RunningGraph::create( + pipeline, + Arc::new("".to_string()), + None, + Arc::new(Default::default()), + ) } fn create_source_pipe( @@ -562,6 +577,7 @@ async fn create_executor_with_simple_pipeline( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; QueryPipelineExecutor::create(pipeline, settings) } From 0174da4b240611f5395beaf96e348a252f7ab1d6 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Wed, 6 Aug 2025 12:05:50 +0800 Subject: [PATCH 2/7] refactor: add split block --- .../core/src/processors/block_limit.rs | 34 +++++-- .../pipeline/core/src/processors/port.rs | 57 ++++++++++-- .../it/pipelines/processors/duplicate.rs | 48 ++++++++-- .../it/pipelines/processors/port_test.rs | 92 +++++++++++++++++++ .../tests/it/pipelines/processors/shuffle.rs | 36 ++++++-- .../src/pipelines/executor/executor_graph.rs | 18 +++- .../pipelines/executor/executor_settings.rs | 5 +- .../transforms/sort/sort_merge_stream.rs | 3 +- .../transforms/window/transform_window.rs | 13 +-- .../pipelines/executor/pipeline_executor.rs | 1 + .../it/pipelines/transforms/sort/k_way.rs | 1 + .../fuse/pruning_column_oriented_segment.rs | 1 + .../it/storages/fuse/pruning_pipeline.rs | 1 + 13 files changed, 269 insertions(+), 41 deletions(-) diff --git a/src/query/pipeline/core/src/processors/block_limit.rs b/src/query/pipeline/core/src/processors/block_limit.rs index 706ea63db6e83..1fb5d1ca2b488 100644 --- a/src/query/pipeline/core/src/processors/block_limit.rs +++ b/src/query/pipeline/core/src/processors/block_limit.rs @@ -12,28 +12,46 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +// this is used to avoid after limiting by bytes, the rows limit is too small +// TODO: this magic 10 may need to be look backed in the future +const MIN_ROWS_BY_BYTES: usize = 10; /// DataBlock limit in rows and bytes pub struct BlockLimit { - rows: AtomicU64, - bytes: AtomicU64, + rows: AtomicUsize, + bytes: AtomicUsize, } impl BlockLimit { - pub fn new(rows: u64, bytes: u64) -> Self { + pub fn new(rows: usize, bytes: usize) -> Self { BlockLimit { - rows: AtomicU64::new(rows), - bytes: AtomicU64::new(bytes), + rows: AtomicUsize::new(rows), + bytes: AtomicUsize::new(bytes), + } + } + + /// Calculate the number of rows to take based on both row and byte limits + pub fn calculate_limit_rows(&self, total_rows: usize, total_bytes: usize) -> usize { + if total_rows == 0 { + return 0; } + // max with 1 used to avoid division by zero + let average_bytes_per_row = (total_bytes / total_rows).max(1); + let rows_by_bytes = + (self.bytes.load(Ordering::Relaxed) / average_bytes_per_row).max(MIN_ROWS_BY_BYTES); + let rows_limit = self.rows.load(Ordering::Relaxed); + rows_limit.min(rows_by_bytes) } } impl Default for BlockLimit { fn default() -> Self { BlockLimit { - rows: AtomicU64::new(u64::MAX), - bytes: AtomicU64::new(u64::MAX), + rows: AtomicUsize::new(usize::MAX), + bytes: AtomicUsize::new(usize::MAX), } } } diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index a912af0c7a30e..b7761b81e5a57 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -127,6 +127,11 @@ impl SharedStatus { pub fn get_flags(&self) -> usize { self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK } + + #[inline(always)] + pub fn get_block_limit(&self) -> &Arc { + &self.block_limit + } } pub struct InputPort { @@ -187,24 +192,56 @@ impl InputPort { (self.shared.get_flags() & HAS_DATA) != 0 } - #[inline(always)] pub fn pull_data(&self) -> Option> { unsafe { UpdateTrigger::update_input(&self.update_trigger); - let unset_flags = HAS_DATA | NEED_DATA; - match self.shared.swap(std::ptr::null_mut(), 0, unset_flags) { - address if address.is_null() => None, - address => { - let block = (*Box::from_raw(address)).0; - if let Ok(data_block) = block.as_ref() { - ExecutorStats::record_thread_tracker(data_block.num_rows()); - } - Some(block) + + // First, swap out the data without unsetting flags to prevent race conditions + let address = self.shared.swap(std::ptr::null_mut(), 0, 0); + + if address.is_null() { + // No data available, now safe to unset flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + return None; + } + + let shared_data = Box::from_raw(address); + match shared_data.0 { + Ok(data_block) => self.process_data_block(data_block), + Err(e) => { + // Error case, unset both flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + Some(Err(e)) } } } } + fn process_data_block(&self, data_block: DataBlock) -> Option> { + let block_limit = self.shared.get_block_limit(); + let limit_rows = + block_limit.calculate_limit_rows(data_block.num_rows(), data_block.memory_size()); + + if data_block.num_rows() > limit_rows && limit_rows > 0 { + // Need to split the block + let taken_block = data_block.slice(0..limit_rows); + let remaining_block = data_block.slice(limit_rows..data_block.num_rows()); + + let remaining_data = Box::new(SharedData(Ok(remaining_block))); + self.shared.swap(Box::into_raw(remaining_data), 0, 0); + + ExecutorStats::record_thread_tracker(taken_block.num_rows()); + Some(Ok(taken_block)) + } else { + // No need to split, take the whole block + // Unset both HAS_DATA and NEED_DATA flags + self.shared.set_flags(0, HAS_DATA | NEED_DATA); + + ExecutorStats::record_thread_tracker(data_block.num_rows()); + Some(Ok(data_block)) + } + } + /// # Safety /// /// Method is thread unsafe and require thread safe call diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs index bad72e6c28626..1857005c2bff7 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs @@ -44,8 +44,16 @@ async fn test_duplicate_output_finish() -> Result<()> { unsafe { connect(&input, &upstream_output, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.set_need_data(); @@ -72,8 +80,16 @@ async fn test_duplicate_output_finish() -> Result<()> { unsafe { connect(&input, &upstream_output, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -98,8 +114,16 @@ async fn test_duplicate_output_finish() -> Result<()> { unsafe { connect(&input, &upstream_output, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -124,8 +148,16 @@ async fn test_duplicate_processor() -> Result<()> { unsafe { connect(&input, &upstream_output, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.set_need_data(); diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs index 9edd358082e82..566ce7a081cca 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs @@ -19,8 +19,10 @@ use databend_common_base::runtime::Thread; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::local_block_meta_serde; +use databend_common_expression::types::Int32Type; use databend_common_expression::BlockMetaInfo; use databend_common_expression::DataBlock; +use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::InputPort; @@ -128,3 +130,93 @@ async fn test_input_and_output_flags() -> Result<()> { // assert_eq!(!output.can_push()); Ok(()) } + +#[test] +fn test_block_limit_splitting() -> Result<()> { + unsafe { + let input = InputPort::create(); + let output = OutputPort::create(); + + // Create a BlockLimit with small row limit to trigger splitting + let block_limit = Arc::new(BlockLimit::new(10, 1024 * 1024)); // 10 rows max, 1MB bytes + connect(&input, &output, block_limit); + + // Create a block with 30 rows + let mut data = Vec::with_capacity(30); + for i in 0..30 { + data.push(i); + } + let col = Int32Type::from_data(data); + let block = DataBlock::new_from_columns(vec![col]); + + // Push the large block + output.push_data(Ok(block.clone())); + + // First pull should get 10 rows + input.set_need_data(); + assert!(input.has_data()); + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // After first pull, should still have data (remaining 20 rows) + assert!(input.has_data()); + + // Second pull should get another 10 rows + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // Should still have data (remaining 10 rows) + assert!(input.has_data()); + + // Third pull should get the last 10 rows + let pulled_block = input.pull_data().unwrap()?; + assert_eq!(pulled_block.num_rows(), 10); + + // Now should have no data + assert!(!input.has_data()); + + // Trying to pull again should return None + let result = input.pull_data(); + assert!(result.is_none()); + } + + Ok(()) +} + +#[test] +fn test_block_limit_no_splitting() -> Result<()> { + unsafe { + let input = InputPort::create(); + let output = OutputPort::create(); + + // Create a BlockLimit with large limits (no splitting should occur) + let block_limit = Arc::new(BlockLimit::new(1000, 10 * 1024 * 1024)); // 1000 rows, 10MB + connect(&input, &output, block_limit); + + // Create a small block with 30 rows + let mut data = Vec::with_capacity(30); + for i in 0..30 { + data.push(i); + } + let col = Int32Type::from_data(data); + let block = DataBlock::new_from_columns(vec![col]); + + // Push the block + output.push_data(Ok(block.clone())); + + // Pull should get all 30 rows at once + input.set_need_data(); + assert!(input.has_data()); + let pulled_block = input.pull_data().unwrap().unwrap(); + assert_eq!(pulled_block.num_rows(), 30); + + // Should have no more data + assert!(!input.has_data()); + + // Trying to pull again should return None + let result = input.pull_data(); + assert!(result.is_none()); + } + + Ok(()) +} diff --git a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs index 13fa4efdcc68d..5973d2f65b2ab 100644 --- a/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs +++ b/src/query/pipeline/core/tests/it/pipelines/processors/shuffle.rs @@ -48,8 +48,16 @@ async fn test_shuffle_output_finish() -> Result<()> { unsafe { connect(&input1, &upstream_output1, Arc::new(BlockLimit::default())); connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); } downstream_input1.finish(); @@ -113,10 +121,26 @@ async fn test_shuffle_processor() -> Result<()> { connect(&input2, &upstream_output2, Arc::new(BlockLimit::default())); connect(&input3, &upstream_output3, Arc::new(BlockLimit::default())); connect(&input4, &upstream_output4, Arc::new(BlockLimit::default())); - connect(&downstream_input1, &output1, Arc::new(BlockLimit::default())); - connect(&downstream_input2, &output2, Arc::new(BlockLimit::default())); - connect(&downstream_input3, &output3, Arc::new(BlockLimit::default())); - connect(&downstream_input4, &output4, Arc::new(BlockLimit::default())); + connect( + &downstream_input1, + &output1, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input2, + &output2, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input3, + &output3, + Arc::new(BlockLimit::default()), + ); + connect( + &downstream_input4, + &output4, + Arc::new(BlockLimit::default()), + ); } let col1 = Int32Type::from_data(vec![1]); diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 71262562aea01..380042b4b397a 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -414,7 +414,23 @@ impl ExecutingGraph { state_guard_cache = Some(node.state.lock().unwrap()); } - let event = node.processor.event(event_cause)?; + let mut event = node.processor.event(event_cause.clone())?; + let mut count = 0; + while (!matches!(event, Event::Sync) || !matches!(event, Event::Async)) + && ThreadTracker::has_remaining_data() + { + count += 1; + if count > 100000 { + warn!( + "Node {:?} has been processing for too long, event: {:?}, cause: {:?}", + node.processor.id(), + event, + event_cause + ); + break; + } + event = node.processor.event(event_cause.clone())?; + } let process_rows = ThreadTracker::process_rows(); match guard.flush() { Ok(_) => Ok((event, process_rows)), diff --git a/src/query/service/src/pipelines/executor/executor_settings.rs b/src/query/service/src/pipelines/executor/executor_settings.rs index d7f9cbfc00e20..0237c406de5bf 100644 --- a/src/query/service/src/pipelines/executor/executor_settings.rs +++ b/src/query/service/src/pipelines/executor/executor_settings.rs @@ -49,7 +49,10 @@ impl ExecutorSettings { let max_block_rows = settings.get_max_block_size()?; let max_block_bytes = settings.get_max_block_bytes()?; - let block_limit = Arc::new(BlockLimit::new(max_block_rows, max_block_bytes)); + let block_limit = Arc::new(BlockLimit::new( + max_block_rows as usize, + max_block_bytes as usize, + )); Ok(ExecutorSettings { enable_queries_executor, diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs index bdcd9c1237ec5..9755b7a999a75 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs @@ -296,6 +296,7 @@ mod tests { use databend_common_expression::types::Int32Type; use databend_common_expression::FromData; use databend_common_pipeline_core::processors::connect; + use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_transforms::sort::SimpleRowsAsc; use super::*; @@ -315,7 +316,7 @@ mod tests { let output = OutputPort::create(); let input = InputPort::create(); unsafe { - connect(&input, &output); + connect(&input, &output, Arc::new(BlockLimit::default())); } let stream = BoundedInputStream { diff --git a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs index c997119fcba5d..e82f33f7e6d86 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs @@ -1364,6 +1364,7 @@ mod tests { use databend_common_expression::FromData; use databend_common_functions::aggregates::AggregateFunctionFactory; use databend_common_pipeline_core::processors::connect; + use databend_common_pipeline_core::processors::BlockLimit; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -2305,8 +2306,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); @@ -2380,8 +2381,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); @@ -2477,8 +2478,8 @@ mod tests { )?; unsafe { - connect(&input, &upstream_output); - connect(&downstream_input, &output); + connect(&input, &upstream_output, Arc::new(BlockLimit::default())); + connect(&downstream_input, &output, Arc::new(BlockLimit::default())); } downstream_input.set_need_data(); diff --git a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs index 89acb352d3025..45b5e1f161add 100644 --- a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs @@ -47,6 +47,7 @@ async fn test_always_call_on_finished() -> Result<()> { enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; { diff --git a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs index 77ef2bbe0616c..7680b4fe180ed 100644 --- a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs +++ b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs @@ -57,6 +57,7 @@ fn create_pipeline( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(pipeline, settings)?; Ok((executor, rx)) diff --git a/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs b/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs index 9b9b90f0484e5..23bbdc608a081 100644 --- a/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs +++ b/src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs @@ -123,6 +123,7 @@ async fn apply_snapshot_pruning( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?; diff --git a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs index 7f89d6be3088d..68235ddb0dcd7 100644 --- a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs +++ b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs @@ -124,6 +124,7 @@ async fn apply_snapshot_pruning( enable_queries_executor: false, max_threads: 8, executor_node_id: "".to_string(), + block_limit: Arc::new(Default::default()), }; let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?; From 582190a1f63c1ed5689250c1b7cb27ca5ea9c208 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 7 Aug 2025 16:01:47 +0800 Subject: [PATCH 3/7] fix hang --- .../base/src/runtime/runtime_tracker.rs | 21 +++------- .../pipeline/core/src/processors/port.rs | 2 + .../core/src/processors/port_trigger.rs | 1 + .../src/pipelines/executor/executor_graph.rs | 40 +++++++++++-------- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 27f5fe29d44d1..1f14355e384e3 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -146,7 +146,7 @@ pub struct TrackingPayload { pub perf_enabled: bool, pub process_rows: AtomicUsize, // Indicate whether datablock is sliced and has remaining data in port - pub has_remaining_data: AtomicBool, + pub has_remaining_data: Arc, } impl Clone for TrackingPayload { @@ -164,10 +164,7 @@ impl Clone for TrackingPayload { process_rows: AtomicUsize::new( self.process_rows.load(std::sync::atomic::Ordering::SeqCst), ), - has_remaining_data: AtomicBool::new( - self.has_remaining_data - .load(std::sync::atomic::Ordering::SeqCst), - ), + has_remaining_data: self.has_remaining_data.clone(), } } } @@ -250,7 +247,7 @@ impl ThreadTracker { workload_group_resource: None, perf_enabled: false, process_rows: AtomicUsize::new(0), - has_remaining_data: AtomicBool::new(false), + has_remaining_data: Arc::new(AtomicBool::new(false)), }), } } @@ -378,16 +375,10 @@ impl ThreadTracker { .unwrap_or(0) } - pub fn has_remaining_data() -> bool { + pub fn has_remaining_data() -> Arc { TRACKER - .try_with(|tracker| { - tracker - .borrow() - .payload - .has_remaining_data - .load(std::sync::atomic::Ordering::SeqCst) - }) - .unwrap_or(false) + .try_with(|tracker| tracker.borrow().payload.has_remaining_data.clone()) + .unwrap_or(Arc::new(AtomicBool::new(false))) } } diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index b7761b81e5a57..dca4e5cde4bfe 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -21,6 +21,7 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::ExecutorStats; use databend_common_base::runtime::QueryTimeSeriesProfile; +use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -230,6 +231,7 @@ impl InputPort { let remaining_data = Box::new(SharedData(Ok(remaining_block))); self.shared.swap(Box::into_raw(remaining_data), 0, 0); + ThreadTracker::has_remaining_data().store(true, Ordering::SeqCst); ExecutorStats::record_thread_tracker(taken_block.num_rows()); Some(Ok(taken_block)) } else { diff --git a/src/query/pipeline/core/src/processors/port_trigger.rs b/src/query/pipeline/core/src/processors/port_trigger.rs index 7a02cfae185cb..5aed03ca14b0e 100644 --- a/src/query/pipeline/core/src/processors/port_trigger.rs +++ b/src/query/pipeline/core/src/processors/port_trigger.rs @@ -33,6 +33,7 @@ unsafe impl Send for UpdateList {} unsafe impl Sync for UpdateList {} +#[derive(Debug, Clone)] pub enum DirectedEdge { Source(EdgeIndex), Target(EdgeIndex), diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 380042b4b397a..92cc90d45d4b9 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -47,6 +47,7 @@ use databend_common_pipeline_core::PlanProfile; use databend_common_storages_system::QueryExecutionStatsQueue; use fastrace::prelude::*; use log::debug; +use log::error; use log::trace; use log::warn; use parking_lot::Condvar; @@ -404,36 +405,43 @@ impl ExecutingGraph { if let Some(schedule_index) = need_schedule_nodes.pop_front() { let node = &locker.graph[schedule_index]; + let mut final_event; let (event, process_rows) = { + let mut inner_event_cause = event_cause.clone(); + + let has_remaining_data = Arc::new(AtomicBool::new(false)); + let mut payload = node.tracking_payload.clone(); payload.process_rows = AtomicUsize::new(0); - payload.has_remaining_data = AtomicBool::new(false); + payload.has_remaining_data = has_remaining_data.clone(); let guard = ThreadTracker::tracking(payload); if state_guard_cache.is_none() { state_guard_cache = Some(node.state.lock().unwrap()); } + let mut cnt = 0; - let mut event = node.processor.event(event_cause.clone())?; - let mut count = 0; - while (!matches!(event, Event::Sync) || !matches!(event, Event::Async)) - && ThreadTracker::has_remaining_data() - { - count += 1; - if count > 100000 { - warn!( - "Node {:?} has been processing for too long, event: {:?}, cause: {:?}", - node.processor.id(), - event, - event_cause - ); + loop { + let event = node.processor.event(inner_event_cause.clone())?; + final_event = event; + // If the processor is finished or needs different handling, stop retrigger processor's event + if matches!(final_event, Event::Finished | Event::Async | Event::Sync) { break; } - event = node.processor.event(event_cause.clone())?; + // If no remaining data, we're done + if !has_remaining_data.swap(false, Ordering::SeqCst) { + break; + } + inner_event_cause = EventCause::Input(0); + cnt += 1; + if cnt > 10 { + error!("Infinite loop detected in processor event handling for node: {:?}, event: {:?}, cause: {:?}", + node.processor.id(), final_event, inner_event_cause); + } } let process_rows = ThreadTracker::process_rows(); match guard.flush() { - Ok(_) => Ok((event, process_rows)), + Ok(_) => Ok((final_event, process_rows)), Err(out_of_limit) => { Err(ErrorCode::PanicError(format!("{:?}", out_of_limit))) } From 03f2a4f847bee7a6b7a6d00056e8f4c9dc743c63 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 8 Aug 2025 09:41:31 +0800 Subject: [PATCH 4/7] debug --- src/query/pipeline/core/src/processors/port.rs | 13 +++++++++++++ .../core/src/processors/resize_processor.rs | 9 +++++++++ .../src/pipelines/executor/executor_graph.rs | 15 +++++++++++++-- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index dca4e5cde4bfe..b91051012d41b 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -25,6 +25,7 @@ use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use log::info; use crate::processors::BlockLimit; use crate::processors::UpdateTrigger; @@ -43,6 +44,7 @@ pub struct SharedData(pub Result); pub struct SharedStatus { data: AtomicPtr, block_limit: Arc, + // TODO: add new status if slice } unsafe impl Send for SharedStatus {} @@ -154,6 +156,7 @@ impl InputPort { let flags = self.shared.set_flags(IS_FINISHED, IS_FINISHED); if flags & IS_FINISHED == 0 { + info!("[input_port] trigger input port finish"); UpdateTrigger::update_input(&self.update_trigger); } } @@ -178,6 +181,7 @@ impl InputPort { unsafe { let flags = self.shared.set_flags(NEED_DATA, NEED_DATA); if flags & NEED_DATA == 0 { + info!("[input_port] trigger input port set need data"); UpdateTrigger::update_input(&self.update_trigger); } } @@ -195,6 +199,7 @@ impl InputPort { pub fn pull_data(&self) -> Option> { unsafe { + info!("[input_port] trigger input port pull data"); UpdateTrigger::update_input(&self.update_trigger); // First, swap out the data without unsetting flags to prevent race conditions @@ -224,6 +229,11 @@ impl InputPort { block_limit.calculate_limit_rows(data_block.num_rows(), data_block.memory_size()); if data_block.num_rows() > limit_rows && limit_rows > 0 { + info!( + "[input_port] pull data with slice, limit/all: {}/{}", + limit_rows, + data_block.num_rows() + ); // Need to split the block let taken_block = data_block.slice(0..limit_rows); let remaining_block = data_block.slice(limit_rows..data_block.num_rows()); @@ -235,6 +245,7 @@ impl InputPort { ExecutorStats::record_thread_tracker(taken_block.num_rows()); Some(Ok(taken_block)) } else { + info!("[input_port] pull data all: {}", data_block.num_rows()); // No need to split, take the whole block // Unset both HAS_DATA and NEED_DATA flags self.shared.set_flags(0, HAS_DATA | NEED_DATA); @@ -277,6 +288,7 @@ impl OutputPort { #[inline(always)] pub fn push_data(&self, data: Result) { unsafe { + info!("[output_port] trigger output port push_data"); UpdateTrigger::update_output(&self.update_trigger); if let Ok(data_block) = &data { @@ -311,6 +323,7 @@ impl OutputPort { let flags = self.shared.set_flags(IS_FINISHED, IS_FINISHED); if flags & IS_FINISHED == 0 { + info!("[output_port] trigger output port finish"); UpdateTrigger::update_output(&self.update_trigger); } } diff --git a/src/query/pipeline/core/src/processors/resize_processor.rs b/src/query/pipeline/core/src/processors/resize_processor.rs index 9bfff2f58183c..a4edcdd1a1f12 100644 --- a/src/query/pipeline/core/src/processors/resize_processor.rs +++ b/src/query/pipeline/core/src/processors/resize_processor.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::sync::Arc; use databend_common_exception::Result; +use log::info; use crate::pipe::PipeItem; use crate::processors::Event; @@ -110,6 +111,10 @@ impl Processor for ResizeProcessor { } } } + info!( + "[resize] event_with_cause: {:?}, input port {:?} output port {:?}", + &cause, &self.waiting_inputs, &self.waiting_outputs + ); while !self.waiting_outputs.is_empty() && !self.waiting_inputs.is_empty() { let output_index = self.waiting_outputs.pop_front().unwrap(); @@ -159,6 +164,10 @@ impl Processor for ResizeProcessor { return Ok(Event::Finished); } + info!( + "[resize] event_with_cause: {:?}, input port {:?} output port {:?}", + &cause, &self.waiting_inputs, &self.waiting_outputs + ); match self.waiting_outputs.is_empty() { true => Ok(Event::NeedConsume), diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 92cc90d45d4b9..dc32e48b913b1 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -48,7 +48,7 @@ use databend_common_storages_system::QueryExecutionStatsQueue; use fastrace::prelude::*; use log::debug; use log::error; -use log::trace; +use log::info; use log::warn; use parking_lot::Condvar; use parking_lot::Mutex; @@ -369,6 +369,9 @@ impl ExecutingGraph { schedule_queue: &mut ScheduleQueue, graph: &Arc, ) -> Result<()> { + info!("------------new--------------"); + let node = &locker.graph[index]; + info!("Node {:?} trigger schedule", node); let mut need_schedule_nodes = VecDeque::new(); let mut need_schedule_edges = VecDeque::new(); @@ -381,6 +384,7 @@ impl ExecutingGraph { if need_schedule_nodes.is_empty() { let edge = need_schedule_edges.pop_front().unwrap(); + info!("Schedule edge: {:?}", edge); let target_index = DirectedEdge::get_target(&edge, &locker.graph)?; event_cause = match edge { @@ -396,6 +400,7 @@ impl ExecutingGraph { let node_state = node.state.lock().unwrap_or_else(PoisonError::into_inner); if matches!(*node_state, State::Idle) { + info!("add new Node: {:?}", target_index); state_guard_cache = Some(node_state); need_schedule_nodes.push_back(target_index); } else { @@ -438,6 +443,12 @@ impl ExecutingGraph { error!("Infinite loop detected in processor event handling for node: {:?}, event: {:?}, cause: {:?}", node.processor.id(), final_event, inner_event_cause); } + info!( + "!!Reschedule node: {:?} {:?} event_cause: {:?}", + node.processor.id(), + node.processor.name(), + inner_event_cause + ); } let process_rows = ThreadTracker::process_rows(); match guard.flush() { @@ -448,7 +459,7 @@ impl ExecutingGraph { } }?; - trace!( + info!( "node id: {:?}, name: {:?}, event: {:?}", node.processor.id(), node.processor.name(), From 2dbd6a80f0d75c3d0f20531ccb73b598da6de297 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Sun, 10 Aug 2025 22:07:42 +0800 Subject: [PATCH 5/7] fixup --- .../base/src/runtime/runtime_tracker.rs | 11 --- .../pipeline/core/src/processors/port.rs | 39 +++++----- .../src/pipelines/executor/executor_graph.rs | 72 +++++++++---------- 3 files changed, 55 insertions(+), 67 deletions(-) diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 1f14355e384e3..4a1f0d95f45a2 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -45,7 +45,6 @@ use std::cell::RefCell; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Context; @@ -145,8 +144,6 @@ pub struct TrackingPayload { pub workload_group_resource: Option>, pub perf_enabled: bool, pub process_rows: AtomicUsize, - // Indicate whether datablock is sliced and has remaining data in port - pub has_remaining_data: Arc, } impl Clone for TrackingPayload { @@ -164,7 +161,6 @@ impl Clone for TrackingPayload { process_rows: AtomicUsize::new( self.process_rows.load(std::sync::atomic::Ordering::SeqCst), ), - has_remaining_data: self.has_remaining_data.clone(), } } } @@ -247,7 +243,6 @@ impl ThreadTracker { workload_group_resource: None, perf_enabled: false, process_rows: AtomicUsize::new(0), - has_remaining_data: Arc::new(AtomicBool::new(false)), }), } } @@ -374,12 +369,6 @@ impl ThreadTracker { }) .unwrap_or(0) } - - pub fn has_remaining_data() -> Arc { - TRACKER - .try_with(|tracker| tracker.borrow().payload.has_remaining_data.clone()) - .unwrap_or(Arc::new(AtomicBool::new(false))) - } } pin_project! { diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index b91051012d41b..f69ffe9486ded 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicPtr; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -21,11 +22,9 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::ExecutorStats; use databend_common_base::runtime::QueryTimeSeriesProfile; -use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use log::info; use crate::processors::BlockLimit; use crate::processors::UpdateTrigger; @@ -44,7 +43,9 @@ pub struct SharedData(pub Result); pub struct SharedStatus { data: AtomicPtr, block_limit: Arc, - // TODO: add new status if slice + // This flag is used to indicate if a slice operation + // has occurred on the data block + slice_occurred: AtomicBool, } unsafe impl Send for SharedStatus {} @@ -66,6 +67,7 @@ impl SharedStatus { Arc::new(SharedStatus { data: AtomicPtr::new(std::ptr::null_mut()), block_limit, + slice_occurred: AtomicBool::new(false), }) } @@ -156,7 +158,6 @@ impl InputPort { let flags = self.shared.set_flags(IS_FINISHED, IS_FINISHED); if flags & IS_FINISHED == 0 { - info!("[input_port] trigger input port finish"); UpdateTrigger::update_input(&self.update_trigger); } } @@ -181,7 +182,7 @@ impl InputPort { unsafe { let flags = self.shared.set_flags(NEED_DATA, NEED_DATA); if flags & NEED_DATA == 0 { - info!("[input_port] trigger input port set need data"); + // info!("[input_port] trigger input port set need data"); UpdateTrigger::update_input(&self.update_trigger); } } @@ -199,7 +200,7 @@ impl InputPort { pub fn pull_data(&self) -> Option> { unsafe { - info!("[input_port] trigger input port pull data"); + // info!("[input_port] trigger input port pull data"); UpdateTrigger::update_input(&self.update_trigger); // First, swap out the data without unsetting flags to prevent race conditions @@ -229,23 +230,22 @@ impl InputPort { block_limit.calculate_limit_rows(data_block.num_rows(), data_block.memory_size()); if data_block.num_rows() > limit_rows && limit_rows > 0 { - info!( - "[input_port] pull data with slice, limit/all: {}/{}", - limit_rows, - data_block.num_rows() - ); + // info!( + // "[input_port] pull data with slice, limit/all: {}/{}", + // limit_rows, + // data_block.num_rows() + // ); // Need to split the block let taken_block = data_block.slice(0..limit_rows); let remaining_block = data_block.slice(limit_rows..data_block.num_rows()); let remaining_data = Box::new(SharedData(Ok(remaining_block))); self.shared.swap(Box::into_raw(remaining_data), 0, 0); - - ThreadTracker::has_remaining_data().store(true, Ordering::SeqCst); + self.shared.slice_occurred.store(true, Ordering::Relaxed); ExecutorStats::record_thread_tracker(taken_block.num_rows()); Some(Ok(taken_block)) } else { - info!("[input_port] pull data all: {}", data_block.num_rows()); + // info!("[input_port] pull data all: {}", data_block.num_rows()); // No need to split, take the whole block // Unset both HAS_DATA and NEED_DATA flags self.shared.set_flags(0, HAS_DATA | NEED_DATA); @@ -268,6 +268,14 @@ impl InputPort { pub unsafe fn set_trigger(&self, update_trigger: *mut UpdateTrigger) { self.update_trigger.set_value(update_trigger) } + + pub fn slice_occurred(&self) -> bool { + self.shared.slice_occurred.load(Ordering::Relaxed) + } + + pub fn reset_slice_occurred(&self) { + self.shared.slice_occurred.store(false, Ordering::Relaxed); + } } pub struct OutputPort { @@ -288,7 +296,7 @@ impl OutputPort { #[inline(always)] pub fn push_data(&self, data: Result) { unsafe { - info!("[output_port] trigger output port push_data"); + // info!("[output_port] trigger output port push_data"); UpdateTrigger::update_output(&self.update_trigger); if let Ok(data_block) = &data { @@ -323,7 +331,6 @@ impl OutputPort { let flags = self.shared.set_flags(IS_FINISHED, IS_FINISHED); if flags & IS_FINISHED == 0 { - info!("[output_port] trigger output port finish"); UpdateTrigger::update_output(&self.update_trigger); } } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index dc32e48b913b1..cfda1d173bb7f 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -47,7 +47,6 @@ use databend_common_pipeline_core::PlanProfile; use databend_common_storages_system::QueryExecutionStatsQueue; use fastrace::prelude::*; use log::debug; -use log::error; use log::info; use log::warn; use parking_lot::Condvar; @@ -297,7 +296,6 @@ impl ExecutingGraph { mut_node.tracking_payload.time_series_profile = Some(query_time_series.clone()); } } - for edge in pipeline.graph.edge_indices() { let index = EdgeIndex::new(edge.index()); if let Some((source, target)) = pipeline.graph.edge_endpoints(index) { @@ -369,9 +367,9 @@ impl ExecutingGraph { schedule_queue: &mut ScheduleQueue, graph: &Arc, ) -> Result<()> { - info!("------------new--------------"); + // info!("[schedule] ------------new--------------"); let node = &locker.graph[index]; - info!("Node {:?} trigger schedule", node); + // info!("[schedule] Node {:?} trigger schedule", node); let mut need_schedule_nodes = VecDeque::new(); let mut need_schedule_edges = VecDeque::new(); @@ -384,7 +382,7 @@ impl ExecutingGraph { if need_schedule_nodes.is_empty() { let edge = need_schedule_edges.pop_front().unwrap(); - info!("Schedule edge: {:?}", edge); + // info!("Schedule edge: {:?}", edge); let target_index = DirectedEdge::get_target(&edge, &locker.graph)?; event_cause = match edge { @@ -400,7 +398,7 @@ impl ExecutingGraph { let node_state = node.state.lock().unwrap_or_else(PoisonError::into_inner); if matches!(*node_state, State::Idle) { - info!("add new Node: {:?}", target_index); + // info!("[schedule] add new Node: {:?}", node); state_guard_cache = Some(node_state); need_schedule_nodes.push_back(target_index); } else { @@ -410,49 +408,20 @@ impl ExecutingGraph { if let Some(schedule_index) = need_schedule_nodes.pop_front() { let node = &locker.graph[schedule_index]; - let mut final_event; + // info!("[schedule] Schedule node: {:?}", node); let (event, process_rows) = { - let mut inner_event_cause = event_cause.clone(); - - let has_remaining_data = Arc::new(AtomicBool::new(false)); - let mut payload = node.tracking_payload.clone(); payload.process_rows = AtomicUsize::new(0); - payload.has_remaining_data = has_remaining_data.clone(); let guard = ThreadTracker::tracking(payload); if state_guard_cache.is_none() { state_guard_cache = Some(node.state.lock().unwrap()); } - let mut cnt = 0; - - loop { - let event = node.processor.event(inner_event_cause.clone())?; - final_event = event; - // If the processor is finished or needs different handling, stop retrigger processor's event - if matches!(final_event, Event::Finished | Event::Async | Event::Sync) { - break; - } - // If no remaining data, we're done - if !has_remaining_data.swap(false, Ordering::SeqCst) { - break; - } - inner_event_cause = EventCause::Input(0); - cnt += 1; - if cnt > 10 { - error!("Infinite loop detected in processor event handling for node: {:?}, event: {:?}, cause: {:?}", - node.processor.id(), final_event, inner_event_cause); - } - info!( - "!!Reschedule node: {:?} {:?} event_cause: {:?}", - node.processor.id(), - node.processor.name(), - inner_event_cause - ); - } + + let event = node.processor.event(event_cause)?; let process_rows = ThreadTracker::process_rows(); match guard.flush() { - Ok(_) => Ok((final_event, process_rows)), + Ok(_) => Ok((event, process_rows)), Err(out_of_limit) => { Err(ErrorCode::PanicError(format!("{:?}", out_of_limit))) } @@ -492,8 +461,31 @@ impl ExecutingGraph { } }; - node.trigger(&mut need_schedule_edges); + let mut new_need_schedule_edges = VecDeque::new(); + node.trigger(&mut new_need_schedule_edges); + while let Some(edge) = new_need_schedule_edges.pop_back() { + if let DirectedEdge::Target(index) = edge { + let port_index = locker.graph.edge_weight(index).unwrap().input_index; + let port = node.inputs_port[port_index].as_ref(); + if port.slice_occurred() { + port.reset_slice_occurred(); + // info!( + // "[schedule!!] detect slice occurred on edge: {:?}", + // DirectedEdge::Source(index) + // ); + need_schedule_edges.push_front(DirectedEdge::Source(index)); + } + } + need_schedule_edges.push_front(edge); + } + *state_guard_cache.unwrap() = processor_state; + + // info!( + // "[schedule] node {} trigger edge: {:?}", + // node.processor.name(), + // need_schedule_edges + // ); } } From 5b9dcda7d200119e67257b9757c023f0469e907c Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 11 Aug 2025 09:23:30 +0800 Subject: [PATCH 6/7] fix lint --- src/query/service/src/pipelines/executor/executor_graph.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index cfda1d173bb7f..0202d2e978c7d 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -368,7 +368,7 @@ impl ExecutingGraph { graph: &Arc, ) -> Result<()> { // info!("[schedule] ------------new--------------"); - let node = &locker.graph[index]; + let _node = &locker.graph[index]; // info!("[schedule] Node {:?} trigger schedule", node); let mut need_schedule_nodes = VecDeque::new(); let mut need_schedule_edges = VecDeque::new(); From 8d7db4289bcb5bce8c560725ac3b24fd19511dca Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 12 Aug 2025 13:46:23 +0800 Subject: [PATCH 7/7] save --- .../transforms/sort/k_way_merge_sort_partition.rs | 6 +++++- tests/sqllogictests/suites/query/join/left_outer.test | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs index 182c1c77c9f70..6a320b4e90a50 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs @@ -245,4 +245,8 @@ pub struct SortTaskMeta { } #[typetag::serde(name = "sort_task")] -impl BlockMetaInfo for SortTaskMeta {} +impl BlockMetaInfo for SortTaskMeta { + fn clone_self(&self) -> Box { + Box::new(*self) + } +} diff --git a/tests/sqllogictests/suites/query/join/left_outer.test b/tests/sqllogictests/suites/query/join/left_outer.test index 9bc1c406a224e..d7f4c98cfbf20 100644 --- a/tests/sqllogictests/suites/query/join/left_outer.test +++ b/tests/sqllogictests/suites/query/join/left_outer.test @@ -285,7 +285,7 @@ INSERT INTO t2 VALUES(2, 2), (2, 4), (2, 6), (2, 8), (2, 10); statement ok set max_block_size = 2; -query I +query I rowsort SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a; ---- 2 3 2 10