Skip to content

Commit 3f823fc

Browse files
authored
feat(query): use decimal64 for a decimal aggregate group-by column when its precision fits (#18171)
* group_hash * ArrayAggStateSimple * ArrayAggStateZST * fix * refine
1 parent 425e7b2 commit 3f823fc

28 files changed

+571
-333
lines changed

src/query/expression/src/aggregate/group_hash.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ use crate::types::BitmapType;
2929
use crate::types::BooleanType;
3030
use crate::types::DataType;
3131
use crate::types::DateType;
32-
use crate::types::Decimal64As128Type;
3332
use crate::types::DecimalColumn;
33+
use crate::types::DecimalDataKind;
3434
use crate::types::DecimalScalar;
35-
use crate::types::DecimalType;
35+
use crate::types::DecimalView;
3636
use crate::types::GeographyColumn;
3737
use crate::types::GeographyType;
3838
use crate::types::GeometryType;
@@ -47,7 +47,7 @@ use crate::types::TimestampType;
4747
use crate::types::ValueType;
4848
use crate::types::VariantType;
4949
use crate::visitor::ValueVisitor;
50-
use crate::with_decimal_type;
50+
use crate::with_decimal_mapped_type;
5151
use crate::with_number_mapped_type;
5252
use crate::with_number_type;
5353
use crate::Column;
@@ -107,18 +107,16 @@ impl<const IS_FIRST: bool> ValueVisitor for HashVisitor<'_, IS_FIRST> {
107107
Ok(())
108108
}
109109

