From 546aafdce6f1b39cdae129024f72ddfa0f0769b9 Mon Sep 17 00:00:00 2001 From: Pxl Date: Mon, 29 Dec 2025 11:30:33 +0800 Subject: [PATCH 1/2] [Improvement](function) optimize trivial function deserialize_and_merge_vec (#58882) ### What problem does this PR solve? ```sql SELECT max(csales) tpcds_cmax FROM ( SELECT c_customer_sk, sum((ss_quantity * ss_sales_price)) csales FROM store_sales, customer, date_dim WHERE (ss_customer_sk = c_customer_sk) AND (ss_sold_date_sk = d_date_sk) AND ( d_year IN ( 2000, (2000 + 1), (2000 + 2), (2000 + 3) ) ) GROUP BY c_customer_sk ) x; ``` DeserializeAndMergeTime 202123088 row avg old d4f6f5: 1min25sec sum old d4f6f5: 1min9sec avg new: 45sec157ms sum new: 36sec84ms --- .../aggregate_functions/aggregate_function.h | 8 +- .../aggregate_function_avg.h | 4 +- .../aggregate_function_foreach.h | 4 +- .../aggregate_function_null.h | 89 +++++++++++++++++-- .../aggregate_function_state_union.h | 2 +- .../aggregate_function_sum.h | 4 +- be/src/vec/common/string_buffer.hpp | 3 +- be/test/vec/olap/jsonb_value_test.cpp | 1 + 8 files changed, 96 insertions(+), 19 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index b9c2de9c148f0f..cdcef2f248f593 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -27,13 +27,11 @@ #include "common/status.h" #include "util/defer_op.h" #include "vec/columns/column_complex.h" +#include "vec/columns/column_fixed_length_object.h" #include "vec/columns/column_string.h" #include "vec/common/assert_cast.h" #include "vec/common/hash_table/phmap_fwd_decl.h" #include "vec/common/string_buffer.hpp" -#include "vec/core/block.h" -#include "vec/core/column_numbers.h" -#include "vec/core/field.h" #include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_string.h" @@ -110,7 +108,7 @@ class IAggregateFunction { virtual void reset(AggregateDataPtr place) const = 0; /// It is not necessary to delete data. - virtual bool has_trivial_destructor() const = 0; + virtual bool is_trivial() const = 0; /// Get `sizeof` of structure with data. virtual size_t size_of_data() const = 0; @@ -641,7 +639,7 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper { void destroy(AggregateDataPtr __restrict place) const noexcept override { data(place).~Data(); } - bool has_trivial_destructor() const override { return std::is_trivially_destructible_v; } + bool is_trivial() const override { return false; } size_t size_of_data() const override { return sizeof(Data); } diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index 0e263c0ec795c1..f36a11e04d0759 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -147,6 +147,8 @@ class AggregateFunctionAvg final } } + bool is_trivial() const override { return true; } + template NO_SANITIZE_UNDEFINED void update_value(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num) const { @@ -271,7 +273,6 @@ class AggregateFunctionAvg final AggregateDataPtr rhs, const IColumn* column, Arena& arena, const size_t num_rows) const override { this->deserialize_from_column(rhs, *column, arena, num_rows); - DEFER({ this->destroy_vec(rhs, num_rows); }); this->merge_vec(places, offset, rhs, arena, num_rows); } @@ -279,7 +280,6 @@ class AggregateFunctionAvg final AggregateDataPtr rhs, const IColumn* column, Arena& arena, const size_t num_rows) const override { this->deserialize_from_column(rhs, *column, arena, num_rows); - DEFER({ this->destroy_vec(rhs, num_rows); }); this->merge_vec_selected(places, offset, rhs, arena, num_rows); } diff --git a/be/src/vec/aggregate_functions/aggregate_function_foreach.h b/be/src/vec/aggregate_functions/aggregate_function_foreach.h index b39ce6a530edee..0e7eb9de49509b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_foreach.h +++ b/be/src/vec/aggregate_functions/aggregate_function_foreach.h @@ -158,8 +158,8 @@ class AggregateFunctionForEach : public IAggregateFunctionDataHelper && nested_function->has_trivial_destructor(); + bool is_trivial() const override { + return std::is_trivial_v && nested_function->is_trivial(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index b9477e710c48f9..5b8e5559a3c3d2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -23,13 +23,16 @@ #include #include +#include +#include "common/exception.h" #include "common/logging.h" #include "common/status.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/aggregate_functions/aggregate_function_distinct.h" #include "vec/columns/column_nullable.h" #include "vec/common/assert_cast.h" +#include "vec/common/string_buffer.hpp" #include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" @@ -161,9 +164,7 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper nested_function->reset(nested_place(place)); } - bool has_trivial_destructor() const override { - return nested_function->has_trivial_destructor(); - } + bool is_trivial() const override { return false; } size_t size_of_data() const override { return prefix_size + nested_function->size_of_data(); } @@ -177,11 +178,10 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena& arena) const override { - if (result_is_nullable && get_flag(rhs)) { + if (get_flag(rhs)) { set_flag(place); + nested_function->merge(nested_place(place), nested_place(rhs), arena); } - - nested_function->merge(nested_place(place), nested_place(rhs), arena); } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { @@ -206,6 +206,83 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const IColumn* column, Arena& arena, + const size_t num_rows) const override { + if (nested_function->is_trivial()) { + BufferReadable buf({column->get_data_at(0).data, 0}); + size_t size_of_data = this->size_of_data(); + if constexpr (result_is_nullable) { + for (int i = 0; i != num_rows; ++i) { + buf.read_binary(*(bool*)(rhs + size_of_data * i)); + if (get_flag(rhs + size_of_data * i)) { + nested_function->deserialize(nested_place(rhs + size_of_data * i), buf, + arena); + } + } + for (size_t i = 0; i != num_rows; ++i) { + if (get_flag(rhs + size_of_data * i)) { + set_flag(places[i] + offset); + nested_function->merge(nested_place(places[i] + offset), + nested_place(rhs + size_of_data * i), arena); + } + } + } else { + for (size_t i = 0; i != num_rows; ++i) { + nested_function->deserialize(rhs + size_of_data * i, buf, arena); + } + for (size_t i = 0; i != num_rows; ++i) { + nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena); + } + } + } else { + IAggregateFunctionHelper::deserialize_and_merge_vec(places, offset, rhs, + column, arena, num_rows); + } + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const IColumn* column, + Arena& arena, const size_t num_rows) const override { + if (nested_function->is_trivial()) { + BufferReadable buf({column->get_data_at(0).data, 0}); + size_t size_of_data = this->size_of_data(); + if constexpr (result_is_nullable) { + for (int i = 0; i != num_rows; ++i) { + if (!places[i]) { + continue; + } + buf.read_binary(*(bool*)(rhs + size_of_data * i)); + if (get_flag(rhs + size_of_data * i)) { + nested_function->deserialize(nested_place(rhs + size_of_data * i), buf, + arena); + } + } + for (size_t i = 0; i != num_rows; ++i) { + if (places[i] && get_flag(rhs + size_of_data * i)) { + set_flag(places[i] + offset); + nested_function->merge(nested_place(places[i] + offset), + nested_place(rhs + size_of_data * i), arena); + } + } + } else { + for (size_t i = 0; i != num_rows; ++i) { + if (places[i]) { + nested_function->deserialize(rhs + size_of_data * i, buf, arena); + } + } + for (size_t i = 0; i != num_rows; ++i) { + if (places[i]) { + nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena); + } + } + } + } else { + IAggregateFunctionHelper::deserialize_and_merge_vec_selected( + places, offset, rhs, column, arena, num_rows); + } + } + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, BufferReadable& buf, Arena& arena) const override { bool flag = true; diff --git a/be/src/vec/aggregate_functions/aggregate_function_state_union.h b/be/src/vec/aggregate_functions/aggregate_function_state_union.h index 9b9bfb84fc4439..09705cc3647d0b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_state_union.h +++ b/be/src/vec/aggregate_functions/aggregate_function_state_union.h @@ -101,7 +101,7 @@ class AggregateStateUnion : public IAggregateFunctionHelper _function->destroy(place); } - bool has_trivial_destructor() const override { return _function->has_trivial_destructor(); } + bool is_trivial() const override { return false; } size_t size_of_data() const override { return _function->size_of_data(); } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h index 81a6127d9a9b55..e7967db7d9498e 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h @@ -102,6 +102,8 @@ class AggregateFunctionSum final } } + bool is_trivial() const override { return true; } + void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, Arena&) const override { const auto& column = @@ -192,7 +194,6 @@ class AggregateFunctionSum final AggregateDataPtr rhs, const IColumn* column, Arena& arena, const size_t num_rows) const override { this->deserialize_from_column(rhs, *column, arena, num_rows); - DEFER({ this->destroy_vec(rhs, num_rows); }); this->merge_vec(places, offset, rhs, arena, num_rows); } @@ -200,7 +201,6 @@ class AggregateFunctionSum final AggregateDataPtr rhs, const IColumn* column, Arena& arena, const size_t num_rows) const override { this->deserialize_from_column(rhs, *column, arena, num_rows); - DEFER({ this->destroy_vec(rhs, num_rows); }); this->merge_vec_selected(places, offset, rhs, arena, num_rows); } diff --git a/be/src/vec/common/string_buffer.hpp b/be/src/vec/common/string_buffer.hpp index 33a8397f610cd3..15d4f933db9c66 100644 --- a/be/src/vec/common/string_buffer.hpp +++ b/be/src/vec/common/string_buffer.hpp @@ -256,7 +256,8 @@ class BufferReadable { template void read_binary(Type& x) { static_assert(std::is_standard_layout_v); - read(reinterpret_cast(&x), sizeof(x)); + memcpy_fixed(reinterpret_cast(&x), _data); + _data += sizeof(x); } template diff --git a/be/test/vec/olap/jsonb_value_test.cpp b/be/test/vec/olap/jsonb_value_test.cpp index d6b5db784e2b47..97f858d63e19a9 100644 --- a/be/test/vec/olap/jsonb_value_test.cpp +++ b/be/test/vec/olap/jsonb_value_test.cpp @@ -23,6 +23,7 @@ #include "gtest/gtest_pred_impl.h" #include "vec/columns/column_string.h" #include "vec/common/string_ref.h" +#include "vec/core/block.h" #include "vec/core/columns_with_type_and_name.h" #include "vec/data_types/serde/data_type_serde.h" #include "vec/olap/olap_data_convertor.h" From 59c959eec661e6be24cd8baabe9befcf139884e1 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 5 Jan 2026 11:58:14 +0800 Subject: [PATCH 2/2] update --- be/src/vec/exec/format/table/paimon_jni_reader.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index 942f6a83971715..f62e7afa14c9b6 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -22,6 +22,7 @@ #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/types.h" +#include "vec/core/block.h" #include "vec/core/types.h" namespace doris { class RuntimeProfile;