Skip to content

Commit 301e81f

Browse files
authored
Merge pull request ClickHouse#89421 from ClickHouse/backport/25.8/89177
Backport ClickHouse#89177 to 25.8: Calculate all subquery sets inplace before Iceberg partition pruning
2 parents 67049ee + e46dda8 commit 301e81f

File tree

5 files changed

+99
-5
lines changed

5 files changed

+99
-5
lines changed

src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <Formats/FormatFactory.h>
1616
#include <IO/ReadBufferFromString.h>
1717
#include <Interpreters/Context.h>
18+
#include <Storages/VirtualColumnUtils.h>
1819

1920

2021
namespace DB
@@ -61,6 +62,14 @@ QueryPlanStepPtr ReadFromObjectStorageStep::clone() const
6162
void ReadFromObjectStorageStep::applyFilters(ActionDAGNodes added_filter_nodes)
6263
{
6364
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
65+
// It is important to build the inplace sets for the filter here, before reading data from object storage.
66+
// If we delay building these sets until later in the pipeline, the filter can be applied after the data
67+
// has already been read, potentially in parallel across many streams. This can significantly reduce the
68+
// effectiveness of an Iceberg partition pruning, as unnecessary data may be read. Additionally, building ordered sets
69+
// at this stage enables the KeyCondition class to apply more efficient optimizations than for unordered sets.
70+
if (!filter_actions_dag)
71+
return;
72+
VirtualColumnUtils::buildOrderedSetsForDAG(*filter_actions_dag, getContext());
6473
}
6574

6675
void ReadFromObjectStorageStep::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)

src/Storages/VirtualColumnUtils.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ namespace DB
6464
namespace VirtualColumnUtils
6565
{
6666

67-
void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
67+
void buildSetsForDagImpl(const ActionsDAG & dag, const ContextPtr & context, bool ordered)
6868
{
6969
for (const auto & node : dag.getNodes())
7070
{
@@ -80,13 +80,28 @@ void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
8080
if (!future_set->get())
8181
{
8282
if (auto * set_from_subquery = typeid_cast<FutureSetFromSubquery *>(future_set.get()))
83-
set_from_subquery->buildSetInplace(context);
83+
{
84+
if (ordered)
85+
set_from_subquery->buildOrderedSetInplace(context);
86+
else
87+
set_from_subquery->buildSetInplace(context);
88+
}
8489
}
8590
}
8691
}
8792
}
8893
}
8994

95+
void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
96+
{
97+
buildSetsForDagImpl(dag, context, /* ordered = */ false);
98+
}
99+
100+
void buildOrderedSetsForDAG(const ActionsDAG & dag, const ContextPtr & context)
101+
{
102+
buildSetsForDagImpl(dag, context, /* ordered = */ true);
103+
}
104+
90105
ExpressionActionsPtr buildFilterExpression(ActionsDAG dag, ContextPtr context)
91106
{
92107
buildSetsForDAG(dag, context);

src/Storages/VirtualColumnUtils.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
4040
/// Builds sets used by ActionsDAG inplace.
4141
void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context);
4242

43+
/// Builds ordered sets used by ActionsDAG inplace.
44+
void buildOrderedSetsForDAG(const ActionsDAG & dag, const ContextPtr & context);
45+
4346
/// Checks if all functions used in DAG are deterministic.
4447
bool isDeterministic(const ActionsDAG::Node * node);
4548

tests/integration/test_storage_iceberg/test.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3199,3 +3199,73 @@ def execute_spark_query(query: str):
31993199
except:
32003200
print("Dictionary: {}, Allowed Content Types: {}".format(diction, allowed_content_types))
32013201
raise
3202+
3203+
3204+
@pytest.mark.parametrize(
3205+
"storage_type",
3206+
["s3", "azure", "local"],
3207+
)
3208+
def test_partition_pruning_with_subquery_set(started_cluster, storage_type):
3209+
instance = started_cluster.instances["node1"]
3210+
spark = started_cluster.spark_session
3211+
TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
3212+
IN_MEMORY_TABLE = "in_memory_table_" + get_uuid_str()
3213+
3214+
def execute_spark_query(query: str):
3215+
return execute_spark_query_general(
3216+
spark,
3217+
started_cluster,
3218+
storage_type,
3219+
TABLE_NAME,
3220+
query,
3221+
)
3222+
3223+
execute_spark_query(
3224+
f"""
3225+
CREATE TABLE {TABLE_NAME} (
3226+
id INT,
3227+
data STRING
3228+
)
3229+
USING iceberg
3230+
PARTITIONED BY (identity(id))
3231+
OPTIONS('format-version'='2')
3232+
"""
3233+
)
3234+
3235+
execute_spark_query(
3236+
f"""
3237+
INSERT INTO {TABLE_NAME} VALUES
3238+
(1, 'a'),
3239+
(2, 'b'),
3240+
(3, 'c'),
3241+
(4, 'd'),
3242+
(5, 'e');
3243+
"""
3244+
)
3245+
3246+
3247+
creation_expression = get_creation_expression(
3248+
storage_type, TABLE_NAME, started_cluster, table_function=True
3249+
)
3250+
3251+
instance.query(f"CREATE TABLE {IN_MEMORY_TABLE} (id INT) ENGINE = Memory")
3252+
instance.query(f"INSERT INTO {IN_MEMORY_TABLE} VALUES (2), (4)")
3253+
3254+
3255+
def check_validity_and_get_prunned_files(select_expression):
3256+
settings1 = {
3257+
"use_iceberg_partition_pruning": 0
3258+
}
3259+
settings2 = {
3260+
"use_iceberg_partition_pruning": 1
3261+
}
3262+
return check_validity_and_get_prunned_files_general(
3263+
instance, TABLE_NAME, settings1, settings2, 'IcebergPartitionPrunedFiles', select_expression
3264+
)
3265+
3266+
assert (
3267+
check_validity_and_get_prunned_files(
3268+
f"SELECT * FROM {creation_expression} WHERE id in (SELECT id FROM {IN_MEMORY_TABLE}) ORDER BY ALL"
3269+
)
3270+
== 3
3271+
)

tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ CreatingSets (Create sets before main query execution)
66
Expression ((Project names + Projection))
77
Filter ((WHERE + Change column names to column identifiers))
88
ReadFromObjectStorage
9-
CreatingSet (Create set for subquery)
10-
Expression ((Project names + (Projection + Change column names to column identifiers)))
11-
ReadFromObjectStorage
129
Expression ((Project names + Projection))
1310
Aggregating
1411
Expression ((Before GROUP BY + (Change column names to column identifiers + (Project names + (Projection + Change column names to column identifiers)))))

0 commit comments

Comments
 (0)