Skip to content

Commit 65df4a1

Browse files
Merge pull request ClickHouse#88952 from ClickHouse/backport/25.8/88828
Backport ClickHouse#88828 to 25.8: Revert "Revert "Fix coalescing merge tree for tuple types""
2 parents b8b9ce2 + ee93c6b commit 65df4a1

File tree

3 files changed

+126
-12
lines changed

3 files changed

+126
-12
lines changed

src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1+
#include <memory>
12
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
23

34
#include <AggregateFunctions/AggregateFunctionFactory.h>
45
#include <Columns/ColumnAggregateFunction.h>
56
#include <Columns/ColumnTuple.h>
7+
#include <Common/Exception.h>
68
#include <Common/AlignedBuffer.h>
79
#include <Common/Arena.h>
810
#include <Common/FieldVisitorSum.h>
911
#include <Common/StringUtils.h>
12+
#include <DataTypes/DataTypeTuple.h>
1013
#include <DataTypes/DataTypeArray.h>
1114
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
1215
#include <DataTypes/NestedUtils.h>
@@ -51,7 +54,7 @@ struct SummingSortedAlgorithm::AggregateDescription
5154
bool is_agg_func_type = false;
5255
bool is_simple_agg_func_type = false;
5356
bool remove_default_values;
54-
bool aggregate_all_columns;
57+
bool aggregate_all_columns = false;
5558

5659
String sum_function_map_name;
5760

@@ -257,6 +260,45 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
257260
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
258261

