Skip to content

Commit c725461

Browse files
committed
Fix incorrect result when reading reverse-ordered keys in partitioned tables.
1 parent 4a54d21 commit c725461

File tree

3 files changed

+79
-14
lines changed

3 files changed

+79
-14
lines changed

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
237237
static SortDescription getSortDescriptionForOutputHeader(
238238
const Header & output_header,
239239
const Names & sorting_key_columns,
240+
const std::vector<bool> & reverse_flags,
240241
const int sort_direction,
241242
InputOrderInfoPtr input_order_info,
242243
PrewhereInfoPtr prewhere_info,
@@ -273,15 +274,21 @@ static SortDescription getSortDescriptionForOutputHeader(
273274

274275
SortDescription sort_description;
275276
const Block & header = output_header;
276-
for (const auto & sorting_key : sorting_key_columns)
277+
size_t sort_columns_size = sorting_key_columns.size();
278+
sort_description.reserve(sort_columns_size);
279+
for (size_t i = 0; i < sort_columns_size; ++i)
277280
{
281+
const auto & sorting_key = sorting_key_columns[i];
278282
const auto it = std::find_if(
279283
original_header.begin(), original_header.end(), [&sorting_key](const auto & column) { return column.name == sorting_key; });
280284
if (it == original_header.end())
281285
break;
282286

283287
const size_t column_pos = std::distance(original_header.begin(), it);
284-
sort_description.emplace_back((header.begin() + column_pos)->name, sort_direction);
288+
if (!reverse_flags.empty() && reverse_flags[i])
289+
sort_description.emplace_back((header.begin() + column_pos)->name, sort_direction * -1);
290+
else
291+
sort_description.emplace_back((header.begin() + column_pos)->name, sort_direction);
285292
}
286293

287294
if (input_order_info && !enable_vertical_final)
@@ -659,8 +666,8 @@ Pipe ReadFromMergeTree::readInOrder(
659666
else
660667
algorithm = std::make_unique<MergeTreeInOrderSelectAlgorithm>(i);
661668

662-
auto processor
663-
= std::make_unique<MergeTreeSelectProcessor>(pool, std::move(algorithm), prewhere_info, lazily_read_info, actions_settings, reader_settings);
669+
auto processor = std::make_unique<MergeTreeSelectProcessor>(
670+
pool, std::move(algorithm), prewhere_info, lazily_read_info, actions_settings, reader_settings);
664671

665672
processor->addPartLevelToChunk(isQueryWithFinal());
666673

@@ -855,12 +862,19 @@ Pipe ReadFromMergeTree::readByLayers(const RangesInDataParts & parts_with_ranges
855862
}
856863
auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression;
857864
const auto & sorting_columns = storage_snapshot->metadata->getSortingKey().column_names;
865+
std::vector<bool> reverse_flags = storage_snapshot->metadata->getSortingKeyReverseFlags();
858866

859867
sort_description.compile_sort_description = settings[Setting::compile_sort_description];
860868
sort_description.min_count_to_compile_sort_description = settings[Setting::min_count_to_compile_sort_description];
861869

862-
for (size_t j = 0; j < input_order_info->used_prefix_of_sorting_key_size; ++j)
863-
sort_description.emplace_back(sorting_columns[j], input_order_info->direction);
870+
sort_description.reserve(input_order_info->used_prefix_of_sorting_key_size);
871+
for (size_t i = 0; i < input_order_info->used_prefix_of_sorting_key_size; ++i)
872+
{
873+
if (!reverse_flags.empty() && reverse_flags[i])
874+
sort_description.emplace_back(sorting_columns[i], input_order_info->direction * -1);
875+
else
876+
sort_description.emplace_back(sorting_columns[i], input_order_info->direction);
877+
}
864878

865879
reading_step_getter = [this, &in_order_column_names_to_read, &info, sorting_expr, &sort_description](auto parts)
866880
{
@@ -889,8 +903,17 @@ Pipe ReadFromMergeTree::readByLayers(const RangesInDataParts & parts_with_ranges
889903
if (pipe.numOutputPorts() != 1)
890904
{
891905
auto transform = std::make_shared<MergingSortedTransform>(
892-
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch,
893-
0, false, nullptr, false, /*apply_virtual_row_conversions*/ false);
906+
pipe.getHeader(),
907+
pipe.numOutputPorts(),
908+
sort_description,
909+
block_size.max_block_size_rows,
910+
/*max_block_size_bytes=*/0,
911+
SortingQueueStrategy::Batch,
912+
0,
913+
false,
914+
nullptr,
915+
false,
916+
/*apply_virtual_row_conversions*/ false);
894917

895918
pipe.addTransform(std::move(transform));
896919
}
@@ -1262,13 +1285,20 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
12621285
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, storage_snapshot->metadata->getColumns().get(GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns()));
12631286
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
12641287
const auto & sorting_columns = storage_snapshot->metadata->getSortingKey().column_names;
1288+
std::vector<bool> reverse_flags = storage_snapshot->metadata->getSortingKeyReverseFlags();
12651289

12661290
SortDescription sort_description;
12671291
sort_description.compile_sort_description = settings[Setting::compile_sort_description];
12681292
sort_description.min_count_to_compile_sort_description = settings[Setting::min_count_to_compile_sort_description];
12691293

1270-
for (size_t j = 0; j < prefix_size; ++j)
1271-
sort_description.emplace_back(sorting_columns[j], input_order_info->direction);
1294+
sort_description.reserve(prefix_size);
1295+
for (size_t i = 0; i < prefix_size; ++i)
1296+
{
1297+
if (!reverse_flags.empty() && reverse_flags[i])
1298+
sort_description.emplace_back(sorting_columns[i], input_order_info->direction * -1);
1299+
else
1300+
sort_description.emplace_back(sorting_columns[i], input_order_info->direction);
1301+
}
12721302

12731303
auto sorting_key_expr = std::make_shared<ExpressionActions>(std::move(sorting_key_prefix_expr));
12741304

@@ -1280,8 +1310,17 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
12801310
if (pipe.numOutputPorts() > 1)
12811311
{
12821312
auto transform = std::make_shared<MergingSortedTransform>(
1283-
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch,
1284-
0, false, nullptr, false, /*apply_virtual_row_conversions*/ false);
1313+
pipe.getHeader(),
1314+
pipe.numOutputPorts(),
1315+
sort_description,
1316+
block_size.max_block_size_rows,
1317+
/*max_block_size_bytes=*/0,
1318+
SortingQueueStrategy::Batch,
1319+
0,
1320+
false,
1321+
nullptr,
1322+
false,
1323+
/*apply_virtual_row_conversions*/ false);
12851324

12861325
pipe.addTransform(std::move(transform));
12871326
}
@@ -1554,9 +1593,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
15541593
for (size_t i = 0; i < sort_columns_size; ++i)
15551594
{
15561595
if (!reverse_flags.empty() && reverse_flags[i])
1557-
sort_description.emplace_back(sort_columns[i], -1, 1);
1596+
sort_description.emplace_back(sort_columns[i], -1);
15581597
else
1559-
sort_description.emplace_back(sort_columns[i], 1, 1);
1598+
sort_description.emplace_back(sort_columns[i], 1);
15601599
}
15611600

15621601
for (auto & pipe : pipes)
@@ -2032,6 +2071,7 @@ void ReadFromMergeTree::updateSortDescription()
20322071
result_sort_description = getSortDescriptionForOutputHeader(
20332072
*output_header,
20342073
storage_snapshot->metadata->getSortingKeyColumns(),
2074+
storage_snapshot->metadata->getSortingKeyReverseFlags(),
20352075
getSortDirection(),
20362076
query_info.input_order_info,
20372077
prewhere_info,
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- { echo ON }
2+
3+
drop table if exists t;
4+
create table t(A Int64) partition by (A % 64) order by A desc settings allow_experimental_reverse_key=1
5+
as select intDiv(number,11111) from numbers(7e5) union all select number from numbers(7e5);
6+
set max_threads=1;
7+
select cityHash64(groupArray(A)) from (select A from t order by A desc limit 10);
8+
17781438751011572588
9+
select cityHash64(groupArray(A)) from (select A from t order by identity(A) desc limit 10);
10+
17781438751011572588
11+
drop table t;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- { echo ON }
2+
3+
drop table if exists t;
4+
5+
create table t(A Int64) partition by (A % 64) order by A desc settings allow_experimental_reverse_key=1
6+
as select intDiv(number,11111) from numbers(7e5) union all select number from numbers(7e5);
7+
8+
set max_threads=1;
9+
10+
select cityHash64(groupArray(A)) from (select A from t order by A desc limit 10);
11+
12+
select cityHash64(groupArray(A)) from (select A from t order by identity(A) desc limit 10);
13+
14+
drop table t;

0 commit comments

Comments
 (0)