1+ #include < memory>
12#include < Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
23
34#include < AggregateFunctions/AggregateFunctionFactory.h>
45#include < Columns/ColumnAggregateFunction.h>
56#include < Columns/ColumnTuple.h>
7+ #include < Common/Exception.h>
68#include < Common/AlignedBuffer.h>
79#include < Common/Arena.h>
810#include < Common/FieldVisitorSum.h>
911#include < Common/StringUtils.h>
12+ #include < DataTypes/DataTypeTuple.h>
1013#include < DataTypes/DataTypeArray.h>
1114#include < DataTypes/DataTypeCustomSimpleAggregateFunction.h>
1215#include < DataTypes/NestedUtils.h>
@@ -51,7 +54,7 @@ struct SummingSortedAlgorithm::AggregateDescription
5154 bool is_agg_func_type = false ;
5255 bool is_simple_agg_func_type = false ;
5356 bool remove_default_values;
54- bool aggregate_all_columns;
57+ bool aggregate_all_columns = false ;
5558
5659 String sum_function_map_name;
5760
@@ -257,6 +260,45 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
257260 const ColumnWithTypeAndName & column = header.safeGetByPosition (i);
258261
259262 const auto * simple = dynamic_cast <const DataTypeCustomSimpleAggregateFunction *>(column.type ->getCustomName ());
263+ bool is_non_empty_tuple = typeid_cast<const DataTypeTuple *>(column.type .get ()) && !typeid_cast<const DataTypeTuple *>(column.type .get ())->getElements ().empty ();
264+ if (aggregate_all_columns && (is_non_empty_tuple || typeid_cast<const DataTypeArray *>(column.type .get ())) && !simple)
265+ {
266+ const auto map_name = Nested::extractTableName (column.name );
267+ // / if nested table name ends with `Map` it is a possible candidate for special handling
268+ if (map_name == column.name || !endsWith (map_name, " Map" ))
269+ {
270+ bool is_agg_func = WhichDataType (column.type ).isAggregateFunction ();
271+
272+ SummingSortedAlgorithm::AggregateDescription desc;
273+ desc.remove_default_values = remove_default_values;
274+ desc.aggregate_all_columns = aggregate_all_columns;
275+ desc.sum_function_map_name = sum_function_map_name;
276+ desc.is_agg_func_type = is_agg_func;
277+ desc.column_numbers = {i};
278+
279+ desc.real_type = column.type ;
280+ desc.nested_type = recursiveRemoveLowCardinality (desc.real_type );
281+ if (desc.real_type .get () == desc.nested_type .get ())
282+ desc.nested_type = nullptr ;
283+
284+ if (simple)
285+ {
286+ // simple aggregate function
287+ desc.init (simple->getFunction (), true );
288+ if (desc.function ->allocatesMemoryInArena ())
289+ def.allocates_memory_in_arena = true ;
290+ }
291+ else if (!is_agg_func)
292+ {
293+ desc.init (sum_function_name.c_str (), {column.type });
294+ }
295+
296+ def.columns_to_aggregate .emplace_back (std::move (desc));
297+ }
298+
299+ continue ;
300+ }
301+
260302 if (column.name == BlockNumberColumn::name || column.name == BlockOffsetColumn::name)
261303 {
262304 def.column_numbers_not_to_aggregate .push_back (i);
@@ -281,7 +323,15 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
281323 bool is_agg_func = WhichDataType (column.type ).isAggregateFunction ();
282324
283325 // / There are special const columns for example after prewhere sections.
284- if (!aggregate_all_columns && ((!column.type ->isSummable () && !is_agg_func && !simple) || isColumnConst (*column.column )))
326+ if (!aggregate_all_columns)
327+ {
328+ if ((!column.type ->isSummable () && !is_agg_func && !simple) || isColumnConst (*column.column ))
329+ {
330+ def.column_numbers_not_to_aggregate .push_back (i);
331+ continue ;
332+ }
333+ }
334+ else if (column.type ->getTypeId () == TypeIndex::Tuple)
285335 {
286336 def.column_numbers_not_to_aggregate .push_back (i);
287337 continue ;
@@ -452,10 +502,17 @@ static void postprocessChunk(
452502
453503 if (!desc.is_agg_func_type && !desc.is_simple_agg_func_type && isTuple (desc.function ->getResultType ()))
454504 {
455- // / Unpack tuple into block.
456- size_t tuple_size = desc.column_numbers .size ();
457- for (size_t i = 0 ; i < tuple_size; ++i)
458- res_columns[desc.column_numbers [i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr (i);
505+ if (desc.aggregate_all_columns )
506+ {
507+ res_columns[desc.column_numbers [0 ]] = column;
508+ }
509+ else
510+ {
511+ // / Unpack tuple into block.
512+ size_t tuple_size = desc.column_numbers .size ();
513+ for (size_t i = 0 ; i < tuple_size; ++i)
514+ res_columns[desc.column_numbers [i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr (i);
515+ }
459516 }
460517 else if (desc.nested_type )
461518 {
@@ -535,12 +592,24 @@ void SummingSortedAlgorithm::SummingMergedData::initialize(const DB::Block & hea
535592 // Wrap aggregated columns in a tuple to match function signature
536593 if (!desc.is_agg_func_type && !desc.is_simple_agg_func_type && isTuple (desc.function ->getResultType ()))
537594 {
538- size_t tuple_size = desc.column_numbers .size ();
539- MutableColumns tuple_columns (tuple_size);
540- for (size_t i = 0 ; i < tuple_size; ++i)
541- tuple_columns[i] = std::move (columns[desc.column_numbers [i]]);
595+ if (desc.aggregate_all_columns )
596+ {
597+ auto column = desc.real_type ->createColumn ();
598+ size_t tuple_size = static_cast <const ColumnTuple &>(*column).tupleSize ();
599+ MutableColumns tuple_columns (tuple_size);
600+ for (size_t i = 0 ; i < tuple_size; ++i)
601+ tuple_columns[i] = static_cast <const ColumnTuple &>(*column).getColumnPtr (i)->cloneEmpty ();
602+ new_columns.emplace_back (ColumnTuple::create (std::move (tuple_columns)));
603+ }
604+ else
605+ {
606+ size_t tuple_size = desc.column_numbers .size ();
607+ MutableColumns tuple_columns (tuple_size);
608+ for (size_t i = 0 ; i < tuple_size; ++i)
609+ tuple_columns[i] = std::move (columns[desc.column_numbers [i]]);
542610
543- new_columns.emplace_back (ColumnTuple::create (std::move (tuple_columns)));
611+ new_columns.emplace_back (ColumnTuple::create (std::move (tuple_columns)));
612+ }
544613 }
545614 else
546615 {
@@ -734,7 +803,6 @@ Chunk SummingSortedAlgorithm::SummingMergedData::pull()
734803{
735804 auto chunk = MergedData::pull ();
736805 postprocessChunk (chunk, def.column_names .size (), def);
737-
738806 initAggregateDescription ();
739807
740808 return chunk;
0 commit comments