Commit 3d026be

refactor: reduce the memory of the HashIndex in the AggregateHashTable (#19046)
1 parent b66c492 commit 3d026be

37 files changed: +3373 -2307 lines

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 9 additions & 10 deletions
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use bumpalo::Bump;
 use databend_common_exception::Result;
 
-use super::group_hash_columns;
+use super::group_hash_entries;
 use super::hash_index::AdapterImpl;
 use super::hash_index::HashIndex;
 use super::partitioned_payload::PartitionedPayload;
@@ -29,6 +29,7 @@ use super::probe_state::ProbeState;
 use super::Entry;
 use super::HashTableConfig;
 use super::Payload;
+use super::BATCH_SIZE;
 use super::LOAD_FACTOR;
 use super::MAX_PAGE_SIZE;
 use crate::types::DataType;
@@ -37,8 +38,6 @@ use crate::BlockEntry;
 use crate::ColumnBuilder;
 use crate::ProjectedBlock;
 
-const BATCH_ADD_SIZE: usize = 2048;
-
 pub struct AggregateHashTable {
     pub payload: PartitionedPayload,
     // use for append rows directly during deserialize
@@ -129,12 +128,12 @@ impl AggregateHashTable {
         agg_states: ProjectedBlock,
         row_count: usize,
     ) -> Result<usize> {
-        if row_count <= BATCH_ADD_SIZE {
+        if row_count <= BATCH_SIZE {
             self.add_groups_inner(state, group_columns, params, agg_states, row_count)
         } else {
             let mut new_count = 0;
-            for start in (0..row_count).step_by(BATCH_ADD_SIZE) {
-                let end = (start + BATCH_ADD_SIZE).min(row_count);
+            for start in (0..row_count).step_by(BATCH_SIZE) {
+                let end = (start + BATCH_SIZE).min(row_count);
                 let step_group_columns = group_columns
                     .iter()
                     .map(|entry| entry.slice(start..end))
@@ -188,11 +187,11 @@ impl AggregateHashTable {
         }
 
         state.row_count = row_count;
-        group_hash_columns(group_columns, &mut state.group_hashes);
+        group_hash_entries(group_columns, &mut state.group_hashes[..row_count]);
 
         let new_group_count = if self.direct_append {
-            for idx in 0..row_count {
-                state.empty_vector[idx] = idx;
+            for i in 0..row_count {
+                state.empty_vector[i] = i.into();
            }
             self.payload.append_rows(state, row_count, group_columns);
             row_count
@@ -232,7 +231,7 @@ impl AggregateHashTable {
 
         if self.config.partial_agg {
             // check size
-            if self.hash_index.count + BATCH_ADD_SIZE > self.hash_index.resize_threshold()
+            if self.hash_index.count + BATCH_SIZE > self.hash_index.resize_threshold()
                 && self.hash_index.capacity >= self.config.max_partial_capacity
             {
                 self.clear_ht();
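
The add_groups path above now chunks large inputs by the shared BATCH_SIZE constant instead of a file-local BATCH_ADD_SIZE, so per-batch scratch state (such as state.group_hashes, now sliced to [..row_count]) stays at a fixed size regardless of how many rows are fed in. Below is a minimal, self-contained sketch of that chunking pattern, assuming a hypothetical hash_rows helper in place of add_groups_inner; it is illustrative only, not the databend API.

```rust
// Sketch only: BATCH_SIZE and hash_rows are illustrative stand-ins.
const BATCH_SIZE: usize = 2048;

fn hash_rows(rows: &[u64], out: &mut [u64]) {
    // Toy per-row "hash"; real code combines per-column hashes here.
    for (o, r) in out.iter_mut().zip(rows) {
        *o = r.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    }
}

fn add_groups(rows: &[u64]) -> usize {
    // Scratch buffer sized once to BATCH_SIZE and reused for every chunk,
    // so memory stays bounded even for very large inputs.
    let mut scratch = vec![0u64; BATCH_SIZE];
    let mut processed = 0;
    for start in (0..rows.len()).step_by(BATCH_SIZE) {
        let end = (start + BATCH_SIZE).min(rows.len());
        let chunk = &rows[start..end];
        hash_rows(chunk, &mut scratch[..chunk.len()]);
        processed += chunk.len();
    }
    processed
}

fn main() {
    let rows: Vec<u64> = (0..5000).collect();
    assert_eq!(add_groups(&rows), 5000);
    println!("processed all rows in fixed-size batches");
}
```
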

src/query/expression/src/aggregate/group_hash.rs

Lines changed: 209 additions & 40 deletions
@@ -19,38 +19,13 @@ use databend_common_column::types::Index;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 
-use crate::types::i256;
-use crate::types::number::Number;
-use crate::types::AccessType;
-use crate::types::AnyType;
-use crate::types::BinaryColumn;
-use crate::types::BinaryType;
-use crate::types::BitmapType;
-use crate::types::BooleanType;
-use crate::types::DataType;
-use crate::types::DateType;
-use crate::types::DecimalColumn;
-use crate::types::DecimalDataKind;
-use crate::types::DecimalScalar;
-use crate::types::DecimalView;
-use crate::types::GeographyColumn;
-use crate::types::GeographyType;
-use crate::types::GeometryType;
-use crate::types::NullableColumn;
-use crate::types::NumberColumn;
-use crate::types::NumberDataType;
-use crate::types::NumberScalar;
-use crate::types::NumberType;
-use crate::types::OpaqueScalarRef;
-use crate::types::StringColumn;
-use crate::types::StringType;
-use crate::types::TimestampType;
-use crate::types::ValueType;
-use crate::types::VariantType;
+use crate::types::decimal::Decimal;
+use crate::types::*;
 use crate::visitor::ValueVisitor;
 use crate::with_decimal_mapped_type;
 use crate::with_number_mapped_type;
 use crate::with_number_type;
+use crate::BlockEntry;
 use crate::Column;
 use crate::ProjectedBlock;
 use crate::Scalar;
@@ -59,23 +34,101 @@ use crate::Value;
 
 const NULL_HASH_VAL: u64 = 0xd1cefa08eb382d69;
 
-pub fn group_hash_columns(cols: ProjectedBlock, values: &mut [u64]) {
-    debug_assert!(!cols.is_empty());
-    for (i, entry) in cols.iter().enumerate() {
-        if i == 0 {
-            combine_group_hash_column::<true>(&entry.to_column(), values);
-        } else {
-            combine_group_hash_column::<false>(&entry.to_column(), values);
+pub fn group_hash_entries(entries: ProjectedBlock, values: &mut [u64]) {
+    debug_assert!(!entries.is_empty());
+    for (i, entry) in entries.iter().enumerate() {
+        debug_assert_eq!(entry.len(), values.len());
+        match entry {
+            BlockEntry::Const(scalar, data_type, _) => {
+                if i == 0 {
+                    combine_group_hash_const::<true>(scalar, data_type, values);
+                } else {
+                    combine_group_hash_const::<false>(scalar, data_type, values);
+                }
+            }
+            BlockEntry::Column(column) => {
+                if i == 0 {
+                    combine_group_hash_column::<true>(column, values);
+                } else {
+                    combine_group_hash_column::<false>(column, values);
+                }
+            }
         }
     }
 }
 
-pub fn combine_group_hash_column<const IS_FIRST: bool>(c: &Column, values: &mut [u64]) {
+fn combine_group_hash_column<const IS_FIRST: bool>(c: &Column, values: &mut [u64]) {
     HashVisitor::<IS_FIRST> { values }
         .visit_column(c.clone())
         .unwrap()
 }
 
+fn combine_group_hash_const<const IS_FIRST: bool>(
+    scalar: &Scalar,
+    data_type: &DataType,
+    values: &mut [u64],
+) {
+    match data_type {
+        DataType::Null | DataType::EmptyArray | DataType::EmptyMap => {}
+        DataType::Nullable(inner) => {
+            if scalar.is_null() {
+                apply_const_hash::<IS_FIRST>(values, NULL_HASH_VAL);
+            } else {
+                combine_group_hash_const_nonnull::<IS_FIRST>(scalar, inner, values);
+            }
+        }
+        _ => combine_group_hash_const_nonnull::<IS_FIRST>(scalar, data_type, values),
+    }
+}
+
+fn combine_group_hash_const_nonnull<const IS_FIRST: bool>(
+    scalar: &Scalar,
+    _data_type: &DataType,
+    values: &mut [u64],
+) {
+    let hash = match scalar {
+        Scalar::Null => unreachable!(),
+        Scalar::EmptyArray | Scalar::EmptyMap => return,
+        Scalar::Number(v) => with_number_type!(|NUM_TYPE| match v {
+            NumberScalar::NUM_TYPE(value) => value.agg_hash(),
+        }),
+        Scalar::Decimal(v) => {
+            with_decimal_mapped_type!(|F| match v {
+                DecimalScalar::F(v, size) => {
+                    with_decimal_mapped_type!(|T| match size.data_kind() {
+                        DecimalDataKind::T => {
+                            v.as_decimal::<T>().agg_hash()
+                        }
+                    })
+                }
+            })
+        }
+        Scalar::Timestamp(value) => value.agg_hash(),
+        Scalar::Date(value) => value.agg_hash(),
+        Scalar::Boolean(value) => value.agg_hash(),
+        Scalar::String(value) => value.as_bytes().agg_hash(),
+        Scalar::Binary(value)
+        | Scalar::Bitmap(value)
+        | Scalar::Variant(value)
+        | Scalar::Geometry(value) => value.agg_hash(),
+        Scalar::Geography(value) => value.0.agg_hash(),
+        _ => scalar.as_ref().agg_hash(),
+    };
+    apply_const_hash::<IS_FIRST>(values, hash);
+}
+
+fn apply_const_hash<const IS_FIRST: bool>(values: &mut [u64], hash: u64) {
+    if IS_FIRST {
+        for val in values.iter_mut() {
+            *val = hash;
+        }
+    } else {
+        for val in values.iter_mut() {
+            *val = merge_hash(*val, hash);
+        }
+    }
+}
+
 struct HashVisitor<'a, const IS_FIRST: bool> {
     values: &'a mut [u64],
 }
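
For constant group-by entries, the new path hashes the scalar once and then writes or merges that single hash into every row slot, which should match hashing a fully materialized column of that constant while skipping the allocation. Below is a rough, self-contained sketch of that equivalence; toy_hash and toy_merge are toy stand-ins for the crate's agg_hash and merge_hash, not the actual implementations.

```rust
// Illustrative only: the hash functions here are stand-ins; the point is the
// broadcast-vs-materialize equivalence that the const path relies on.
fn toy_hash(v: u64) -> u64 {
    v.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ (v >> 31)
}

fn toy_merge(prev: u64, next: u64) -> u64 {
    prev.rotate_left(5) ^ next
}

/// Per-row path: hash every element of a materialized column.
fn combine_column<const IS_FIRST: bool>(col: &[u64], values: &mut [u64]) {
    for (slot, v) in values.iter_mut().zip(col) {
        let h = toy_hash(*v);
        *slot = if IS_FIRST { h } else { toy_merge(*slot, h) };
    }
}

/// Const path: hash the scalar once and broadcast/merge it into every slot.
fn combine_const<const IS_FIRST: bool>(scalar: u64, values: &mut [u64]) {
    let h = toy_hash(scalar);
    for slot in values.iter_mut() {
        *slot = if IS_FIRST { h } else { toy_merge(*slot, h) };
    }
}

fn main() {
    let keys = [7u64, 8, 9, 10]; // a real (non-const) group column
    let constant = 42u64; // a constant group column, never materialized

    // Const-aware path.
    let mut fast = vec![0u64; keys.len()];
    combine_column::<true>(&keys, &mut fast);
    combine_const::<false>(constant, &mut fast);

    // Naive path: materialize the constant into a full column first.
    let materialized = vec![constant; keys.len()];
    let mut slow = vec![0u64; keys.len()];
    combine_column::<true>(&keys, &mut slow);
    combine_column::<false>(&materialized, &mut slow);

    assert_eq!(fast, slow);
    println!("const broadcast matches materialized column hashing");
}
```
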
@@ -101,7 +154,7 @@ impl<const IS_FIRST: bool> ValueVisitor for HashVisitor<'_, IS_FIRST> {
     fn visit_any_number(&mut self, column: NumberColumn) -> Result<()> {
         with_number_mapped_type!(|NUM_TYPE| match column.data_type() {
             NumberDataType::NUM_TYPE => {
-                let c = NUM_TYPE::try_downcast_column(&column).unwrap();
+                let c = <NUM_TYPE as Number>::try_downcast_column(&column).unwrap();
                 self.combine_group_hash_type_column::<NumberType<NUM_TYPE>>(&c)
             }
         });
@@ -573,22 +626,138 @@ impl AggHash for ScalarRef<'_> {
 #[cfg(test)]
 mod tests {
     use databend_common_column::bitmap::Bitmap;
+    use databend_common_column::types::months_days_micros;
+    use databend_common_column::types::timestamp_tz;
 
     use super::*;
+    use crate::types::geography::Geography;
     use crate::types::ArgType;
-    use crate::types::Int32Type;
+    use crate::types::DecimalSize;
     use crate::types::NullableColumn;
-    use crate::types::NullableType;
-    use crate::types::StringType;
+    use crate::types::NumberScalar;
+    use crate::types::OpaqueScalar;
+    use crate::types::VectorDataType;
+    use crate::types::VectorScalar;
     use crate::BlockEntry;
     use crate::DataBlock;
     use crate::FromData;
+    use crate::ProjectedBlock;
     use crate::Value;
 
     fn merge_hash_slice(ls: &[u64]) -> u64 {
         ls.iter().cloned().reduce(merge_hash).unwrap()
     }
 
+    #[test]
+    fn test_group_hash_entries_const_equals_column() {
+        let num_rows = 5;
+        let block = sample_block(num_rows);
+        let full_block = block.convert_to_full();
+
+        for projection in (0..block.num_columns())
+            .map_windows(|[a, b]| vec![*a, *b])
+            .chain(Some((0..block.num_columns()).collect()))
+            .collect::<Vec<_>>()
+        {
+            let mut const_hashes = vec![0; block.num_rows()];
+            let mut col_hashes = vec![0; full_block.num_rows()];
+            group_hash_entries(
+                ProjectedBlock::project(&projection, &block),
+                &mut const_hashes,
+            );
+            group_hash_entries(
+                ProjectedBlock::project(&projection, &full_block),
+                &mut col_hashes,
+            );
+            assert_eq!(const_hashes, col_hashes);
+        }
+    }
+
+    fn sample_block(num_rows: usize) -> DataBlock {
+        let cases = [
+            (DataType::Null, Scalar::Null),
+            (DataType::EmptyArray, Scalar::EmptyArray),
+            (DataType::EmptyMap, Scalar::EmptyMap),
+            (DataType::Boolean, Scalar::Boolean(true)),
+            (DataType::Binary, Scalar::Binary(vec![1, 2, 3, 4])),
+            (DataType::String, Scalar::String("const_str".to_string())),
+            (
+                Int32Type::data_type(),
+                Scalar::Number(NumberScalar::Int32(123)),
+            ),
+            (
+                DataType::Number(NumberDataType::Float64),
+                Scalar::Number(NumberScalar::Float64(OrderedFloat(1.25))),
+            ),
+            {
+                let decimal_size = DecimalSize::new(20, 3).unwrap();
+                (
+                    DataType::Decimal(decimal_size),
+                    Scalar::Decimal(DecimalScalar::Decimal128(123456_i128, decimal_size)),
+                )
+            },
+            (DataType::Timestamp, Scalar::Timestamp(123_456_789)),
+            (
+                DataType::TimestampTz,
+                Scalar::TimestampTz(timestamp_tz::new(123_456, 3_600)),
+            ),
+            (DataType::Date, Scalar::Date(42)),
+            (
+                DataType::Interval,
+                Scalar::Interval(months_days_micros::new(1, 2, 3)),
+            ),
+            (DataType::Bitmap, Scalar::Bitmap(vec![0b10101010])),
+            (DataType::Variant, Scalar::Variant(vec![1, 2, 3, 4])),
+            (DataType::Geometry, Scalar::Geometry(vec![9, 9, 9])),
+            (
+                DataType::Geography,
+                Scalar::Geography(Geography(vec![1, 2, 3])),
+            ),
+            (
+                DataType::Vector(VectorDataType::Int8(2)),
+                Scalar::Vector(VectorScalar::Int8(vec![1, 2])),
+            ),
+            (
+                DataType::Opaque(2),
+                Scalar::Opaque(OpaqueScalar::Opaque2([1, 2])),
+            ),
+            {
+                let array_ty = DataType::Array(Box::new(Int32Type::data_type()));
+                (array_ty.clone(), Scalar::default_value(&array_ty))
+            },
+            {
+                let map_ty = DataType::Map(Box::new(DataType::Tuple(vec![
+                    DataType::String,
+                    Int32Type::data_type(),
+                ])));
+                let val = Scalar::default_value(&map_ty);
+                (map_ty, val)
+            },
+            {
+                let tuple_ty = DataType::Tuple(vec![DataType::String, Int32Type::data_type()]);
+                (
+                    tuple_ty,
+                    Scalar::Tuple(vec![
+                        Scalar::String("tuple_0".to_string()),
+                        Scalar::Number(NumberScalar::Int32(0)),
+                    ]),
+                )
+            },
+            (
+                Int32Type::data_type().wrap_nullable(),
+                Scalar::Number(NumberScalar::Int32(999)),
+            ),
+            (Int32Type::data_type().wrap_nullable(), Scalar::Null),
+        ];
+
+        DataBlock::from_iter(
+            cases.into_iter().map(|(data_type, scalar)| {
+                BlockEntry::new_const_column(data_type, scalar, num_rows)
+            }),
+            num_rows,
+        )
+    }
+
     #[test]
     fn test_value_spread() -> Result<()> {
         let data = DataBlock::new(
