Skip to content

Commit 2823d11

Browse files
authored
[Improvement](function) optimize trivial function deserialize_and_merge_vec (#58882)
### What problem does this PR solve?

Test query:

```sql
SELECT max(csales) tpcds_cmax
FROM (
    SELECT c_customer_sk, sum((ss_quantity * ss_sales_price)) csales
    FROM store_sales, customer, date_dim
    WHERE (ss_customer_sk = c_customer_sk)
      AND (ss_sold_date_sk = d_date_sk)
      AND (d_year IN (2000, (2000 + 1), (2000 + 2), (2000 + 3)))
    GROUP BY c_customer_sk
) x;
```

DeserializeAndMergeTime (202123088 rows):

- avg, old (d4f6f5): 1min25sec
- sum, old (d4f6f5): 1min9sec
- avg, new: 45sec157ms
- sum, new: 36sec84ms
1 parent f09e5cc commit 2823d11

File tree

8 files changed

+96
-19
lines changed

8 files changed

+96
-19
lines changed

be/src/vec/aggregate_functions/aggregate_function.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@
2727
#include "common/status.h"
2828
#include "util/defer_op.h"
2929
#include "vec/columns/column_complex.h"
30+
#include "vec/columns/column_fixed_length_object.h"
3031
#include "vec/columns/column_string.h"
3132
#include "vec/common/assert_cast.h"
3233
#include "vec/common/hash_table/phmap_fwd_decl.h"
3334
#include "vec/common/string_buffer.hpp"
34-
#include "vec/core/block.h"
35-
#include "vec/core/column_numbers.h"
36-
#include "vec/core/field.h"
3735
#include "vec/core/types.h"
3836
#include "vec/data_types/data_type_nullable.h"
3937
#include "vec/data_types/data_type_string.h"
@@ -110,7 +108,7 @@ class IAggregateFunction {
110108
virtual void reset(AggregateDataPtr place) const = 0;
111109

112110
/// It is not necessary to delete data.
113-
virtual bool has_trivial_destructor() const = 0;
111+
virtual bool is_trivial() const = 0;
114112

115113
/// Get `sizeof` of structure with data.
116114
virtual size_t size_of_data() const = 0;
@@ -641,7 +639,7 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived> {
641639

642640
void destroy(AggregateDataPtr __restrict place) const noexcept override { data(place).~Data(); }
643641

644-
bool has_trivial_destructor() const override { return std::is_trivially_destructible_v<Data>; }
642+
bool is_trivial() const override { return false; }
645643

646644
size_t size_of_data() const override { return sizeof(Data); }
647645

be/src/vec/aggregate_functions/aggregate_function_avg.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ class AggregateFunctionAvg<T, TResult, Data> final
164164
}
165165
}
166166

167+
bool is_trivial() const override { return true; }
168+
167169
template <bool is_add>
168170
NO_SANITIZE_UNDEFINED void update_value(AggregateDataPtr __restrict place,
169171
const IColumn** columns, ssize_t row_num) const {
@@ -292,15 +294,13 @@ class AggregateFunctionAvg<T, TResult, Data> final
292294
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
293295
const size_t num_rows) const override {
294296
this->deserialize_from_column(rhs, *column, arena, num_rows);
295-
DEFER({ this->destroy_vec(rhs, num_rows); });
296297
this->merge_vec(places, offset, rhs, arena, num_rows);
297298
}
298299

299300
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
300301
AggregateDataPtr rhs, const IColumn* column,
301302
Arena& arena, const size_t num_rows) const override {
302303
this->deserialize_from_column(rhs, *column, arena, num_rows);
303-
DEFER({ this->destroy_vec(rhs, num_rows); });
304304
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
305305
}
306306

be/src/vec/aggregate_functions/aggregate_function_foreach.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ class AggregateFunctionForEach : public IAggregateFunctionDataHelper<AggregateFu
158158
}
159159
}
160160

161-
bool has_trivial_destructor() const override {
162-
return std::is_trivially_destructible_v<Data> && nested_function->has_trivial_destructor();
161+
bool is_trivial() const override {
162+
return std::is_trivial_v<Data> && nested_function->is_trivial();
163163
}
164164

165165
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,

be/src/vec/aggregate_functions/aggregate_function_null.h

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
#include <glog/logging.h>
2424

2525
#include <array>
26+
#include <memory>
2627

28+
#include "common/exception.h"
2729
#include "common/logging.h"
2830
#include "common/status.h"
2931
#include "vec/aggregate_functions/aggregate_function.h"
3032
#include "vec/aggregate_functions/aggregate_function_distinct.h"
3133
#include "vec/columns/column_nullable.h"
3234
#include "vec/common/assert_cast.h"
35+
#include "vec/common/string_buffer.hpp"
3336
#include "vec/core/types.h"
3437
#include "vec/data_types/data_type_nullable.h"
3538

@@ -161,9 +164,7 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
161164
nested_function->reset(nested_place(place));
162165
}
163166

164-
bool has_trivial_destructor() const override {
165-
return nested_function->has_trivial_destructor();
166-
}
167+
bool is_trivial() const override { return false; }
167168

168169
size_t size_of_data() const override { return prefix_size + nested_function->size_of_data(); }
169170

@@ -177,11 +178,10 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
177178

178179
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
179180
Arena& arena) const override {
180-
if (result_is_nullable && get_flag(rhs)) {
181+
if (get_flag(rhs)) {
181182
set_flag(place);
183+
nested_function->merge(nested_place(place), nested_place(rhs), arena);
182184
}
183-
184-
nested_function->merge(nested_place(place), nested_place(rhs), arena);
185185
}
186186

187187
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
@@ -206,6 +206,83 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
206206
}
207207
}
208208

