Skip to content

Commit 13146a5

Browse files
authored
refactor(query): Combining TransformSortMergeLimit, TransformSortMerge, and TransformSortSpill (#17762)
* transform_sort * update * fix * fix * fix * fix * refine * fix * refine * fix * move next_index * refine * arc * refine * memory_merger * fix
1 parent b424b0d commit 13146a5

39 files changed

+1101
-988
lines changed

src/query/expression/src/row/row_converter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::SortField;
3838
/// Convert column-oriented data into comparable row-oriented data.
3939
///
4040
/// **NOTE**: currently, Variant is treated as String.
41+
#[derive(Debug)]
4142
pub struct RowConverter {
4243
fields: Arc<[SortField]>,
4344
}

src/query/pipeline/transforms/src/processors/transforms/sort/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@ mod list_domain;
1919
mod loser_tree;
2020
mod merger;
2121
mod rows;
22-
mod spill;
2322
pub mod utils;
2423

2524
pub use cursor::*;
2625
pub use k_way_merge_sort_partition::KWaySortPartitioner;
2726
pub use k_way_merge_sort_partition::SortTaskMeta;
2827
pub use merger::*;
2928
pub use rows::*;
30-
pub use spill::*;

src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl RowConverter<BinaryColumn> for CommonRowConverter {
7777
CommonRowConverter::new(sort_fields)
7878
}
7979

80-
fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<BinaryColumn> {
80+
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<BinaryColumn> {
8181
let columns = columns
8282
.iter()
8383
.map(|entry| match &entry.value {

src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ pub use utils::*;
3232

3333
/// Convert columns to rows.
3434
pub trait RowConverter<T: Rows>
35-
where Self: Sized
35+
where Self: Sized + Debug
3636
{
3737
fn create(
3838
sort_columns_descriptions: &[SortColumnDescription],
3939
output_schema: DataSchemaRef,
4040
) -> Result<Self>;
41-
fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;
41+
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;
4242
}
4343

4444
/// Rows can be compared.

src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ where
117117

118118
/// If there is only one sort field and its type is a primitive type,
119119
/// use this converter.
120+
#[derive(Debug)]
120121
pub struct SimpleRowConverter<T> {
121122
_t: PhantomData<T>,
122123
}
@@ -135,7 +136,7 @@ where
135136
Ok(Self { _t: PhantomData })
136137
}
137138

138-
fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<SimpleRowsAsc<T>> {
139+
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<SimpleRowsAsc<T>> {
139140
self.convert_rows(columns, num_rows, true)
140141
}
141142
}
@@ -154,14 +155,14 @@ where
154155
Ok(Self { _t: PhantomData })
155156
}
156157

157-
fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<SimpleRowsDesc<T>> {
158+
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<SimpleRowsDesc<T>> {
158159
self.convert_rows(columns, num_rows, false)
159160
}
160161
}
161162

162163
impl<T: ArgType> SimpleRowConverter<T> {
163164
fn convert_rows<R: Rows>(
164-
&mut self,
165+
&self,
165166
columns: &[BlockEntry],
166167
num_rows: usize,
167168
asc: bool,

src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_expression::with_number_mapped_type;
2424
use databend_common_expression::BlockEntry;
2525
use databend_common_expression::Column;
2626
use databend_common_expression::DataBlock;
27+
use databend_common_expression::DataSchema;
2728
use databend_common_expression::DataSchemaRef;
2829
use databend_common_expression::SortColumnDescription;
2930
use match_template::match_template;
@@ -86,8 +87,8 @@ fn convert_columns<R: Rows, C: RowConverter<R>>(
8687
columns: &[BlockEntry],
8788
num_rows: usize,
8889
) -> Result<Column> {
89-
let mut converter = C::create(sort_desc, schema)?;
90-
let rows = C::convert(&mut converter, columns, num_rows)?;
90+
let converter = C::create(sort_desc, schema)?;
91+
let rows = C::convert(&converter, columns, num_rows)?;
9192
Ok(rows.to_column())
9293
}
9394

@@ -103,25 +104,25 @@ pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) {
103104
match sort_type {
104105
DataType::T => {
105106
if asc {
106-
visitor.visit_type::<SimpleRowsAsc<T>>()
107+
visitor.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
107108
} else {
108-
visitor.visit_type::<SimpleRowsDesc<T>>()
109+
visitor.visit_type::<SimpleRowsDesc<T>, SimpleRowConverter<T>>()
109110
}
110111
},
111112
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
112113
NumberDataType::NUM_TYPE => {
113114
if asc {
114-
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>>()
115+
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
115116
} else {
116-
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>>()
117+
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
117118
}
118119
}
119120
}),
120-
_ => visitor.visit_type::<CommonRows>()
121+
_ => visitor.visit_type::<CommonRows, CommonConverter>()
121122
}
122123
}
123124
} else {
124-
visitor.visit_type::<CommonRows>()
125+
visitor.visit_type::<CommonRows, CommonConverter>()
125126
}
126127
}
127128

