Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/query/expression/src/utils/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ impl BlockThresholds {
&& file_size < self.min_compressed_per_block / 2
}

#[inline]
pub fn check_too_large(&self, row_count: usize, block_size: usize) -> bool {
row_count > 2 * self.min_rows_per_block || block_size > self.max_bytes_per_block
}

#[inline]
pub fn calc_rows_for_compact(&self, total_bytes: usize, total_rows: usize) -> usize {
if self.check_for_compact(total_rows, total_bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ mod transform_blocking;
mod transform_blocking_async;
mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
mod transform_hook;
mod transform_ordered_compact_builder;
mod transform_pipeline_helper;
mod transform_retry_async;
pub mod window;
Expand All @@ -40,8 +40,8 @@ pub use transform_blocking::*;
pub use transform_blocking_async::*;
pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_hook::*;
pub use transform_ordered_compact_builder::build_ordered_compact_pipeline;
pub use transform_pipeline_helper::TransformPipelineHelper;
pub use transform_retry_async::*;
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::processors::UnknownMode;
pub enum BlockCompactMeta {
Concat(Vec<DataBlock>),
Split {
block: DataBlock,
blocks: Vec<DataBlock>,
rows_per_block: usize,
},
NoChange(Vec<DataBlock>),
Expand Down Expand Up @@ -66,9 +66,9 @@ impl BlockMetaTransform<BlockCompactMeta> for TransformCompactBlock {
match meta {
BlockCompactMeta::Concat(blocks) => Ok(vec![DataBlock::concat(&blocks)?]),
BlockCompactMeta::Split {
block,
blocks,
rows_per_block,
} => Ok(block.split_by_rows_if_needed_no_tail(rows_per_block)),
} => Self::split_blocks(blocks, rows_per_block),
BlockCompactMeta::NoChange(blocks) => Ok(blocks),
}
}
Expand All @@ -77,3 +77,135 @@ impl BlockMetaTransform<BlockCompactMeta> for TransformCompactBlock {
self.aborting.store(true, Ordering::Release);
}
}

impl TransformCompactBlock {
fn split_blocks(blocks: Vec<DataBlock>, rows_per_block: usize) -> Result<Vec<DataBlock>> {
debug_assert!(!blocks.is_empty());
if blocks.len() == 1 {
return Ok(blocks[0].split_by_rows_if_needed_no_tail(rows_per_block));
}

let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
let mut total_rows: usize = blocks.iter().map(DataBlock::num_rows).sum();
let mut blocks = blocks.into_iter();
let mut current = blocks.next();
let mut offset = 0;
let mut output = Vec::new();

// Mirror split_by_rows_if_needed_no_tail, but consume a sequence of blocks
// while preserving their original order. Like the original helper, this
// treats rows_per_block as a target and allows a slightly larger block to
// avoid emitting a tiny tail block.
while total_rows >= max_rows_per_block {
let mut remain_rows = rows_per_block;
let mut pieces = vec![];

while remain_rows > 0 {
let block = current.as_ref().ok_or_else(|| {
ErrorCode::Internal("not enough rows to split compact blocks")
})?;
let block_rows = block.num_rows() - offset;

if block_rows <= remain_rows {
let block = current.take().unwrap();
remain_rows -= block_rows;
pieces.push(if offset == 0 {
block
} else {
block.slice(offset..block.num_rows())
});
current = blocks.next();
offset = 0;
} else {
// Split the current block and keep the remainder for the next output block.
pieces.push(block.slice(offset..offset + remain_rows));
offset += remain_rows;
remain_rows = 0;
}
}

output.push(DataBlock::concat(&pieces)?);
total_rows -= rows_per_block;
}

if let Some(block) = current {
// Emit the final tail block, which may be smaller than rows_per_block by design.
let mut tail = Vec::new();
tail.push(block.slice(offset..block.num_rows()));
tail.extend(blocks);
output.push(DataBlock::concat(&tail)?);
}

Ok(output)
}
}

#[cfg(test)]
mod tests {
use databend_common_exception::Result;
use databend_common_expression::FromData;
use databend_common_expression::ScalarRef;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::number::NumberScalar;

use super::*;

fn block_with_range(start: i32, end: i32) -> DataBlock {
DataBlock::new_from_columns(vec![Int32Type::from_data((start..end).collect::<Vec<_>>())])
}

fn block_values(block: &DataBlock) -> Vec<i32> {
(0..block.num_rows())
.map(|row| match block.get_by_offset(0).index(row).unwrap() {
ScalarRef::Number(NumberScalar::Int32(value)) => value,
value => panic!("unexpected scalar: {value:?}"),
})
.collect()
}

fn assert_split_matches_reference(blocks: Vec<DataBlock>, rows_per_block: usize) -> Result<()> {
let actual = TransformCompactBlock::split_blocks(blocks.clone(), rows_per_block)?;
let expected = DataBlock::concat(&blocks)?.split_by_rows_if_needed_no_tail(rows_per_block);

assert_eq!(
actual.iter().map(DataBlock::num_rows).collect::<Vec<_>>(),
expected.iter().map(DataBlock::num_rows).collect::<Vec<_>>()
);
assert_eq!(
actual.iter().map(block_values).collect::<Vec<_>>(),
expected.iter().map(block_values).collect::<Vec<_>>()
);
Ok(())
}

#[test]
fn test_split_blocks_matches_reference_across_block_boundaries() -> Result<()> {
assert_split_matches_reference(
vec![
block_with_range(0, 2),
block_with_range(2, 6),
block_with_range(6, 10),
],
3,
)?;
assert_split_matches_reference(
vec![
block_with_range(0, 1),
block_with_range(1, 2),
block_with_range(2, 3),
block_with_range(3, 10),
],
4,
)?;
assert_split_matches_reference(
vec![
block_with_range(0, 2),
block_with_range(2, 4),
block_with_range(4, 6),
block_with_range(6, 8),
],
5,
)?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
let rows_per_block = self.thresholds.calc_rows_for_compact(num_bytes, num_rows);
Ok(vec![DataBlock::empty_with_meta(Box::new(
BlockCompactMeta::Split {
block: data,
blocks: vec![data],
rows_per_block,
},
))])
Expand Down

This file was deleted.

Loading
Loading