209+
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
210+
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
211+
const size_t num_rows) const override {
212+
if (nested_function->is_trivial()) {
213+
BufferReadable buf({column->get_data_at(0).data, 0});
214+
size_t size_of_data = this->size_of_data();
215+
if constexpr (result_is_nullable) {
216+
for (int i = 0; i != num_rows; ++i) {
217+
buf.read_binary(*(bool*)(rhs + size_of_data * i));
218+
if (get_flag(rhs + size_of_data * i)) {
219+
nested_function->deserialize(nested_place(rhs + size_of_data * i), buf,
220+
arena);
221+
}
222+
}
223+
for (size_t i = 0; i != num_rows; ++i) {
224+
if (get_flag(rhs + size_of_data * i)) {
225+
set_flag(places[i] + offset);
226+
nested_function->merge(nested_place(places[i] + offset),
227+
nested_place(rhs + size_of_data * i), arena);
228+
}
229+
}
230+
} else {
231+
for (size_t i = 0; i != num_rows; ++i) {
232+
nested_function->deserialize(rhs + size_of_data * i, buf, arena);
233+
}
234+
for (size_t i = 0; i != num_rows; ++i) {
235+
nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena);
236+
}
237+
}
238+
} else {
239+
IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec(places, offset, rhs,
240+
column, arena, num_rows);
241+
}
242+
}
243+
244+
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
245+
AggregateDataPtr rhs, const IColumn* column,
246+
Arena& arena, const size_t num_rows) const override {
247+
if (nested_function->is_trivial()) {
248+
BufferReadable buf({column->get_data_at(0).data, 0});
249+
size_t size_of_data = this->size_of_data();
250+
if constexpr (result_is_nullable) {
251+
for (int i = 0; i != num_rows; ++i) {
252+
if (!places[i]) {
253+
continue;
254+
}
255+
buf.read_binary(*(bool*)(rhs + size_of_data * i));
256+
if (get_flag(rhs + size_of_data * i)) {
257+
nested_function->deserialize(nested_place(rhs + size_of_data * i), buf,
258+
arena);
259+
}
260+
}
261+
for (size_t i = 0; i != num_rows; ++i) {
262+
if (places[i] && get_flag(rhs + size_of_data * i)) {
263+
set_flag(places[i] + offset);
264+
nested_function->merge(nested_place(places[i] + offset),
265+
nested_place(rhs + size_of_data * i), arena);
266+
}
267+
}
268+
} else {
269+
for (size_t i = 0; i != num_rows; ++i) {
270+
if (places[i]) {
271+
nested_function->deserialize(rhs + size_of_data * i, buf, arena);
272+
}
273+
}
274+
for (size_t i = 0; i != num_rows; ++i) {
275+
if (places[i]) {
276+
nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena);
277+
}
278+
}
279+
}
280+
} else {
281+
IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec_selected(
282+
places, offset, rhs, column, arena, num_rows);
283+
}
284+
}
285+
209286
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
210287
BufferReadable& buf, Arena& arena) const override {
211288
bool flag = true;

be/src/vec/aggregate_functions/aggregate_function_state_union.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class AggregateStateUnion : public IAggregateFunctionHelper<AggregateStateUnion>
101101
_function->destroy(place);
102102
}
103103

104-
bool has_trivial_destructor() const override { return _function->has_trivial_destructor(); }
104+
bool is_trivial() const override { return false; }
105105

106106
size_t size_of_data() const override { return _function->size_of_data(); }
107107

be/src/vec/aggregate_functions/aggregate_function_sum.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ class AggregateFunctionSum<T, TResult, Data> final
102102
}
103103
}
104104

105+
bool is_trivial() const override { return true; }
106+
105107
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
106108
Arena&) const override {
107109
const auto& column =
@@ -192,15 +194,13 @@ class AggregateFunctionSum<T, TResult, Data> final
192194
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
193195
const size_t num_rows) const override {
194196
this->deserialize_from_column(rhs, *column, arena, num_rows);
195-
DEFER({ this->destroy_vec(rhs, num_rows); });
196197
this->merge_vec(places, offset, rhs, arena, num_rows);
197198
}
198199

199200
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
200201
AggregateDataPtr rhs, const IColumn* column,
201202
Arena& arena, const size_t num_rows) const override {
202203
this->deserialize_from_column(rhs, *column, arena, num_rows);
203-
DEFER({ this->destroy_vec(rhs, num_rows); });
204204
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
205205
}
206206

be/src/vec/common/string_buffer.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ class BufferReadable {
266266
template <typename Type>
267267
void read_binary(Type& x) {
268268
static_assert(std::is_standard_layout_v<Type>);
269-
read(reinterpret_cast<char*>(&x), sizeof(x));
269+
memcpy_fixed<Type>(reinterpret_cast<char*>(&x), _data);
270+
_data += sizeof(x);
270271
}
271272

272273
template <typename Type>

be/test/vec/olap/jsonb_value_test.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "gtest/gtest_pred_impl.h"
2424
#include "vec/columns/column_string.h"
2525
#include "vec/common/string_ref.h"
26+
#include "vec/core/block.h"
2627
#include "vec/core/columns_with_type_and_name.h"
2728
#include "vec/data_types/serde/data_type_serde.h"
2829
#include "vec/olap/olap_data_convertor.h"

0 commit comments

Comments
 (0)