@@ -130,5 +131,26 @@ pub trait RowsTypeVisitor {
130131

131132
fn sort_desc(&self) -> &[SortColumnDescription];
132133

133-
fn visit_type<R: Rows + 'static>(&mut self);
134+
fn visit_type<R, C>(&mut self)
135+
where
136+
R: Rows + 'static,
137+
C: RowConverter<R> + Send + 'static;
138+
}
139+
140+
pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> DataType {
141+
debug_assert!(!desc.is_empty());
142+
if desc.len() == 1 {
143+
let order_by_field = schema.field(desc[0].offset);
144+
if matches!(
145+
order_by_field.data_type(),
146+
DataType::Number(_)
147+
| DataType::Date
148+
| DataType::Timestamp
149+
| DataType::Binary
150+
| DataType::String
151+
) {
152+
return order_by_field.data_type().clone();
153+
}
154+
}
155+
DataType::Binary
134156
}

src/query/pipeline/transforms/src/processors/transforms/sort/spill.rs

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/query/pipeline/transforms/src/processors/transforms/sort/utils.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414

1515
use std::collections::BinaryHeap;
1616

17-
use databend_common_expression::types::DataType;
1817
use databend_common_expression::DataField;
1918
use databend_common_expression::DataSchema;
2019
use databend_common_expression::DataSchemaRef;
2120
use databend_common_expression::DataSchemaRefExt;
2221
use databend_common_expression::SortColumnDescription;
2322

23+
use super::order_field_type;
24+
2425
pub const ORDER_COL_NAME: &str = "_order_col";
2526

2627
/// Find the bigger child of the root of the heap.
@@ -35,33 +36,13 @@ pub fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
3536
}
3637
}
3738

38-
#[inline(always)]
39-
fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> DataType {
40-
debug_assert!(!desc.is_empty());
41-
if desc.len() == 1 {
42-
let order_by_field = schema.field(desc[0].offset);
43-
if matches!(
44-
order_by_field.data_type(),
45-
DataType::Number(_)
46-
| DataType::Date
47-
| DataType::Timestamp
48-
| DataType::Binary
49-
| DataType::String
50-
) {
51-
return order_by_field.data_type().clone();
52-
}
53-
}
54-
DataType::Binary
55-
}
56-
5739
pub fn has_order_field(schema: &DataSchema) -> bool {
5840
schema
5941
.fields
6042
.last()
6143
.is_some_and(|f| f.name() == ORDER_COL_NAME)
6244
}
6345

64-
#[inline(always)]
6546
pub fn add_order_field(schema: DataSchemaRef, desc: &[SortColumnDescription]) -> DataSchemaRef {
6647
if has_order_field(&schema) {
6748
schema

src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub fn add_k_way_merge_sort(
5656
worker: usize,
5757
block_size: usize,
5858
limit: Option<usize>,
59-
sort_desc: Arc<Vec<SortColumnDescription>>,
59+
sort_desc: Arc<[SortColumnDescription]>,
6060
remove_order_col: bool,
6161
enable_loser_tree: bool,
6262
) -> Result<()> {

src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub fn try_add_multi_sort_merge(
5353
schema: DataSchemaRef,
5454
block_size: usize,
5555
limit: Option<usize>,
56-
sort_columns_descriptions: Arc<Vec<SortColumnDescription>>,
56+
sort_columns_descriptions: Arc<[SortColumnDescription]>,
5757
remove_order_col: bool,
5858
enable_loser_tree: bool,
5959
) -> Result<()> {
@@ -104,12 +104,10 @@ fn create_processor(
104104
schema: DataSchemaRef,
105105
block_size: usize,
106106
limit: Option<usize>,
107-
sort_columns_descriptions: Arc<Vec<SortColumnDescription>>,
107+
sort_desc: Arc<[SortColumnDescription]>,
108108
remove_order_col: bool,
109109
enable_loser_tree: bool,
110110
) -> Result<Box<dyn Processor>> {
111-
let sort_desc = sort_columns_descriptions;
112-
113111
macro_rules! create {
114112
($algo:ident, $rows:ty) => {
115113
MultiSortMergeProcessor::<$algo<$rows>>::create(
@@ -118,7 +116,6 @@ fn create_processor(
118116
schema,
119117
block_size,
120118
limit,
121-
sort_desc,
122119
remove_order_col,
123120
)?
124121
};
@@ -220,7 +217,6 @@ where A: SortAlgorithm
220217
schema: DataSchemaRef,
221218
block_size: usize,
222219
limit: Option<usize>,
223-
_sort_desc: Arc<Vec<SortColumnDescription>>,
224220
remove_order_col: bool,
225221
) -> Result<Self> {
226222
let streams = inputs

0 commit comments

Comments
 (0)