259262
const auto * simple = dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName());
263+
bool is_non_empty_tuple = typeid_cast<const DataTypeTuple *>(column.type.get()) && !typeid_cast<const DataTypeTuple *>(column.type.get())->getElements().empty();
264+
if (aggregate_all_columns && (is_non_empty_tuple || typeid_cast<const DataTypeArray *>(column.type.get())) && !simple)
265+
{
266+
const auto map_name = Nested::extractTableName(column.name);
267+
/// if nested table name ends with `Map` it is a possible candidate for special handling
268+
if (map_name == column.name || !endsWith(map_name, "Map"))
269+
{
270+
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
271+
272+
SummingSortedAlgorithm::AggregateDescription desc;
273+
desc.remove_default_values = remove_default_values;
274+
desc.aggregate_all_columns = aggregate_all_columns;
275+
desc.sum_function_map_name = sum_function_map_name;
276+
desc.is_agg_func_type = is_agg_func;
277+
desc.column_numbers = {i};
278+
279+
desc.real_type = column.type;
280+
desc.nested_type = recursiveRemoveLowCardinality(desc.real_type);
281+
if (desc.real_type.get() == desc.nested_type.get())
282+
desc.nested_type = nullptr;
283+
284+
if (simple)
285+
{
286+
// simple aggregate function
287+
desc.init(simple->getFunction(), true);
288+
if (desc.function->allocatesMemoryInArena())
289+
def.allocates_memory_in_arena = true;
290+
}
291+
else if (!is_agg_func)
292+
{
293+
desc.init(sum_function_name.c_str(), {column.type});
294+
}
295+
296+
def.columns_to_aggregate.emplace_back(std::move(desc));
297+
}
298+
299+
continue;
300+
}
301+
260302
if (column.name == BlockNumberColumn::name || column.name == BlockOffsetColumn::name)
261303
{
262304
def.column_numbers_not_to_aggregate.push_back(i);
@@ -281,7 +323,15 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
281323
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
282324

283325
/// There are special const columns for example after prewhere sections.
284-
if (!aggregate_all_columns && ((!column.type->isSummable() && !is_agg_func && !simple) || isColumnConst(*column.column)))
326+
if (!aggregate_all_columns)
327+
{
328+
if ((!column.type->isSummable() && !is_agg_func && !simple) || isColumnConst(*column.column))
329+
{
330+
def.column_numbers_not_to_aggregate.push_back(i);
331+
continue;
332+
}
333+
}
334+
else if (column.type->getTypeId() == TypeIndex::Tuple)
285335
{
286336
def.column_numbers_not_to_aggregate.push_back(i);
287337
continue;
@@ -452,10 +502,17 @@ static void postprocessChunk(
452502

453503
if (!desc.is_agg_func_type && !desc.is_simple_agg_func_type && isTuple(desc.function->getResultType()))
454504
{
455-
/// Unpack tuple into block.
456-
size_t tuple_size = desc.column_numbers.size();
457-
for (size_t i = 0; i < tuple_size; ++i)
458-
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
505+
if (desc.aggregate_all_columns)
506+
{
507+
res_columns[desc.column_numbers[0]] = column;
508+
}
509+
else
510+
{
511+
/// Unpack tuple into block.
512+
size_t tuple_size = desc.column_numbers.size();
513+
for (size_t i = 0; i < tuple_size; ++i)
514+
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
515+
}
459516
}
460517
else if (desc.nested_type)
461518
{
@@ -535,12 +592,24 @@ void SummingSortedAlgorithm::SummingMergedData::initialize(const DB::Block & hea
535592
// Wrap aggregated columns in a tuple to match function signature
536593
if (!desc.is_agg_func_type && !desc.is_simple_agg_func_type && isTuple(desc.function->getResultType()))
537594
{
538-
size_t tuple_size = desc.column_numbers.size();
539-
MutableColumns tuple_columns(tuple_size);
540-
for (size_t i = 0; i < tuple_size; ++i)
541-
tuple_columns[i] = std::move(columns[desc.column_numbers[i]]);
595+
if (desc.aggregate_all_columns)
596+
{
597+
auto column = desc.real_type->createColumn();
598+
size_t tuple_size = static_cast<const ColumnTuple &>(*column).tupleSize();
599+
MutableColumns tuple_columns(tuple_size);
600+
for (size_t i = 0; i < tuple_size; ++i)
601+
tuple_columns[i] = static_cast<const ColumnTuple &>(*column).getColumnPtr(i)->cloneEmpty();
602+
new_columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
603+
}
604+
else
605+
{
606+
size_t tuple_size = desc.column_numbers.size();
607+
MutableColumns tuple_columns(tuple_size);
608+
for (size_t i = 0; i < tuple_size; ++i)
609+
tuple_columns[i] = std::move(columns[desc.column_numbers[i]]);
542610

543-
new_columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
611+
new_columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
612+
}
544613
}
545614
else
546615
{
@@ -734,7 +803,6 @@ Chunk SummingSortedAlgorithm::SummingMergedData::pull()
734803
{
735804
auto chunk = MergedData::pull();
736805
postprocessChunk(chunk, def.column_names.size(), def);
737-
738806
initAggregateDescription();
739807

740808
return chunk;
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
(1,2)
2+
(3,4)
3+
[1,2]
4+
[3,4]
5+
()
6+
()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
DROP TABLE IF EXISTS t0;
2+
3+
CREATE TABLE t0 (c0 Tuple(a Int32, b Nullable(Int32)), c1 Int32) ENGINE = SummingMergeTree() ORDER BY c1;
4+
INSERT INTO t0 VALUES ((1,2), 0);
5+
INSERT INTO t0 VALUES ((3,4), 0);
6+
SELECT c0 FROM t0 FINAL;
7+
8+
DROP TABLE IF EXISTS t0;
9+
10+
CREATE TABLE t0 (c0 Tuple(a Int32, b Nullable(Int32)), c1 Int32) ENGINE = CoalescingMergeTree() ORDER BY c1;
11+
INSERT INTO t0 VALUES ((1,2), 0); -- return this one because tuple has not a nullable type so we can not aggregate by tuple columns
12+
INSERT INTO t0 VALUES ((3,4), 0);
13+
SELECT c0 FROM t0 FINAL;
14+
15+
DROP TABLE IF EXISTS t0;
16+
17+
CREATE TABLE t0 (c0 Array(Nullable(Int32)), c1 Int32) ENGINE = SummingMergeTree() ORDER BY c1;
18+
INSERT INTO t0 VALUES ([1,2], 0);
19+
INSERT INTO t0 VALUES ([3,4], 0);
20+
SELECT c0 FROM t0 FINAL;
21+
22+
23+
DROP TABLE IF EXISTS t0;
24+
25+
CREATE TABLE t0 (c0 Array(Nullable(Int32)), c1 Int32) ENGINE = CoalescingMergeTree() ORDER BY c1;
26+
INSERT INTO t0 VALUES ([1,2], 0); -- return this one because array has not a nullable type so we can not aggregate by tuple columns
27+
INSERT INTO t0 VALUES ([3,4], 0);
28+
SELECT c0 FROM t0 FINAL;
29+
30+
DROP TABLE IF EXISTS t0;
31+
32+
CREATE TABLE t0 (c0 Tuple) ENGINE = CoalescingMergeTree() ORDER BY tuple();
33+
INSERT INTO t0 (c0) VALUES (());
34+
SELECT c0 FROM t0 FINAL;
35+
36+
DROP TABLE IF EXISTS t0;
37+
38+
CREATE TABLE t0 (c0 Tuple) ENGINE = SummingMergeTree() ORDER BY tuple();
39+
INSERT INTO t0 (c0) VALUES (());
40+
SELECT c0 FROM t0 FINAL;

0 commit comments

Comments
 (0)