Skip to content

Commit cb63b57

Browse files
authored
refactor: Mutil Column Aggregate Function State Serialization Interface (#18398)
1 parent 0f00f38 commit cb63b57

File tree

57 files changed

+2215
-668
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2215
-668
lines changed

src/common/base/src/runtime/profile/profile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::runtime::metrics::ScopedRegistry;
2323
use crate::runtime::profile::ProfileStatisticsName;
2424
use crate::runtime::ThreadTracker;
2525

26-
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
26+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2727
pub struct ProfileLabel {
2828
pub name: String,
2929
pub value: Vec<String>,

src/common/storage/src/copy.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use serde::Deserialize;
2020
use serde::Serialize;
2121
use thiserror::Error;
2222

23-
#[derive(Default, Clone, Serialize, Deserialize)]
23+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
2424
pub struct CopyStatus {
2525
/// Key is file path.
2626
pub files: DashMap<String, FileStatus>,
@@ -45,7 +45,7 @@ impl CopyStatus {
4545
}
4646
}
4747

48-
#[derive(Default, Clone, Serialize, Deserialize)]
48+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
4949
pub struct FileStatus {
5050
pub num_rows_loaded: usize,
5151
pub error: Option<FileErrorsInfo>,
@@ -79,7 +79,7 @@ impl FileStatus {
7979
}
8080
}
8181

82-
#[derive(Clone, Serialize, Deserialize)]
82+
#[derive(Debug, Clone, Serialize, Deserialize)]
8383
pub struct FileErrorsInfo {
8484
pub num_errors: usize,
8585
pub first_error: FileParseErrorAtLine,
@@ -156,7 +156,7 @@ impl FileParseError {
156156
}
157157
}
158158

159-
#[derive(Clone, Serialize, Deserialize)]
159+
#[derive(Debug, Clone, Serialize, Deserialize)]
160160
pub struct FileParseErrorAtLine {
161161
pub error: FileParseError,
162162
pub line: usize,

src/common/storage/src/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use serde::Deserialize;
1616
use serde::Serialize;
1717

18-
#[derive(Default, Clone, Serialize, Deserialize)]
18+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1919
pub struct MutationStatus {
2020
pub insert_rows: u64,
2121
pub deleted_rows: u64,

src/query/catalog/src/plan/agg_index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub struct AggIndexInfo {
5151
}
5252

5353
/// This meta just indicate the block is from aggregating index.
54-
#[derive(Debug, Clone)]
54+
#[derive(Debug, Clone, Copy)]
5555
pub struct AggIndexMeta {
5656
pub is_agg: bool,
5757
// Number of aggregation functions.
@@ -75,6 +75,6 @@ local_block_meta_serde!(AggIndexMeta);
7575
#[typetag::serde(name = "agg_index_meta")]
7676
impl BlockMetaInfo for AggIndexMeta {
7777
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
78-
Box::new(self.clone())
78+
Box::new(*self)
7979
}
8080
}

src/query/catalog/src/statistics/data_cache_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct DataCacheMetrics {
2424
bytes_from_memory: AtomicUsize,
2525
}
2626

27-
#[derive(Default, Clone, Serialize, Deserialize)]
27+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
2828
pub struct DataCacheMetricValues {
2929
pub bytes_from_remote_disk: usize,
3030
pub bytes_from_local_disk: usize,

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ use super::AggrState;
2222
use super::AggrStateLoc;
2323
use super::AggrStateRegistry;
2424
use super::StateAddr;
25-
use crate::types::BinaryType;
2625
use crate::types::DataType;
2726
use crate::BlockEntry;
2827
use crate::ColumnBuilder;
29-
use crate::ColumnView;
3028
use crate::ProjectedBlock;
3129
use crate::Scalar;
30+
use crate::StateSerdeItem;
31+
use crate::StateSerdeType;
3232

3333
pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;
3434

@@ -69,35 +69,28 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
6969
// Used in aggregate_null_adaptor
7070
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>;
7171

72-
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;
72+
fn serialize_type(&self) -> Vec<StateSerdeItem>;
7373

74-
fn serialize_size_per_row(&self) -> Option<usize> {
75-
None
74+
fn serialize_data_type(&self) -> DataType {
75+
let serde_type = StateSerdeType::new(self.serialize_type());
76+
serde_type.data_type()
7677
}
7778

78-
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;
79+
fn batch_serialize(
80+
&self,
81+
places: &[StateAddr],
82+
loc: &[AggrStateLoc],
83+
builders: &mut [ColumnBuilder],
84+
) -> Result<()>;
7985

80-
/// Batch merge and deserialize the state from binary array
86+
/// Batch deserialize the state and merge
8187
fn batch_merge(
8288
&self,
8389
places: &[StateAddr],
8490
loc: &[AggrStateLoc],
85-
state: &ColumnView<BinaryType>,
86-
) -> Result<()> {
87-
for (place, mut data) in places.iter().zip(state.iter()) {
88-
self.merge(AggrState::new(*place, loc), &mut data)?;
89-
}
90-
91-
Ok(())
92-
}
93-
94-
fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> {
95-
let view = state.downcast::<BinaryType>().unwrap();
96-
for mut data in view.iter() {
97-
self.merge(place, &mut data)?;
98-
}
99-
Ok(())
100-
}
91+
state: &BlockEntry,
92+
filter: Option<&Bitmap>,
93+
) -> Result<()>;
10194

10295
fn batch_merge_states(
10396
&self,
@@ -149,9 +142,4 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
149142
fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
150143
None
151144
}
152-
153-
// some features
154-
fn convert_const_to_full(&self) -> bool {
155-
true
156-
}
157145
}

src/query/expression/src/aggregate/aggregate_function_state.rs

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use enum_as_inner::EnumAsInner;
2020

2121
use super::AggregateFunctionRef;
2222
use crate::types::binary::BinaryColumnBuilder;
23+
use crate::types::DataType;
24+
use crate::ColumnBuilder;
2325

2426
#[derive(Clone, Copy, Debug)]
2527
pub struct StateAddr {
@@ -113,11 +115,11 @@ impl From<StateAddr> for usize {
113115

114116
pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout> {
115117
let mut registry = AggrStateRegistry::default();
116-
let mut serialize_size = Vec::with_capacity(funcs.len());
118+
let mut serialize_type = Vec::with_capacity(funcs.len());
117119
for func in funcs {
118120
func.register_state(&mut registry);
119121
registry.commit();
120-
serialize_size.push(func.serialize_size_per_row());
122+
serialize_type.push(StateSerdeType(func.serialize_type().into()));
121123
}
122124

123125
let AggrStateRegistry { states, offsets } = registry;
@@ -132,7 +134,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout>
132134
Ok(StatesLayout {
133135
layout,
134136
states_loc,
135-
serialize_size,
137+
serialize_type,
136138
})
137139
}
138140

@@ -191,18 +193,66 @@ impl AggrStateLoc {
191193
}
192194
}
193195

196+
#[derive(Debug, Clone)]
197+
pub enum StateSerdeItem {
198+
DataType(DataType),
199+
Binary(Option<usize>),
200+
}
201+
202+
#[derive(Debug, Clone)]
203+
pub struct StateSerdeType(Box<[StateSerdeItem]>);
204+
205+
impl StateSerdeType {
206+
pub fn new(items: impl Into<Box<[StateSerdeItem]>>) -> Self {
207+
StateSerdeType(items.into())
208+
}
209+
210+
pub fn data_type(&self) -> DataType {
211+
DataType::Tuple(
212+
self.0
213+
.iter()
214+
.map(|item| match item {
215+
StateSerdeItem::DataType(data_type) => data_type.clone(),
216+
StateSerdeItem::Binary(_) => DataType::Binary,
217+
})
218+
.collect(),
219+
)
220+
}
221+
}
222+
194223
#[derive(Debug, Clone)]
195224
pub struct StatesLayout {
196225
pub layout: Layout,
197226
pub states_loc: Vec<Box<[AggrStateLoc]>>,
198-
serialize_size: Vec<Option<usize>>,
227+
pub(super) serialize_type: Vec<StateSerdeType>,
199228
}
200229

201230
impl StatesLayout {
202-
pub fn serialize_builders(&self, num_rows: usize) -> Vec<BinaryColumnBuilder> {
203-
self.serialize_size
231+
pub fn num_aggr_func(&self) -> usize {
232+
self.states_loc.len()
233+
}
234+
235+
pub fn serialize_builders(&self, num_rows: usize) -> Vec<ColumnBuilder> {
236+
self.serialize_type
204237
.iter()
205-
.map(|size| BinaryColumnBuilder::with_capacity(num_rows, num_rows * size.unwrap_or(0)))
238+
.map(|serde_type| {
239+
let builder = serde_type
240+
.0
241+
.iter()
242+
.map(|item| match item {
243+
StateSerdeItem::DataType(data_type) => {
244+
ColumnBuilder::with_capacity(data_type, num_rows)
245+
}
246+
StateSerdeItem::Binary(size) => {
247+
ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity(
248+
num_rows,
249+
num_rows * size.unwrap_or(0),
250+
))
251+
}
252+
})
253+
.collect();
254+
ColumnBuilder::Tuple(builder)
255+
})
206256
.collect()
207257
}
208258
}

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use crate::aggregate::payload_row::row_match_columns;
2727
use crate::group_hash_columns;
2828
use crate::new_sel;
2929
use crate::read;
30-
use crate::types::BinaryType;
3130
use crate::types::DataType;
3231
use crate::AggregateFunctionRef;
3332
use crate::BlockEntry;
@@ -219,7 +218,7 @@ impl AggregateHashTable {
219218
.zip(agg_states.iter())
220219
.zip(states_layout.states_loc.iter())
221220
{
222-
func.batch_merge(state_places, loc, &state.downcast::<BinaryType>().unwrap())?;
221+
func.batch_merge(state_places, loc, state, None)?;
223222
}
224223
}
225224
}

src/query/expression/src/aggregate/payload.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -421,11 +421,15 @@ impl Payload {
421421
true
422422
}
423423

424-
pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
425-
let fake_rows = fake_rows.unwrap_or(0);
426-
let entries = (0..self.aggrs.len())
427-
.map(|_| {
428-
ColumnBuilder::repeat_default(&DataType::Binary, fake_rows)
424+
pub fn empty_block(&self, fake_rows: usize) -> DataBlock {
425+
assert_eq!(self.aggrs.is_empty(), self.states_layout.is_none());
426+
let entries = self
427+
.states_layout
428+
.as_ref()
429+
.iter()
430+
.flat_map(|layout| layout.serialize_type.iter())
431+
.map(|serde_type| {
432+
ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows)
429433
.build()
430434
.into()
431435
})

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use databend_common_io::prelude::bincode_deserialize_from_slice;
1818
use super::partitioned_payload::PartitionedPayload;
1919
use super::payload::Payload;
2020
use super::probe_state::ProbeState;
21-
use super::AggrState;
2221
use crate::read;
2322
use crate::types::binary::BinaryColumn;
2423
use crate::types::binary::BinaryColumnBuilder;
@@ -125,7 +124,7 @@ impl Payload {
125124
}
126125

127126
if blocks.is_empty() {
128-
return Ok(self.empty_block(None));
127+
return Ok(self.empty_block(0));
129128
}
130129
DataBlock::concat(&blocks)
131130
}
@@ -141,26 +140,17 @@ impl Payload {
141140
if let Some(state_layout) = self.states_layout.as_ref() {
142141
let mut builders = state_layout.serialize_builders(row_count);
143142

144-
for place in state.state_places.as_slice()[0..row_count].iter() {
145-
for (idx, (loc, func)) in state_layout
146-
.states_loc
147-
.iter()
148-
.zip(self.aggrs.iter())
149-
.enumerate()
150-
{
151-
{
152-
let builder = &mut builders[idx];
153-
func.serialize(AggrState::new(*place, loc), &mut builder.data)?;
154-
builder.commit_row();
155-
}
156-
}
143+
for ((loc, func), builder) in state_layout
144+
.states_loc
145+
.iter()
146+
.zip(self.aggrs.iter())
147+
.zip(builders.iter_mut())
148+
{
149+
let builders = builder.as_tuple_mut().unwrap().as_mut_slice();
150+
func.batch_serialize(&state.state_places.as_slice()[0..row_count], loc, builders)?;
157151
}
158152

159-
entries.extend(
160-
builders
161-
.into_iter()
162-
.map(|builder| Column::Binary(builder.build()).into()),
163-
);
153+
entries.extend(builders.into_iter().map(|builder| builder.build().into()));
164154
}
165155

166156
entries.extend_from_slice(&state.take_group_columns());
@@ -177,7 +167,7 @@ impl Payload {
177167
}
178168

179169
if blocks.is_empty() {
180-
return Ok(self.empty_block(None));
170+
return Ok(self.empty_block(0));
181171
}
182172

183173
DataBlock::concat(&blocks)

0 commit comments

Comments
 (0)