Skip to content

Commit f3c9150

Browse files
shankar-iyermkmkme
authored andcommitted
Merge pull request ClickHouse#88090 from shankar-iyer/fix_is_deleted_with_filter
Optimize ReplacingMergeTree is_deleted FINAL queries by adding a filter expression transform
1 parent 7e66622 commit f3c9150

File tree

3 files changed

+112
-5
lines changed

3 files changed

+112
-5
lines changed

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,18 @@ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionFor
12611261
return {std::make_shared<ExpressionActions>(std::move(actions)), sign_filter->getColumnName()};
12621262
}
12631263

1264+
static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionForIsDeleted(const String & is_deleted_column_name, const Block & header, const ContextPtr & context)
1265+
{
1266+
ASTPtr is_deleted_identifier = std::make_shared<ASTIdentifier>(is_deleted_column_name);
1267+
ASTPtr is_deleted_filter = makeASTFunction("equals", is_deleted_identifier, std::make_shared<ASTLiteral>(Field(static_cast<Int8>(0))));
1268+
1269+
const auto & is_deleted_column = header.getByName(is_deleted_column_name);
1270+
1271+
auto syntax_result = TreeRewriter(context).analyze(is_deleted_filter, {{is_deleted_column.name, is_deleted_column.type}});
1272+
auto actions = ExpressionAnalyzer(is_deleted_filter, syntax_result, context).getActionsDAG(false);
1273+
return {std::make_shared<ExpressionActions>(std::move(actions)), is_deleted_filter->getColumnName()};
1274+
}
1275+
12641276
bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
12651277
{
12661278
const auto & settings = context->getSettingsRef();
@@ -1354,7 +1366,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
13541366
bool no_merging_final = do_not_merge_across_partitions_select_final &&
13551367
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
13561368
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
1357-
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
1369+
!reader_settings.read_in_order;
13581370

13591371
if (no_merging_final)
13601372
{
@@ -1386,11 +1398,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
13861398
info.use_uncompressed_cache);
13871399
};
13881400

1389-
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
1390-
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
1401+
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column.
1402+
/// Non-intersecting ranges will just go through extra filter added by createExpressionForIsDeleted() to filter
1403+
/// deleted rows.
13911404
bool split_parts_ranges_into_intersecting_and_non_intersecting_final
1392-
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final]
1393-
&& data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
1405+
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final] &&
1406+
!reader_settings.read_in_order;
13941407

13951408
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
13961409
storage_snapshot->metadata->getPrimaryKey(),
@@ -1477,6 +1490,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
14771490
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
14781491
});
14791492
}
1493+
else if (!data.merging_params.is_deleted_column.empty())
1494+
{
1495+
auto columns_with_is_deleted = origin_column_names;
1496+
if (std::ranges::find(columns_with_is_deleted, data.merging_params.is_deleted_column) == columns_with_is_deleted.end())
1497+
columns_with_is_deleted.push_back(data.merging_params.is_deleted_column);
1498+
1499+
pipe = spreadMarkRangesAmongStreams(
1500+
std::move(non_intersecting_parts_by_primary_key), index_build_context, num_streams, columns_with_is_deleted);
1501+
auto [expression, filter_name] = createExpressionForIsDeleted(data.merging_params.is_deleted_column, pipe.getHeader(), context);
1502+
1503+
pipe.addSimpleTransform([&](const SharedHeader & header)
1504+
{
1505+
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
1506+
});
1507+
}
14801508
else
14811509
{
14821510
pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
10000
2+
9950
3+
0
4+
10000
5+
17700
6+
17700
7+
17700
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Test for FINAL query on ReplacingMergeTree + is_deleted makes use of optimizations.
2+
3+
DROP TABLE IF EXISTS tab;
4+
5+
CREATE TABLE tab (
6+
pkey String,
7+
id Int32,
8+
v Int32,
9+
version UInt64,
10+
is_deleted UInt8
11+
) Engine = ReplacingMergeTree(version,is_deleted)
12+
PARTITION BY pkey ORDER BY id
13+
SETTINGS index_granularity=512;
14+
15+
-- insert 10000 rows in partition 'A' and delete half of them and merge the 2 parts
16+
INSERT INTO tab SELECT 'A', number, number, 1, 0 FROM numbers(10000);
17+
INSERT INTO tab SELECT 'A', number, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
18+
19+
OPTIMIZE TABLE tab SETTINGS mutations_sync = 2;
20+
21+
SYSTEM STOP MERGES tab;
22+
23+
-- insert 10000 rows in partition 'B' and delete half of them, but keep 2 parts
24+
INSERT INTO tab SELECT 'B', number+1000000, number, 1, 0 FROM numbers(10000);
25+
INSERT INTO tab SELECT 'B', number+1000000, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
26+
27+
SET do_not_merge_across_partitions_select_final=1;
28+
29+
-- verify : 10000 rows expected
30+
SELECT count()
31+
FROM tab FINAL;
32+
33+
-- add a filter : 9950 rows expected
34+
SELECT count()
35+
FROM tab FINAL
36+
WHERE id >= 100;
37+
38+
-- only even id's are left - 0 rows expected
39+
SELECT count()
40+
FROM tab FINAL
41+
WHERE (id % 2) = 1;
42+
43+
-- 10000 rows expected
44+
SELECT count()
45+
FROM tab FINAL
46+
WHERE (id % 2) = 0;
47+
48+
-- create some more partitions
49+
INSERT INTO tab SELECT 'C', number+2000000, number, 1, 0 FROM numbers(100);
50+
51+
-- insert and delete some rows to get intersecting/non-intersecting ranges in same partition
52+
INSERT INTO tab SELECT 'D', number+3000000, number, 1, 0 FROM numbers(10000);
53+
INSERT INTO tab SELECT 'D', number+3000000, number + 1, 1, IF(number % 2 = 0, 0, 1) FROM numbers(5000);
54+
55+
INSERT INTO tab SELECT 'E', number+4000000, number, 1, 0 FROM numbers(100);
56+
57+
-- Total 10000 (From A & B) + 100 (From C) + 7500 (From D) + 100 (From E) = 17700 rows
58+
SELECT count()
59+
FROM tab FINAL
60+
SETTINGS do_not_merge_across_partitions_select_final=0,split_intersecting_parts_ranges_into_layers_final=0;
61+
62+
SELECT count()
63+
FROM tab FINAL
64+
SETTINGS do_not_merge_across_partitions_select_final=1,split_intersecting_parts_ranges_into_layers_final=1;
65+
66+
SYSTEM START MERGES tab;
67+
OPTIMIZE TABLE tab FINAL SETTINGS mutations_sync = 2;
68+
69+
SELECT count()
70+
FROM tab FINAL;
71+
72+
DROP TABLE IF EXISTS tab;

0 commit comments

Comments
 (0)