Skip to content

refactor(executor): add max rows/bytes limit when scheduling executor tasks #18433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/query/pipeline/core/src/processors/block_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

// Lower bound on the row limit derived from the byte budget. Without it, a
// block with very wide rows could push the computed limit toward zero and
// make progress by uselessly tiny slices.
// TODO: this magic 10 may need to be revisited in the future.
const MIN_ROWS_BY_BYTES: usize = 10;

/// DataBlock limit in rows and bytes.
///
/// The fields are atomics so one `BlockLimit` can be shared across threads;
/// reads use `Ordering::Relaxed` because the two limits are independent
/// values and no cross-field ordering is required.
pub struct BlockLimit {
    rows: AtomicUsize,
    bytes: AtomicUsize,
}

impl BlockLimit {
    /// Create a limit of at most `rows` rows and (approximately) `bytes` bytes.
    pub fn new(rows: usize, bytes: usize) -> Self {
        BlockLimit {
            rows: AtomicUsize::new(rows),
            bytes: AtomicUsize::new(bytes),
        }
    }

    /// Calculate the number of rows to take from a block of `total_rows` rows
    /// occupying `total_bytes` bytes, honoring both the row and byte limits.
    ///
    /// The byte limit is converted to rows via the block's average row width,
    /// floored at `MIN_ROWS_BY_BYTES`. The result never exceeds `total_rows`,
    /// and is 0 only for an empty block (or a zero row limit).
    pub fn calculate_limit_rows(&self, total_rows: usize, total_bytes: usize) -> usize {
        if total_rows == 0 {
            return 0;
        }
        // max with 1 avoids division by zero for blocks that report 0 bytes.
        let average_bytes_per_row = (total_bytes / total_rows).max(1);
        let rows_by_bytes =
            (self.bytes.load(Ordering::Relaxed) / average_bytes_per_row).max(MIN_ROWS_BY_BYTES);
        let rows_limit = self.rows.load(Ordering::Relaxed);
        // Cap at total_rows: a caller can never take more rows than exist,
        // so "no limit applies" is reported as exactly total_rows.
        rows_limit.min(rows_by_bytes).min(total_rows)
    }
}

impl Default for BlockLimit {
    /// Unlimited in both dimensions.
    fn default() -> Self {
        BlockLimit {
            rows: AtomicUsize::new(usize::MAX),
            bytes: AtomicUsize::new(usize::MAX),
        }
    }
}
2 changes: 2 additions & 0 deletions src/query/pipeline/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
mod port;
mod processor;

mod block_limit;
mod duplicate_processor;
mod port_trigger;
mod profile;
mod resize_processor;
mod sequence_group;
mod shuffle_processor;

pub use block_limit::BlockLimit;
pub use duplicate_processor::DuplicateProcessor;
pub use port::connect;
pub use port::InputPort;
Expand Down
92 changes: 77 additions & 15 deletions src/query/pipeline/core/src/processors/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -25,6 +26,7 @@ use databend_common_base::runtime::TimeSeriesProfileName;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;

use crate::processors::BlockLimit;
use crate::processors::UpdateTrigger;
use crate::unsafe_cell_wrap::UnSafeCellWrap;

Expand All @@ -40,6 +42,10 @@ pub struct SharedData(pub Result<DataBlock>);

/// State shared by the two ports of one pipeline connection.
///
/// `data` doubles as a tagged pointer: its low bits carry the port flags
/// (see `get_flags`, which masks with FLAGS_MASK) and the remaining bits
/// address a boxed `SharedData` payload.
pub struct SharedStatus {
    data: AtomicPtr<SharedData>,
    // Per-connection limit consulted when pulling data blocks;
    // `Default` gives an effectively unlimited setting.
    block_limit: Arc<BlockLimit>,
    // This flag is used to indicate if a slice operation
    // has occurred on the data block
    slice_occurred: AtomicBool,
}

unsafe impl Send for SharedStatus {}
Expand All @@ -57,9 +63,11 @@ impl Drop for SharedStatus {
}

