2323#include < glog/logging.h>
2424
2525#include < array>
26+ #include < memory>
2627
28+ #include " common/exception.h"
2729#include " common/logging.h"
2830#include " common/status.h"
2931#include " vec/aggregate_functions/aggregate_function.h"
3032#include " vec/aggregate_functions/aggregate_function_distinct.h"
3133#include " vec/columns/column_nullable.h"
3234#include " vec/common/assert_cast.h"
35+ #include " vec/common/string_buffer.hpp"
3336#include " vec/core/types.h"
3437#include " vec/data_types/data_type_nullable.h"
3538
@@ -161,9 +164,7 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
161164 nested_function->reset (nested_place (place));
162165 }
163166
164- bool has_trivial_destructor () const override {
165- return nested_function->has_trivial_destructor ();
166- }
167+ bool is_trivial () const override { return false ; }
167168
168169 size_t size_of_data () const override { return prefix_size + nested_function->size_of_data (); }
169170
@@ -177,11 +178,10 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
177178
178179 void merge (AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
179180 Arena& arena) const override {
180- if (result_is_nullable && get_flag (rhs)) {
181+ if (get_flag (rhs)) {
181182 set_flag (place);
183+ nested_function->merge (nested_place (place), nested_place (rhs), arena);
182184 }
183-
184- nested_function->merge (nested_place (place), nested_place (rhs), arena);
185185 }
186186
187187 void serialize (ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
@@ -206,6 +206,83 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
206206 }
207207 }
208208
209+ void deserialize_and_merge_vec (const AggregateDataPtr* places, size_t offset,
210+ AggregateDataPtr rhs, const IColumn* column, Arena& arena,
211+ const size_t num_rows) const override {
212+ if (nested_function->is_trivial ()) {
213+ BufferReadable buf ({column->get_data_at (0 ).data , 0 });
214+ size_t size_of_data = this ->size_of_data ();
215+ if constexpr (result_is_nullable) {
216+ for (int i = 0 ; i != num_rows; ++i) {
217+ buf.read_binary (*(bool *)(rhs + size_of_data * i));
218+ if (get_flag (rhs + size_of_data * i)) {
219+ nested_function->deserialize (nested_place (rhs + size_of_data * i), buf,
220+ arena);
221+ }
222+ }
223+ for (size_t i = 0 ; i != num_rows; ++i) {
224+ if (get_flag (rhs + size_of_data * i)) {
225+ set_flag (places[i] + offset);
226+ nested_function->merge (nested_place (places[i] + offset),
227+ nested_place (rhs + size_of_data * i), arena);
228+ }
229+ }
230+ } else {
231+ for (size_t i = 0 ; i != num_rows; ++i) {
232+ nested_function->deserialize (rhs + size_of_data * i, buf, arena);
233+ }
234+ for (size_t i = 0 ; i != num_rows; ++i) {
235+ nested_function->merge (places[i] + offset, rhs + size_of_data * i, arena);
236+ }
237+ }
238+ } else {
239+ IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec (places, offset, rhs,
240+ column, arena, num_rows);
241+ }
242+ }
243+
244+ void deserialize_and_merge_vec_selected (const AggregateDataPtr* places, size_t offset,
245+ AggregateDataPtr rhs, const IColumn* column,
246+ Arena& arena, const size_t num_rows) const override {
247+ if (nested_function->is_trivial ()) {
248+ BufferReadable buf ({column->get_data_at (0 ).data , 0 });
249+ size_t size_of_data = this ->size_of_data ();
250+ if constexpr (result_is_nullable) {
251+ for (int i = 0 ; i != num_rows; ++i) {
252+ if (!places[i]) {
253+ continue ;
254+ }
255+ buf.read_binary (*(bool *)(rhs + size_of_data * i));
256+ if (get_flag (rhs + size_of_data * i)) {
257+ nested_function->deserialize (nested_place (rhs + size_of_data * i), buf,
258+ arena);
259+ }
260+ }
261+ for (size_t i = 0 ; i != num_rows; ++i) {
262+ if (places[i] && get_flag (rhs + size_of_data * i)) {
263+ set_flag (places[i] + offset);
264+ nested_function->merge (nested_place (places[i] + offset),
265+ nested_place (rhs + size_of_data * i), arena);
266+ }
267+ }
268+ } else {
269+ for (size_t i = 0 ; i != num_rows; ++i) {
270+ if (places[i]) {
271+ nested_function->deserialize (rhs + size_of_data * i, buf, arena);
272+ }
273+ }
274+ for (size_t i = 0 ; i != num_rows; ++i) {
275+ if (places[i]) {
276+ nested_function->merge (places[i] + offset, rhs + size_of_data * i, arena);
277+ }
278+ }
279+ }
280+ } else {
281+ IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec_selected (
282+ places, offset, rhs, column, arena, num_rows);
283+ }
284+ }
285+
209286 void deserialize_and_merge (AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
210287 BufferReadable& buf, Arena& arena) const override {
211288 bool flag = true ;
0 commit comments