Skip to content

Commit b76bef0

Browse files
authored
refactor(query): adjust the serialization of each aggregate function state one by one (#18462)
1 parent f91bf6f commit b76bef0

File tree

57 files changed

+2887
-1792
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2887
-1792
lines changed

โ€ŽCargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,17 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
113113
builder: &mut ColumnBuilder,
114114
) -> Result<()> {
115115
for place in places {
116-
self.merge_result(AggrState::new(*place, &loc), builder)?;
116+
self.merge_result(AggrState::new(*place, &loc), false, builder)?;
117117
}
118118
Ok(())
119119
}
120120

121-
fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()>;
121+
fn merge_result(
122+
&self,
123+
place: AggrState,
124+
read_only: bool,
125+
builder: &mut ColumnBuilder,
126+
) -> Result<()>;
122127

123128
// std::mem::needs_drop::<State>
124129
// if true will call drop_state

โ€Žsrc/query/expression/src/aggregate/aggregate_function_state.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ impl StateAddr {
3737
}
3838

3939
#[inline]
40-
pub fn get<'a, T>(&self) -> &'a mut T {
40+
pub fn get<'a, T>(&self) -> &'a mut T
41+
where T: Send + 'static {
4142
unsafe { &mut *(self.addr as *mut T) }
4243
}
4344