110-
fn visit_any_decimal(&mut self, column: DecimalColumn) -> Result<()> {
111-
match column {
112-
DecimalColumn::Decimal64(buffer, _) => {
113-
self.combine_group_hash_type_column::<Decimal64As128Type>(&buffer);
114-
}
115-
DecimalColumn::Decimal128(buffer, _) => {
116-
self.combine_group_hash_type_column::<DecimalType<i128>>(&buffer);
117-
}
118-
DecimalColumn::Decimal256(buffer, _) => {
119-
self.combine_group_hash_type_column::<DecimalType<i256>>(&buffer);
110+
fn visit_any_decimal(&mut self, decimal_column: DecimalColumn) -> Result<()> {
111+
with_decimal_mapped_type!(|F| match decimal_column {
112+
DecimalColumn::F(buffer, size) => {
113+
with_decimal_mapped_type!(|T| match size.best_type().data_kind() {
114+
DecimalDataKind::T => {
115+
self.combine_group_hash_type_column::<DecimalView<F, T>>(&buffer);
116+
}
117+
});
120118
}
121-
}
119+
});
122120
Ok(())
123121
}
124122

@@ -329,10 +327,15 @@ where I: Index
329327
}
330328

331329
fn visit_any_decimal(&mut self, column: DecimalColumn) -> Result<()> {
332-
with_decimal_type!(|DECIMAL_TYPE| match column {
333-
DecimalColumn::DECIMAL_TYPE(buffer, _) => {
334-
let buffer = buffer.as_ref();
335-
self.visit_indices(|i| buffer[i.to_usize()].agg_hash())
330+
with_decimal_mapped_type!(|F| match &column {
331+
DecimalColumn::F(_, size) => {
332+
with_decimal_mapped_type!(|T| match size.best_type().data_kind() {
333+
DecimalDataKind::T => {
334+
type D = DecimalView<F, T>;
335+
let buffer = D::try_downcast_column(&Column::Decimal(column)).unwrap();
336+
self.visit_indices(|i| buffer[i.to_usize()].agg_hash())
337+
}
338+
})
336339
}
337340
})
338341
}

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::types::ArgType;
3333
use crate::types::BooleanType;
3434
use crate::types::DataType;
3535
use crate::types::DateType;
36+
use crate::types::DecimalDataKind;
3637
use crate::types::DecimalSize;
3738
use crate::types::NumberDataType;
3839
use crate::types::NumberType;
@@ -238,13 +239,17 @@ impl Payload {
238239
NumberDataType::NUM_TYPE =>
239240
self.flush_type_column::<NumberType<NUM_TYPE>>(col_offset, state),
240241
}),
241-
DataType::Decimal(size) => {
242-
if size.can_carried_by_128() {
242+
DataType::Decimal(size) => match size.best_type().data_kind() {
243+
DecimalDataKind::Decimal64 => {
244+
self.flush_decimal_column::<i64>(col_offset, state, size)
245+
}
246+
DecimalDataKind::Decimal128 => {
243247
self.flush_decimal_column::<i128>(col_offset, state, size)
244-
} else {
248+
}
249+
DecimalDataKind::Decimal256 => {
245250
self.flush_decimal_column::<i256>(col_offset, state, size)
246251
}
247-
}
252+
},
248253
DataType::Timestamp => self.flush_type_column::<TimestampType>(col_offset, state),
249254
DataType::Date => self.flush_type_column::<DateType>(col_offset, state),
250255
DataType::Binary => Column::Binary(self.flush_binary_column(col_offset, state)),

src/query/expression/src/aggregate/payload_row.rs

Lines changed: 62 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,36 @@ use crate::read;
2121
use crate::store;
2222
use crate::types::binary::BinaryColumn;
2323
use crate::types::decimal::DecimalColumn;
24-
use crate::types::decimal::DecimalType;
2524
use crate::types::i256;
2625
use crate::types::AccessType;
2726
use crate::types::AnyType;
2827
use crate::types::BinaryType;
2928
use crate::types::BooleanType;
3029
use crate::types::DataType;
3130
use crate::types::DateType;
32-
use crate::types::Decimal128As256Type;
33-
use crate::types::Decimal256As128Type;
31+
use crate::types::DecimalDataKind;
32+
use crate::types::DecimalView;
3433
use crate::types::NumberColumn;
3534
use crate::types::NumberType;
3635
use crate::types::StringColumn;
3736
use crate::types::StringType;
3837
use crate::types::TimestampType;
38+
use crate::with_decimal_mapped_type;
3939
use crate::with_number_mapped_type;
4040
use crate::Column;
4141
use crate::InputColumns;
4242
use crate::Scalar;
4343
use crate::SelectVector;
4444

45-
pub fn rowformat_size(data_type: &DataType) -> usize {
45+
pub(super) fn rowformat_size(data_type: &DataType) -> usize {
4646
match data_type {
4747
DataType::Null | DataType::EmptyArray | DataType::EmptyMap => 0,
4848
DataType::Boolean => 1,
4949
DataType::Number(n) => n.bit_width() as usize / 8,
50-
DataType::Decimal(n) => {
51-
if n.can_carried_by_128() {
50+
DataType::Decimal(size) => {
51+
if size.can_carried_by_64() {
52+
8
53+
} else if size.can_carried_by_128() {
5254
16
5355
} else {
5456
32
@@ -71,7 +73,7 @@ pub fn rowformat_size(data_type: &DataType) -> usize {
7173
}
7274

7375
/// This serialize column into row format by fixed size
74-
pub unsafe fn serialize_column_to_rowformat(
76+
pub(super) unsafe fn serialize_column_to_rowformat(
7577
arena: &Bump,
7678
column: &Column,
7779
select_vector: &SelectVector,
@@ -89,33 +91,22 @@ pub unsafe fn serialize_column_to_rowformat(
8991
}
9092
}
9193
}),
92-
Column::Decimal(v) => match v {
93-
DecimalColumn::Decimal64(_, _) => unimplemented!(),
94-
DecimalColumn::Decimal128(buffer, size) => {
95-
if size.can_carried_by_128() {
96-
for index in select_vector.iter().take(rows).copied() {
97-
store(&buffer[index], address[index].add(offset) as *mut u8);
98-
}
99-
} else {
100-
for index in select_vector.iter().take(rows).copied() {
101-
let val = Decimal128As256Type::index_column_unchecked(buffer, index);
102-
store(&val, address[index].add(offset) as *mut u8);
103-
}
104-
}
105-
}
106-
DecimalColumn::Decimal256(buffer, size) => {
107-
if size.can_carried_by_128() {
108-
for index in select_vector.iter().take(rows).copied() {
109-
let val = Decimal256As128Type::index_column_unchecked(buffer, index);
110-
store(&val, address[index].add(offset) as *mut u8);
111-
}
112-
} else {
113-
for index in select_vector.iter().take(rows).copied() {
114-
store(&buffer[index], address[index].add(offset) as *mut u8);
115-
}
94+
Column::Decimal(decimal_column) => {
95+
with_decimal_mapped_type!(|F| match decimal_column {
96+
DecimalColumn::F(buffer, size) => {
97+
with_decimal_mapped_type!(|T| match size.best_type().data_kind() {
98+
DecimalDataKind::T => {
99+
serialize_fixed_size_column_to_rowformat::<DecimalView<F, T>>(
100+
buffer,
101+
&select_vector[0..rows],
102+
address,
103+
offset,
104+
);
105+
}
106+
});
116107
}
117-
}
118-
},
108+
});
109+
}
119110
Column::Boolean(v) => {
120111
if v.null_count() == 0 || v.null_count() == v.len() {
121112
let val: u8 = if v.null_count() == 0 { 1 } else { 0 };
@@ -190,7 +181,21 @@ pub unsafe fn serialize_column_to_rowformat(
190181
}
191182
}
192183

193-
pub unsafe fn row_match_columns(
184+
unsafe fn serialize_fixed_size_column_to_rowformat<T>(
185+
column: &T::Column,
186+
select_vector: &[usize],
187+
address: &[*const u8],
188+
offset: usize,
189+
) where
190+
T: AccessType<Scalar: Copy>,
191+
{
192+
for index in select_vector.iter().copied() {
193+
let val = T::index_column_unchecked_scalar(column, index);
194+
store(&val, address[index].add(offset) as *mut u8);
195+
}
196+
}
197+
198+
pub(super) unsafe fn row_match_columns(
194199
cols: InputColumns,
195200
address: &[*const u8],
196201
select_vector: &mut SelectVector,
@@ -226,7 +231,7 @@ pub unsafe fn row_match_columns(
226231
}
227232
}
228233

229-
pub unsafe fn row_match_column(
234+
unsafe fn row_match_column(
230235
col: &Column,
231236
address: &[*const u8],
232237
select_vector: &mut SelectVector,
@@ -264,33 +269,28 @@ pub unsafe fn row_match_column(
264269
)
265270
}
266271
}),
267-
Column::Decimal(v) => match v {
268-
DecimalColumn::Decimal64(_, _) => unreachable!(),
269-
DecimalColumn::Decimal128(_, _) => row_match_column_type::<DecimalType<i128>>(
270-
col,
271-
validity,
272-
address,
273-
select_vector,
274-
temp_vector,
275-
count,
276-
validity_offset,
277-
col_offset,
278-
no_match,
279-
no_match_count,
280-
),
281-
DecimalColumn::Decimal256(_, _) => row_match_column_type::<DecimalType<i256>>(
282-
col,
283-
validity,
284-
address,
285-
select_vector,
286-
temp_vector,
287-
count,
288-
validity_offset,
289-
col_offset,
290-
no_match,
291-
no_match_count,
292-
),
293-
},
272+
Column::Decimal(decimal_column) => {
273+
with_decimal_mapped_type!(|F| match decimal_column {
274+
DecimalColumn::F(_, size) => {
275+
with_decimal_mapped_type!(|T| match size.best_type().data_kind() {
276+
DecimalDataKind::T => {
277+
row_match_column_type::<DecimalView<F, T>>(
278+
col,
279+
validity,
280+
address,
281+
select_vector,
282+
temp_vector,
283+
count,
284+
validity_offset,
285+
col_offset,
286+
no_match,
287+
no_match_count,
288+
)
289+
}
290+
});
291+
}
292+
});
293+
}
294294
Column::Boolean(_) => row_match_column_type::<BooleanType>(
295295
col,
296296
validity,

src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ where T: Clone
104104
impl<T> HashMethodFixedKeys<T>
105105
where T: Clone
106106
{
107-
pub fn deserialize_group_columns(
107+
#[expect(dead_code)]
108+
fn deserialize_group_columns(
108109
&self,
109110
keys: Vec<T>,
110111
group_items: &[(usize, DataType)],

src/query/expression/src/types/decimal.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2815,6 +2815,8 @@ impl Ord for i256 {
28152815
}
28162816
}
28172817

2818+
pub type DecimalView<F, T> = ComputeView<DecimalConvert<F, T>, CoreDecimal<F>, CoreDecimal<T>>;
2819+
28182820
#[derive(Debug, Clone, PartialEq, Eq, Default)]
28192821
pub struct DecimalConvert<F, T>(std::marker::PhantomData<(F, T)>);
28202822

src/query/expression/src/types/nullable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ impl<T: ReturnType> NullableColumn<T> {
362362
impl NullableColumn<AnyType> {
363363
pub fn new(column: Column, validity: Bitmap) -> Self {
364364
debug_assert_eq!(column.len(), validity.len());
365-
debug_assert!(!column.is_nullable());
365+
debug_assert!(!column.is_nullable() && !column.is_null());
366366
NullableColumn { column, validity }
367367
}
368368
}

src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ use databend_common_expression::Scalar;
3535
use databend_common_expression::SortColumnDescription;
3636
use itertools::Itertools;
3737

38-
use crate::aggregates::borsh_deserialize_state;
39-
use crate::aggregates::borsh_serialize_state;
4038
use crate::aggregates::AggregateFunctionSortDesc;
4139

4240
#[derive(BorshSerialize, BorshDeserialize, Debug)]
@@ -119,14 +117,12 @@ impl AggregateFunction for AggregateFunctionSortAdaptor {
119117

120118
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
121119
let state = Self::get_state(place);
122-
borsh_serialize_state(writer, state)?;
123-
124-
Ok(())
120+
Ok(state.serialize(writer)?)
125121
}
126122

127123
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
128124
let state = Self::get_state(place);
129-
let rhs: SortAggState = borsh_deserialize_state(reader)?;
125+
let rhs = SortAggState::deserialize(reader)?;
130126

131127
Self::merge_states_inner(state, &rhs);
132128
Ok(())

src/query/functions/src/aggregates/aggregate_arg_min_max.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ use super::aggregate_scalar_state::CmpMin;
4040
use super::aggregate_scalar_state::TYPE_ANY;
4141
use super::aggregate_scalar_state::TYPE_MAX;
4242
use super::aggregate_scalar_state::TYPE_MIN;
43-
use super::borsh_deserialize_state;
44-
use super::borsh_serialize_state;
43+
use super::borsh_partial_deserialize;
4544
use super::AggregateFunctionRef;
4645
use super::StateAddr;
4746
use crate::aggregates::assert_binary_arguments;
@@ -273,12 +272,12 @@ where
273272

274273
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
275274
let state = place.get::<State>();
276-
borsh_serialize_state(writer, state)
275+
Ok(state.serialize(writer)?)
277276
}
278277

279278
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
280279
let state = place.get::<State>();
281-
let rhs: State = borsh_deserialize_state(reader)?;
280+
let rhs: State = borsh_partial_deserialize(reader)?;
282281
state.merge_from(rhs)
283282
}
284283

0 commit comments

Comments
 (0)