impl SharedStatus {
pub fn create() -> Arc<SharedStatus> {
/// Build a fresh shared slot for a connection: empty data pointer, the given
/// per-connection block limit, and the slice marker cleared.
pub fn create(block_limit: Arc<BlockLimit>) -> Arc<SharedStatus> {
    let status = SharedStatus {
        data: AtomicPtr::new(std::ptr::null_mut()),
        block_limit,
        slice_occurred: AtomicBool::new(false),
    };
    Arc::new(status)
}

Expand Down Expand Up @@ -124,6 +132,11 @@ impl SharedStatus {
pub fn get_flags(&self) -> usize {
self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK
}

/// Accessor for this connection's shared block limit; consulted by
/// `InputPort` when deciding whether a pulled block must be sliced.
#[inline(always)]
pub fn get_block_limit(&self) -> &Arc<BlockLimit> {
    &self.block_limit
}
}

pub struct InputPort {
Expand All @@ -134,7 +147,7 @@ pub struct InputPort {
impl InputPort {
pub fn create() -> Arc<InputPort> {
Arc::new(InputPort {
shared: UnSafeCellWrap::create(SharedStatus::create()),
shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))),
update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()),
})
}
Expand Down Expand Up @@ -169,6 +182,7 @@ impl InputPort {
unsafe {
let flags = self.shared.set_flags(NEED_DATA, NEED_DATA);
if flags & NEED_DATA == 0 {
// info!("[input_port] trigger input port set need data");
UpdateTrigger::update_input(&self.update_trigger);
}
}
Expand All @@ -184,24 +198,63 @@ impl InputPort {
(self.shared.get_flags() & HAS_DATA) != 0
}

#[inline(always)]
pub fn pull_data(&self) -> Option<Result<DataBlock>> {
unsafe {
// info!("[input_port] trigger input port pull data");
UpdateTrigger::update_input(&self.update_trigger);
let unset_flags = HAS_DATA | NEED_DATA;
match self.shared.swap(std::ptr::null_mut(), 0, unset_flags) {
address if address.is_null() => None,
address => {
let block = (*Box::from_raw(address)).0;
if let Ok(data_block) = block.as_ref() {
ExecutorStats::record_thread_tracker(data_block.num_rows());
}
Some(block)

// First, swap out the data without unsetting flags to prevent race conditions
let address = self.shared.swap(std::ptr::null_mut(), 0, 0);

if address.is_null() {
// No data available, now safe to unset flags
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
return None;
}

let shared_data = Box::from_raw(address);
match shared_data.0 {
Ok(data_block) => self.process_data_block(data_block),
Err(e) => {
// Error case, unset both flags
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
Some(Err(e))
}
}
}
}

/// Apply the connection's `BlockLimit` to a block just swapped out of the
/// shared slot, splitting it when it exceeds the computed row limit.
///
/// Split path: the head slice (`limit_rows` rows) is returned to the caller,
/// the tail is stored back via `swap(.., 0, 0)` (flags untouched), and
/// `slice_occurred` is raised so the event can be observed via
/// `InputPort::slice_occurred`.
/// No-split path: the whole block is returned and both HAS_DATA and
/// NEED_DATA are cleared.
fn process_data_block(&self, data_block: DataBlock) -> Option<Result<DataBlock>> {
    let block_limit = self.shared.get_block_limit();
    // Row limit derived from both the row and byte budgets, using this
    // block's own rows/bytes to estimate average row width.
    let limit_rows =
        block_limit.calculate_limit_rows(data_block.num_rows(), data_block.memory_size());

    // limit_rows can be 0 (empty block or zero row limit) — never slice then.
    if data_block.num_rows() > limit_rows && limit_rows > 0 {
        // info!(
        //     "[input_port] pull data with slice, limit/all: {}/{}",
        //     limit_rows,
        //     data_block.num_rows()
        // );
        // Need to split the block
        let taken_block = data_block.slice(0..limit_rows);
        let remaining_block = data_block.slice(limit_rows..data_block.num_rows());

        let remaining_data = Box::new(SharedData(Ok(remaining_block)));
        // NOTE(review): flags are deliberately left unchanged here — HAS_DATA
        // presumably remains set so the tail is visible to the next pull;
        // confirm against the flag handling in `pull_data`.
        self.shared.swap(Box::into_raw(remaining_data), 0, 0);
        self.shared.slice_occurred.store(true, Ordering::Relaxed);
        // Account only the rows actually handed to the consumer.
        ExecutorStats::record_thread_tracker(taken_block.num_rows());
        Some(Ok(taken_block))
    } else {
        // info!("[input_port] pull data all: {}", data_block.num_rows());
        // No need to split, take the whole block
        // Unset both HAS_DATA and NEED_DATA flags
        self.shared.set_flags(0, HAS_DATA | NEED_DATA);

        ExecutorStats::record_thread_tracker(data_block.num_rows());
        Some(Ok(data_block))
    }
}

/// # Safety
///
/// Method is thread unsafe and require thread safe call
Expand All @@ -215,6 +268,14 @@ impl InputPort {
pub unsafe fn set_trigger(&self, update_trigger: *mut UpdateTrigger) {
self.update_trigger.set_value(update_trigger)
}

/// True if a pulled block has been sliced on this connection since the
/// last `reset_slice_occurred` call.
pub fn slice_occurred(&self) -> bool {
    self.shared.slice_occurred.load(Ordering::Relaxed)
}

/// Clear the slice marker so a subsequent slice can be detected again.
pub fn reset_slice_occurred(&self) {
    self.shared.slice_occurred.store(false, Ordering::Relaxed);
}
}

pub struct OutputPort {
Expand All @@ -227,14 +288,15 @@ impl OutputPort {
pub fn create() -> Arc<OutputPort> {
Arc::new(OutputPort {
record_profile: UnSafeCellWrap::create(false),
shared: UnSafeCellWrap::create(SharedStatus::create()),
shared: UnSafeCellWrap::create(SharedStatus::create(Arc::new(Default::default()))),
update_trigger: UnSafeCellWrap::create(std::ptr::null_mut()),
})
}

#[inline(always)]
pub fn push_data(&self, data: Result<DataBlock>) {
unsafe {
// info!("[output_port] trigger output port push_data");
UpdateTrigger::update_output(&self.update_trigger);

if let Ok(data_block) = &data {
Expand Down Expand Up @@ -318,8 +380,8 @@ impl OutputPort {
/// Connect input and output ports.
///
/// # Safety
pub unsafe fn connect(input: &InputPort, output: &OutputPort) {
let shared_status = SharedStatus::create();
pub unsafe fn connect(input: &InputPort, output: &OutputPort, block_limit: Arc<BlockLimit>) {
let shared_status = SharedStatus::create(block_limit);

input.set_shared(shared_status.clone());
output.set_shared(shared_status);
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/core/src/processors/port_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ unsafe impl Send for UpdateList {}

unsafe impl Sync for UpdateList {}

#[derive(Debug, Clone)]
pub enum DirectedEdge {
Source(EdgeIndex),
Target(EdgeIndex),
Expand Down
9 changes: 9 additions & 0 deletions src/query/pipeline/core/src/processors/resize_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::VecDeque;
use std::sync::Arc;

use databend_common_exception::Result;
use log::info;

use crate::pipe::PipeItem;
use crate::processors::Event;
Expand Down Expand Up @@ -110,6 +111,10 @@ impl Processor for ResizeProcessor {
}
}
}
info!(
"[resize] event_with_cause: {:?}, input port {:?} output port {:?}",
&cause, &self.waiting_inputs, &self.waiting_outputs
);

while !self.waiting_outputs.is_empty() && !self.waiting_inputs.is_empty() {
let output_index = self.waiting_outputs.pop_front().unwrap();
Expand Down Expand Up @@ -159,6 +164,10 @@ impl Processor for ResizeProcessor {

return Ok(Event::Finished);
}
info!(
"[resize] event_with_cause: {:?}, input port {:?} output port {:?}",
&cause, &self.waiting_inputs, &self.waiting_outputs
);

match self.waiting_outputs.is_empty() {
true => Ok(Event::NeedConsume),
Expand Down
59 changes: 47 additions & 12 deletions src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::types::Int32Type;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_pipeline_core::processors::connect;
use databend_common_pipeline_core::processors::BlockLimit;
use databend_common_pipeline_core::processors::DuplicateProcessor;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
Expand All @@ -40,9 +43,17 @@ async fn test_duplicate_output_finish() -> Result<()> {
let downstream_input2 = InputPort::create();

unsafe {
connect(&input, &upstream_output);
connect(&downstream_input1, &output1);
connect(&downstream_input2, &output2);
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
connect(
&downstream_input1,
&output1,
Arc::new(BlockLimit::default()),
);
connect(
&downstream_input2,
&output2,
Arc::new(BlockLimit::default()),
);
}

downstream_input1.set_need_data();
Expand All @@ -68,9 +79,17 @@ async fn test_duplicate_output_finish() -> Result<()> {
let downstream_input2 = InputPort::create();

unsafe {
connect(&input, &upstream_output);
connect(&downstream_input1, &output1);
connect(&downstream_input2, &output2);
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
connect(
&downstream_input1,
&output1,
Arc::new(BlockLimit::default()),
);
connect(
&downstream_input2,
&output2,
Arc::new(BlockLimit::default()),
);
}

downstream_input1.finish();
Expand All @@ -94,9 +113,17 @@ async fn test_duplicate_output_finish() -> Result<()> {
let downstream_input2 = InputPort::create();

unsafe {
connect(&input, &upstream_output);
connect(&downstream_input1, &output1);
connect(&downstream_input2, &output2);
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
connect(
&downstream_input1,
&output1,
Arc::new(BlockLimit::default()),
);
connect(
&downstream_input2,
&output2,
Arc::new(BlockLimit::default()),
);
}

downstream_input1.finish();
Expand All @@ -120,9 +147,17 @@ async fn test_duplicate_processor() -> Result<()> {
let downstream_input2 = InputPort::create();

unsafe {
connect(&input, &upstream_output);
connect(&downstream_input1, &output1);
connect(&downstream_input2, &output2);
connect(&input, &upstream_output, Arc::new(BlockLimit::default()));
connect(
&downstream_input1,
&output1,
Arc::new(BlockLimit::default()),
);
connect(
&downstream_input2,
&output2,
Arc::new(BlockLimit::default()),
);
}

downstream_input1.set_need_data();
Expand Down
Loading
Loading