@@ -71,15 +72,19 @@ impl StateAddr {
7172

7273
#[inline]
7374
pub fn write<T, F>(&self, f: F)
74-
where F: FnOnce() -> T {
75+
where
76+
F: FnOnce() -> T,
77+
T: Send + 'static,
78+
{
7579
unsafe {
7680
let ptr = self.addr as *mut T;
7781
std::ptr::write(ptr, f());
7882
}
7983
}
8084

8185
#[inline]
82-
pub fn write_state<T>(&self, state: T) {
86+
pub fn write_state<T>(&self, state: T)
87+
where T: Send + 'static {
8388
unsafe {
8489
let ptr = self.addr as *mut T;
8590
std::ptr::write(ptr, state);
@@ -199,6 +204,12 @@ pub enum StateSerdeItem {
199204
Binary(Option<usize>),
200205
}
201206

207+
impl From<DataType> for StateSerdeItem {
208+
fn from(value: DataType) -> Self {
209+
Self::DataType(value)
210+
}
211+
}
212+
202213
#[derive(Debug, Clone)]
203214
pub struct StateSerdeType(Box<[StateSerdeItem]>);
204215

@@ -268,15 +279,19 @@ impl<'a> AggrState<'a> {
268279
Self { addr, loc }
269280
}
270281

271-
pub fn get<'b, T>(&self) -> &'b mut T {
282+
pub fn get<'b, T>(&self) -> &'b mut T
283+
where T: Send + 'static {
272284
debug_assert_eq!(self.loc.len(), 1);
273285
self.addr
274286
.next(self.loc[0].into_custom().unwrap().1)
275287
.get::<T>()
276288
}
277289

278290
pub fn write<T, F>(&self, f: F)
279-
where F: FnOnce() -> T {
291+
where
292+
F: FnOnce() -> T,
293+
T: Send + 'static,
294+
{
280295
debug_assert_eq!(self.loc.len(), 1);
281296
self.addr
282297
.next(self.loc[0].into_custom().unwrap().1)

โ€Žsrc/query/expression/src/projected_block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::slice::SliceIndex;
1919
use crate::BlockEntry;
2020
use crate::DataBlock;
2121

22-
#[derive(Copy, Clone)]
22+
#[derive(Debug, Clone, Copy)]
2323
pub struct ProjectedBlock<'a> {
2424
map: Option<&'a [usize]>,
2525
entries: &'a [BlockEntry],

โ€Žsrc/query/expression/src/types.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ use std::ops::Deref;
4545
use std::ops::DerefMut;
4646
use std::ops::Range;
4747

48-
use borsh::BorshDeserialize;
49-
use borsh::BorshSerialize;
5048
use databend_common_ast::ast::TypeName;
5149
pub use databend_common_base::base::OrderedFloat;
5250
pub use databend_common_io::deserialize_bitmap;
@@ -105,18 +103,7 @@ use crate::ScalarRef;
105103

106104
pub type GenericMap = [DataType];
107105

108-
#[derive(
109-
Debug,
110-
Clone,
111-
PartialEq,
112-
Eq,
113-
Hash,
114-
Serialize,
115-
Deserialize,
116-
BorshSerialize,
117-
BorshDeserialize,
118-
EnumAsInner,
119-
)]
106+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, EnumAsInner)]
120107
pub enum DataType {
121108
Null,
122109
EmptyArray,
@@ -494,7 +481,7 @@ pub fn convert_to_type_name(ty: &DataType) -> TypeName {
494481

495482
/// [AccessType] defines a series of access methods for a data type
496483
pub trait AccessType: Debug + Clone + PartialEq + Sized + 'static {
497-
type Scalar: Debug + Clone + PartialEq;
484+
type Scalar: Debug + Clone + PartialEq + Send + 'static;
498485
type ScalarRef<'a>: Debug + Clone + PartialEq;
499486
type Column: Debug + Clone + PartialEq + Send;
500487
type Domain: Debug + Clone + PartialEq;

โ€Žsrc/query/functions/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ jiff = { workspace = true }
4545
jsonb = { workspace = true }
4646
lexical-core = { workspace = true }
4747
libm = { workspace = true }
48+
log = { workspace = true }
4849
match-template = { workspace = true }
4950
md-5 = { workspace = true }
5051
naive-cityhash = { workspace = true }

โ€Žsrc/query/functions/src/aggregates/aggregate_combinator_distinct.rs renamed to โ€Žsrc/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ use std::sync::Arc;
1919

2020
use databend_common_exception::Result;
2121
use databend_common_expression::types::number::NumberColumnBuilder;
22-
use databend_common_expression::types::BinaryType;
2322
use databend_common_expression::types::Bitmap;
2423
use databend_common_expression::types::DataType;
2524
use databend_common_expression::types::NumberDataType;
26-
use databend_common_expression::types::UnaryType;
2725
use databend_common_expression::with_number_mapped_type;
2826
use databend_common_expression::AggrStateRegistry;
2927
use databend_common_expression::AggrStateType;
@@ -38,16 +36,16 @@ use super::aggregate_distinct_state::AggregateDistinctState;
3836
use super::aggregate_distinct_state::AggregateDistinctStringState;
3937
use super::aggregate_distinct_state::AggregateUniqStringState;
4038
use super::aggregate_distinct_state::DistinctStateFunc;
41-
use super::aggregate_function::AggregateFunction;
42-
use super::aggregate_function_factory::AggregateFunctionCreator;
43-
use super::aggregate_function_factory::AggregateFunctionDescription;
44-
use super::aggregate_function_factory::AggregateFunctionSortDesc;
45-
use super::aggregate_function_factory::CombinatorDescription;
46-
use super::aggregator_common::assert_variadic_arguments;
39+
use super::assert_variadic_arguments;
40+
use super::AggrState;
41+
use super::AggrStateLoc;
4742
use super::AggregateCountFunction;
48-
use crate::aggregates::AggrState;
49-
use crate::aggregates::AggrStateLoc;
50-
use crate::aggregates::StateAddr;
43+
use super::AggregateFunction;
44+
use super::AggregateFunctionCreator;
45+
use super::AggregateFunctionDescription;
46+
use super::AggregateFunctionSortDesc;
47+
use super::CombinatorDescription;
48+
use super::StateAddr;
5149

5250
#[derive(Clone)]
5351
pub struct AggregateDistinctCombinator<State> {
@@ -56,10 +54,12 @@ pub struct AggregateDistinctCombinator<State> {
5654
nested_name: String,
5755
arguments: Vec<DataType>,
5856
nested: Arc<dyn AggregateFunction>,
59-
_state: PhantomData<State>,
57+
_s: PhantomData<fn(State)>,
6058
}
6159

62-
impl<State> AggregateDistinctCombinator<State> {
60+
impl<State> AggregateDistinctCombinator<State>
61+
where State: Send + 'static
62+
{
6363
fn get_state(place: AggrState) -> &mut State {
6464
place
6565
.addr
@@ -113,7 +113,7 @@ where State: DistinctStateFunc
113113
}
114114

115115
fn serialize_type(&self) -> Vec<StateSerdeItem> {
116-
vec![StateSerdeItem::Binary(None)]
116+
State::serialize_type(None)
117117
}
118118

119119
fn batch_serialize(
@@ -122,13 +122,7 @@ where State: DistinctStateFunc
122122
loc: &[AggrStateLoc],
123123
builders: &mut [ColumnBuilder],
124124
) -> Result<()> {
125-
let binary_builder = builders[0].as_binary_mut().unwrap();
126-
for place in places {
127-
let state = Self::get_state(AggrState::new(*place, loc));
128-
state.serialize(&mut binary_builder.data)?;
129-
binary_builder.commit_row();
130-
}
131-
Ok(())
125+
State::batch_serialize(places, &loc[..1], builders)
132126
}
133127

134128
fn batch_merge(
@@ -138,23 +132,7 @@ where State: DistinctStateFunc
138132
state: &BlockEntry,
139133
filter: Option<&Bitmap>,
140134
) -> Result<()> {
141-
let view = state.downcast::<UnaryType<BinaryType>>().unwrap();
142-
let iter = places.iter().zip(view.iter());
143-
144-
if let Some(filter) = filter {
145-
for (place, mut data) in iter.zip(filter.iter()).filter_map(|(v, b)| b.then_some(v)) {
146-
let state = Self::get_state(AggrState::new(*place, loc));
147-
let rhs = State::deserialize(&mut data)?;
148-
state.merge(&rhs)?;
149-
}
150-
} else {
151-
for (place, mut data) in iter {
152-
let state = Self::get_state(AggrState::new(*place, loc));
153-
let rhs = State::deserialize(&mut data)?;
154-
state.merge(&rhs)?;
155-
}
156-
}
157-
Ok(())
135+
State::batch_merge(places, &loc[..1], state, filter)
158136
}
159137

160138
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
@@ -163,7 +141,12 @@ where State: DistinctStateFunc
163141
state.merge(other)
164142
}
165143

166-
fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> {
144+
fn merge_result(
145+
&self,
146+
place: AggrState,
147+
read_only: bool,
148+
builder: &mut ColumnBuilder,
149+
) -> Result<()> {
167150
let state = Self::get_state(place);
168151
let nested_place = place.remove_first_loc();
169152

@@ -178,13 +161,13 @@ where State: DistinctStateFunc
178161
Ok(())
179162
} else {
180163
if state.is_empty() {
181-
return self.nested.merge_result(nested_place, builder);
164+
return self.nested.merge_result(nested_place, read_only, builder);
182165
}
183166
let entries = &state.build_entries(&self.arguments).unwrap();
184167
self.nested
185168
.accumulate(nested_place, entries.into(), None, state.len())?;
186169
// merge_result
187-
self.nested.merge_result(nested_place, builder)
170+
self.nested.merge_result(nested_place, read_only, builder)
188171
}
189172
}
190173

@@ -220,7 +203,7 @@ pub fn aggregate_combinator_distinct_desc() -> CombinatorDescription {
220203
}
221204

222205
pub fn aggregate_combinator_uniq_desc() -> AggregateFunctionDescription {
223-
let features = super::aggregate_function_factory::AggregateFunctionFeatures {
206+
let features = super::AggregateFunctionFeatures {
224207
returns_default_when_only_null: true,
225208
..Default::default()
226209
};
@@ -264,7 +247,7 @@ pub fn try_create(
264247
arguments,
265248
nested,
266249
name,
267-
_state: PhantomData,
250+
_s: PhantomData,
268251
}));
269252
}
270253
}),
@@ -277,7 +260,7 @@ pub fn try_create(
277260
arguments,
278261
nested,
279262
nested_name: nested_name.to_owned(),
280-
_state: PhantomData,
263+
_s: PhantomData,
281264
})),
282265
_ => Ok(Arc::new(AggregateDistinctCombinator::<
283266
AggregateDistinctStringState,
@@ -286,7 +269,7 @@ pub fn try_create(
286269
arguments,
287270
nested,
288271
name,
289-
_state: PhantomData,
272+
_s: PhantomData,
290273
})),
291274
};
292275
}
@@ -300,6 +283,6 @@ pub fn try_create(
300283
arguments,
301284
nested,
302285
name,
303-
_state: PhantomData,
286+
_s: PhantomData,
304287
}))
305288
}

