Commit aee08f1

Backport ClickHouse#89177 to 25.8: Calculate all subquery sets inplace before Iceberg partition pruning
1 parent 09c51f4 commit aee08f1

File tree

5 files changed: +108 -5 lines changed
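The change targets queries that filter an Iceberg table by an IN-subquery, as exercised by the integration test added below. A minimal sketch of the affected shape (the icebergS3 URL and credentials are illustrative placeholders, not taken from this commit):

    SELECT *
    FROM icebergS3('https://bucket.s3.amazonaws.com/warehouse/test_table/', 'key', 'secret')
    WHERE id IN (SELECT id FROM in_memory_table)
    ORDER BY ALL
    SETTINGS use_iceberg_partition_pruning = 1;

With this backport, the subquery set is built inplace while filters are applied, before any data files are read, so partition pruning can use the set's contents to skip files up front.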

src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp

Lines changed: 9 additions & 0 deletions

@@ -15,6 +15,7 @@
 #include <Formats/FormatFactory.h>
 #include <IO/ReadBufferFromString.h>
 #include <Interpreters/Context.h>
+#include <Storages/VirtualColumnUtils.h>


 namespace DB
@@ -61,6 +62,14 @@ QueryPlanStepPtr ReadFromObjectStorageStep::clone() const
 void ReadFromObjectStorageStep::applyFilters(ActionDAGNodes added_filter_nodes)
 {
     SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
+    // It is important to build the sets for the filter inplace here, before reading data from object storage.
+    // If we delay building these sets until later in the pipeline, the filter can be applied after the data
+    // has already been read, potentially in parallel across many streams. This can significantly reduce the
+    // effectiveness of Iceberg partition pruning, as unnecessary data may be read. Additionally, building ordered sets
+    // at this stage enables the KeyCondition class to apply more efficient optimizations than for unordered sets.
+    if (!filter_actions_dag)
+        return;
+    VirtualColumnUtils::buildOrderedSetsForDAG(*filter_actions_dag, getContext());
 }

 void ReadFromObjectStorageStep::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)
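The placement matters: applyFilters runs while the query plan is still being optimized, before the read from object storage is scheduled, so any IN-subquery sets in filter_actions_dag are materialized (and sorted) up front, where the file-level KeyCondition can still use them. The early return simply skips the work when no filter was pushed down to this step.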

src/Storages/VirtualColumnUtils.cpp

Lines changed: 17 additions & 2 deletions

@@ -64,7 +64,7 @@ namespace DB
 namespace VirtualColumnUtils
 {

-void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
+void buildSetsForDagImpl(const ActionsDAG & dag, const ContextPtr & context, bool ordered)
 {
     for (const auto & node : dag.getNodes())
     {
@@ -80,13 +80,28 @@ void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
                 if (!future_set->get())
                 {
                     if (auto * set_from_subquery = typeid_cast<FutureSetFromSubquery *>(future_set.get()))
-                        set_from_subquery->buildSetInplace(context);
+                    {
+                        if (ordered)
+                            set_from_subquery->buildOrderedSetInplace(context);
+                        else
+                            set_from_subquery->buildSetInplace(context);
+                    }
                 }
             }
         }
     }
 }

+void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
+{
+    buildSetsForDagImpl(dag, context, /* ordered = */ false);
+}
+
+void buildOrderedSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
+{
+    buildSetsForDagImpl(dag, context, /* ordered = */ true);
+}
+
 ExpressionActionsPtr buildFilterExpression(ActionsDAG dag, ContextPtr context)
 {
     buildSetsForDAG(dag, context);
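The refactoring deliberately leaves existing behavior intact: buildSetsForDAG keeps building unordered sets for its current callers (such as buildFilterExpression, visible at the end of the hunk), while buildOrderedSetsForDAG is a separate entry point, so only the object-storage read path opts into ordered sets.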

src/Storages/VirtualColumnUtils.h

Lines changed: 3 additions & 0 deletions

@@ -40,6 +40,9 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
 /// Builds sets used by ActionsDAG inplace.
 void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context);

+/// Builds ordered sets used by ActionsDAG inplace.
+void buildOrderedSetsForDAG(const ActionsDAG & dag, const ContextPtr & context);
+
 /// Checks if all functions used in DAG are deterministic.
 bool isDeterministic(const ActionsDAG::Node * node);

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+import pytest
+
+from helpers.iceberg_utils import (
+    check_validity_and_get_prunned_files_general,
+    execute_spark_query_general,
+    get_creation_expression,
+    get_uuid_str
+)
+
+@pytest.mark.parametrize(
+    "storage_type",
+    ["s3", "azure", "local"],
+)
+def test_partition_pruning_with_subquery_set(started_cluster_iceberg_with_spark, storage_type):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+    spark = started_cluster_iceberg_with_spark.spark_session
+    TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
+    IN_MEMORY_TABLE = "in_memory_table_" + get_uuid_str()
+
+    def execute_spark_query(query: str):
+        return execute_spark_query_general(
+            spark,
+            started_cluster_iceberg_with_spark,
+            storage_type,
+            TABLE_NAME,
+            query,
+        )
+
+    execute_spark_query(
+        f"""
+            CREATE TABLE {TABLE_NAME} (
+                id INT,
+                data STRING
+            )
+            USING iceberg
+            PARTITIONED BY (identity(id))
+            OPTIONS('format-version'='2')
+        """
+    )
+
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (1, 'a'),
+            (2, 'b'),
+            (3, 'c'),
+            (4, 'd'),
+            (5, 'e');
+        """
+    )
+
+
+    creation_expression = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
+    )
+
+    instance.query(f"CREATE TABLE {IN_MEMORY_TABLE} (id INT) ENGINE = Memory")
+    instance.query(f"INSERT INTO {IN_MEMORY_TABLE} VALUES (2), (4)")
+
+
+    def check_validity_and_get_prunned_files(select_expression):
+        settings1 = {
+            "use_iceberg_partition_pruning": 0
+        }
+        settings2 = {
+            "use_iceberg_partition_pruning": 1
+        }
+        return check_validity_and_get_prunned_files_general(
+            instance, TABLE_NAME, settings1, settings2, 'IcebergPartitionPrunedFiles', select_expression
+        )
+
+    assert (
+        check_validity_and_get_prunned_files(
+            f"SELECT * FROM {creation_expression} WHERE id in (SELECT id FROM {IN_MEMORY_TABLE}) ORDER BY ALL"
+        )
+        == 3
+    )
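The test doubles as a worked example: five rows land in five identity(id) partitions (ids 1 through 5), and the subquery set from the Memory table is {2, 4}. With use_iceberg_partition_pruning enabled, the files for ids 1, 3, and 5 can be skipped, which is exactly the asserted IcebergPartitionPrunedFiles count of 3.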

tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference

Lines changed: 0 additions & 3 deletions

@@ -6,9 +6,6 @@ CreatingSets (Create sets before main query execution)
   Expression ((Project names + Projection))
     Filter ((WHERE + Change column names to column identifiers))
       ReadFromObjectStorage
-  CreatingSet (Create set for subquery)
-    Expression ((Project names + (Projection + Change column names to column identifiers)))
-      ReadFromObjectStorage
 Expression ((Project names + Projection))
   Aggregating
     Expression ((Before GROUP BY + (Change column names to column identifiers + (Project names + (Projection + Change column names to column identifiers)))))
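This reference update is the plan-level view of the same change: since the subquery set is now built inplace when filters are applied to ReadFromObjectStorage, the separate CreatingSet (Create set for subquery) subtree disappears from the EXPLAIN output.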