โ€Žsrc/query/functions/src/aggregates/aggregate_combinator_if.rs renamed to โ€Žsrc/query/functions/src/aggregates/adaptors/aggregate_combinator_if.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ use databend_common_expression::ProjectedBlock;
2828
use databend_common_expression::Scalar;
2929
use databend_common_expression::StateSerdeItem;
3030

31+
use super::AggrState;
32+
use super::AggrStateLoc;
33+
use super::AggregateFunction;
34+
use super::AggregateFunctionCreator;
35+
use super::AggregateFunctionRef;
36+
use super::AggregateFunctionSortDesc;
37+
use super::CombinatorDescription;
3138
use super::StateAddr;
32-
use crate::aggregates::aggregate_function_factory::AggregateFunctionCreator;
33-
use crate::aggregates::aggregate_function_factory::AggregateFunctionSortDesc;
34-
use crate::aggregates::aggregate_function_factory::CombinatorDescription;
35-
use crate::aggregates::AggrState;
36-
use crate::aggregates::AggrStateLoc;
37-
use crate::aggregates::AggregateFunction;
38-
use crate::aggregates::AggregateFunctionRef;
39-
use crate::aggregates::StateAddrs;
39+
use super::StateAddrs;
4040

4141
#[derive(Clone)]
4242
pub struct AggregateIfCombinator {
@@ -181,8 +181,13 @@ impl AggregateFunction for AggregateIfCombinator {
181181
self.nested.merge_states(place, rhs)
182182
}
183183

184-
fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> {
185-
self.nested.merge_result(place, builder)
184+
fn merge_result(
185+
&self,
186+
place: AggrState,
187+
read_only: bool,
188+
builder: &mut ColumnBuilder,
189+
) -> Result<()> {
190+
self.nested.merge_result(place, read_only, builder)
186191
}
187192

188193
fn need_manual_drop_state(&self) -> bool {

0 commit comments

Comments
ย (0)