Skip to content

Commit 0ec9919

Browse files
novikdzvonand
authored and committed
Merge pull request ClickHouse#87303 from ClickHouse/row_policy_prewhere_with_optimize_2
Fix condition not being moved to PREWHERE in case there is a row policy (version 2)
1 parent e7db2db commit 0ec9919

File tree

52 files changed

+671
-474
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+671
-474
lines changed

src/Formats/FormatFactory.cpp

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -399,8 +399,9 @@ InputFormatPtr FormatFactory::getInput(
399399
const FormatSettings format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
400400
const Settings & settings = context->getSettingsRef();
401401

402-
if (format_filter_info && format_filter_info->prewhere_info && (!creators.random_access_input_creator || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings)))
403-
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE passed to format that doesn't support it");
402+
if (format_filter_info && (format_filter_info->prewhere_info || format_filter_info->row_level_filter) && (!creators.random_access_input_creator || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings)))
403+
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} passed to format that doesn't support it",
404+
format_filter_info->prewhere_info ? "PREWHERE" : "ROW LEVEL FILTER");
404405

405406
if (!parser_shared_resources)
406407
parser_shared_resources = std::make_shared<FormatParserSharedResources>(

src/Formats/FormatFilterInfo.cpp

Lines changed: 20 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -43,19 +43,21 @@ std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>
4343
return {clickhouse_to_parquet_names, parquet_names_to_clickhouse};
4444
}
4545

46-
FormatFilterInfo::FormatFilterInfo(std::shared_ptr<const ActionsDAG> filter_actions_dag_, const ContextPtr & context_, ColumnMapperPtr column_mapper_)
47-
: filter_actions_dag(filter_actions_dag_)
48-
, context(context_)
49-
, column_mapper(column_mapper_)
50-
{
51-
}
46+
FormatFilterInfo::FormatFilterInfo(
47+
std::shared_ptr<const ActionsDAG> filter_actions_dag_,
48+
const ContextPtr & context_,
49+
ColumnMapperPtr column_mapper_,
50+
FilterDAGInfoPtr row_level_filter_,
51+
PrewhereInfoPtr prewhere_info_)
52+
: filter_actions_dag(filter_actions_dag_)
53+
, context(context_)
54+
, row_level_filter(std::move(row_level_filter_))
55+
, prewhere_info(std::move(prewhere_info_))
56+
, column_mapper(column_mapper_)
57+
{
58+
}
5259

53-
FormatFilterInfo::FormatFilterInfo()
54-
: filter_actions_dag(nullptr)
55-
, context(static_cast<const ContextPtr &>(nullptr))
56-
, column_mapper(nullptr)
57-
{
58-
}
60+
FormatFilterInfo::FormatFilterInfo() = default;
5961

6062

6163
bool FormatFilterInfo::hasFilter() const
@@ -73,7 +75,7 @@ void FormatFilterInfo::initKeyCondition(const Block & keys)
7375
if (!ctx)
7476
throw Exception(ErrorCodes::LOGICAL_ERROR, "Context has expired");
7577

76-
if (prewhere_info)
78+
if (prewhere_info || row_level_filter)
7779
{
7880
auto add_columns = [&](const ActionsDAG & dag)
7981
{
@@ -83,9 +85,11 @@ void FormatFilterInfo::initKeyCondition(const Block & keys)
8385
additional_columns.insert({col.type->createColumn(), col.type, col.name});
8486
}
8587
};
86-
if (prewhere_info->row_level_filter.has_value())
87-
add_columns(prewhere_info->row_level_filter.value());
88-
add_columns(prewhere_info->prewhere_actions);
88+
89+
if (row_level_filter)
90+
add_columns(row_level_filter->actions);
91+
if (prewhere_info)
92+
add_columns(prewhere_info->prewhere_actions);
8993
}
9094

9195
ColumnsWithTypeAndName columns = keys.getColumnsWithTypeAndName();

src/Formats/FormatFilterInfo.h

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,8 @@ struct Settings;
1212
class KeyCondition;
1313
struct PrewhereInfo;
1414
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
15+
struct FilterDAGInfo;
16+
using FilterDAGInfoPtr = std::shared_ptr<FilterDAGInfo>;
1517

1618
/// Some formats need a custom mapping between columns in the file and ClickHouse columns.
1719
class ColumnMapper
@@ -41,12 +43,18 @@ using FormatFilterInfoPtr = std::shared_ptr<FormatFilterInfo>;
4143
/// because most implementations don't use most of this struct.
4244
struct FormatFilterInfo
4345
{
44-
FormatFilterInfo(std::shared_ptr<const ActionsDAG> filter_actions_dag_, const ContextPtr & context_, ColumnMapperPtr column_mapper_);
46+
FormatFilterInfo(
47+
std::shared_ptr<const ActionsDAG> filter_actions_dag_,
48+
const ContextPtr & context_,
49+
ColumnMapperPtr column_mapper_,
50+
FilterDAGInfoPtr row_level_filter_,
51+
PrewhereInfoPtr prewhere_info_);
4552

4653
FormatFilterInfo();
4754

4855
std::shared_ptr<const ActionsDAG> filter_actions_dag;
4956
ContextWeakPtr context; // required only if `filter_actions_dag` is set
57+
FilterDAGInfoPtr row_level_filter;
5058
PrewhereInfoPtr prewhere_info; // assigned only if the format supports prewhere
5159

5260
/// Optionally created from filter_actions_dag, if the format needs it.

src/Interpreters/ExpressionAnalyzer.cpp

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1936,7 +1936,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
19361936
bool first_stage_,
19371937
bool second_stage_,
19381938
bool only_types,
1939-
const FilterDAGInfoPtr & filter_info_,
1939+
const FilterDAGInfoPtr & row_policy_info_,
19401940
const FilterDAGInfoPtr & additional_filter,
19411941
const Block & source_header)
19421942
: first_stage(first_stage_)
@@ -2034,10 +2034,10 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
20342034
columns_for_additional_filter.begin(), columns_for_additional_filter.end());
20352035
}
20362036

2037-
if (storage && filter_info_)
2037+
if (storage && row_policy_info_)
20382038
{
2039-
filter_info = filter_info_;
2040-
filter_info->do_remove_column = true;
2039+
row_policy_info = row_policy_info_;
2040+
row_policy_info->do_remove_column = true;
20412041
}
20422042

20432043
if (prewhere_dag_and_flags = query_analyzer.appendPrewhere(chain, !first_stage); prewhere_dag_and_flags)
@@ -2376,9 +2376,9 @@ std::string ExpressionAnalysisResult::dump() const
23762376
ss << "prewhere_info " << prewhere_info->dump() << "\n";
23772377
}
23782378

2379-
if (filter_info)
2379+
if (row_policy_info)
23802380
{
2381-
ss << "filter_info " << filter_info->dump() << "\n";
2381+
ss << "filter_info " << row_policy_info->dump() << "\n";
23822382
}
23832383

23842384
if (before_aggregation)

src/Interpreters/ExpressionAnalyzer.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -273,7 +273,7 @@ struct ExpressionAnalysisResult
273273
NameSet columns_to_remove_after_prewhere;
274274

275275
PrewhereInfoPtr prewhere_info;
276-
FilterDAGInfoPtr filter_info;
276+
FilterDAGInfoPtr row_policy_info;
277277
ConstantFilterDescription prewhere_constant_filter_description;
278278
ConstantFilterDescription where_constant_filter_description;
279279
/// Actions by every element of ORDER BY
@@ -288,12 +288,12 @@ struct ExpressionAnalysisResult
288288
bool first_stage,
289289
bool second_stage,
290290
bool only_types,
291-
const FilterDAGInfoPtr & filter_info,
291+
const FilterDAGInfoPtr & row_policy_info,
292292
const FilterDAGInfoPtr & additional_filter, /// for setting additional_filters
293293
const Block & source_header);
294294

295295
/// Filter for row-level security.
296-
bool hasFilter() const { return filter_info.get(); }
296+
bool hasRowPolicyFilter() const { return row_policy_info.get(); }
297297

298298
bool hasJoin() const { return join.get(); }
299299
bool hasPrewhere() const { return prewhere_info.get(); }

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 43 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -894,7 +894,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
894894
/// Fix source_header for filter actions.
895895
if (row_policy_filter && !row_policy_filter->empty())
896896
{
897-
filter_info = generateFilterActions(
897+
row_policy_info = generateFilterActions(
898898
table_id, row_policy_filter->expression, context, storage, storage_snapshot, metadata_snapshot, required_columns,
899899
prepared_sets);
900900

@@ -1063,8 +1063,6 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
10631063
max_rows = max_rows ? std::min(max_rows, settings[Setting::max_rows_to_read].value) : settings[Setting::max_rows_to_read];
10641064
query_info_copy.trivial_limit = max_rows;
10651065

1066-
/// Apply filters to prewhere and add them to the query_info so we can filter out parts efficiently during row estimation
1067-
applyFiltersToPrewhereInAnalysis(analysis_copy);
10681066
if (analysis_copy.prewhere_info)
10691067
{
10701068
query_info_copy.prewhere_info = analysis_copy.prewhere_info;
@@ -1080,13 +1078,13 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
10801078
= query_info_copy.prewhere_info->prewhere_actions.findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
10811079
added_filter_nodes.nodes.push_back(&node);
10821080
}
1081+
}
10831082

1084-
if (query_info_copy.prewhere_info->row_level_filter)
1085-
{
1086-
const auto & node
1087-
= query_info_copy.prewhere_info->row_level_filter->findInOutputs(query_info_copy.prewhere_info->row_level_column_name);
1088-
added_filter_nodes.nodes.push_back(&node);
1089-
}
1083+
if (query_info_copy.row_level_filter)
1084+
{
1085+
const auto & node
1086+
= query_info_copy.row_level_filter->actions.findInOutputs(query_info_copy.row_level_filter->column_name);
1087+
added_filter_nodes.nodes.push_back(&node);
10901088
}
10911089

10921090
if (auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes))
@@ -1189,7 +1187,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
11891187
&& options.to_stage > QueryProcessingStage::WithMergeableState;
11901188

11911189
analysis_result = ExpressionAnalysisResult(
1192-
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, additional_filter_info, *source_header);
1190+
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, row_policy_info, additional_filter_info, *source_header);
11931191

11941192
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
11951193
{
@@ -1632,32 +1630,20 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
16321630
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
16331631
query_plan.addStep(std::move(read_nothing));
16341632

1635-
if (expressions.filter_info)
1633+
if (expressions.row_policy_info)
16361634
{
16371635
auto row_level_security_step = std::make_unique<FilterStep>(
16381636
query_plan.getCurrentHeader(),
1639-
expressions.filter_info->actions.clone(),
1640-
expressions.filter_info->column_name,
1641-
expressions.filter_info->do_remove_column);
1637+
expressions.row_policy_info->actions.clone(),
1638+
expressions.row_policy_info->column_name,
1639+
expressions.row_policy_info->do_remove_column);
16421640

16431641
row_level_security_step->setStepDescription("Row-level security filter");
16441642
query_plan.addStep(std::move(row_level_security_step));
16451643
}
16461644

16471645
if (expressions.prewhere_info)
16481646
{
1649-
if (expressions.prewhere_info->row_level_filter)
1650-
{
1651-
auto row_level_filter_step = std::make_unique<FilterStep>(
1652-
query_plan.getCurrentHeader(),
1653-
expressions.prewhere_info->row_level_filter->clone(),
1654-
expressions.prewhere_info->row_level_column_name,
1655-
true);
1656-
1657-
row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
1658-
query_plan.addStep(std::move(row_level_filter_step));
1659-
}
1660-
16611647
auto prewhere_step = std::make_unique<FilterStep>(
16621648
query_plan.getCurrentHeader(),
16631649
expressions.prewhere_info->prewhere_actions.clone(),
@@ -1759,13 +1745,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
17591745
{
17601746
// If there is a storage that supports prewhere, this will always be nullptr
17611747
// Thus, we don't actually need to check if projection is active.
1762-
if (expressions.filter_info)
1748+
if (expressions.row_policy_info && !(!input_pipe && storage && storage->supportsPrewhere()))
17631749
{
17641750
auto row_level_security_step = std::make_unique<FilterStep>(
17651751
query_plan.getCurrentHeader(),
1766-
expressions.filter_info->actions.clone(),
1767-
expressions.filter_info->column_name,
1768-
expressions.filter_info->do_remove_column);
1752+
expressions.row_policy_info->actions.clone(),
1753+
expressions.row_policy_info->column_name,
1754+
expressions.row_policy_info->do_remove_column);
17691755

17701756
row_level_security_step->setStepDescription("Row-level security filter");
17711757
query_plan.addStep(std::move(row_level_security_step));
@@ -2206,21 +2192,21 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
22062192
{
22072193
Pipe pipe(std::make_shared<NullSource>(std::make_shared<const Block>(source_header)));
22082194

2209-
if (query_info.prewhere_info)
2195+
if (query_info.row_level_filter)
22102196
{
2211-
auto & prewhere_info = *query_info.prewhere_info;
2212-
2213-
if (prewhere_info.row_level_filter)
2197+
auto row_level_actions = std::make_shared<ExpressionActions>(query_info.row_level_filter->actions.clone());
2198+
pipe.addSimpleTransform([&](const SharedHeader & header)
22142199
{
2215-
auto row_level_actions = std::make_shared<ExpressionActions>(prewhere_info.row_level_filter->clone());
2216-
pipe.addSimpleTransform([&](const SharedHeader & header)
2217-
{
2218-
return std::make_shared<FilterTransform>(header,
2219-
row_level_actions,
2220-
prewhere_info.row_level_column_name, true);
2221-
});
2222-
}
2200+
return std::make_shared<FilterTransform>(header,
2201+
row_level_actions,
2202+
query_info.row_level_filter->column_name,
2203+
query_info.row_level_filter->do_remove_column);
2204+
});
2205+
}
22232206

2207+
if (query_info.prewhere_info)
2208+
{
2209+
auto & prewhere_info = *query_info.prewhere_info;
22242210
auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions.clone());
22252211
pipe.addSimpleTransform([&](const SharedHeader & header)
22262212
{
@@ -2259,38 +2245,9 @@ bool InterpreterSelectQuery::shouldMoveToPrewhere() const
22592245
return settings[Setting::optimize_move_to_prewhere] && (!query.final() || settings[Setting::optimize_move_to_prewhere_if_final]);
22602246
}
22612247

2262-
/// Note that this is const and accepts the analysis ref to be able to use it to do analysis for parallel replicas
2263-
/// without affecting the final analysis multiple times
2264-
void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysisResult & analysis) const
2265-
{
2266-
if (!analysis.filter_info)
2267-
return;
2268-
2269-
if (!analysis.prewhere_info)
2270-
{
2271-
const bool does_storage_support_prewhere = !input_pipe && storage && storage->supportsPrewhere();
2272-
if (does_storage_support_prewhere && shouldMoveToPrewhere())
2273-
{
2274-
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
2275-
analysis.prewhere_info = std::make_shared<PrewhereInfo>(std::move(analysis.filter_info->actions), analysis.filter_info->column_name);
2276-
analysis.prewhere_info->remove_prewhere_column = std::move(analysis.filter_info->do_remove_column);
2277-
analysis.prewhere_info->need_filter = true;
2278-
analysis.filter_info = nullptr;
2279-
}
2280-
}
2281-
else
2282-
{
2283-
/// Add row level security actions to prewhere.
2284-
analysis.prewhere_info->row_level_filter = std::move(analysis.filter_info->actions);
2285-
analysis.prewhere_info->row_level_column_name = std::move(analysis.filter_info->column_name);
2286-
analysis.filter_info = nullptr;
2287-
}
2288-
}
2289-
2290-
22912248
void InterpreterSelectQuery::addPrewhereAliasActions()
22922249
{
2293-
applyFiltersToPrewhereInAnalysis(analysis_result);
2250+
auto & row_level_filter = analysis_result.row_policy_info;
22942251
auto & prewhere_info = analysis_result.prewhere_info;
22952252
auto & columns_to_remove_after_prewhere = analysis_result.columns_to_remove_after_prewhere;
22962253

@@ -2317,12 +2274,12 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
23172274
/// Get some columns directly from PREWHERE expression actions
23182275
auto prewhere_required_columns = prewhere_info->prewhere_actions.getRequiredColumns().getNames();
23192276
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
2277+
}
23202278

2321-
if (prewhere_info->row_level_filter)
2322-
{
2323-
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
2324-
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
2325-
}
2279+
if (row_level_filter)
2280+
{
2281+
auto row_level_required_columns = row_level_filter->actions.getRequiredColumns().getNames();
2282+
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
23262283
}
23272284

23282285
return columns;
@@ -2480,13 +2437,15 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 allow_exper
24802437

24812438
// It's possible to optimize count() given only partition predicates
24822439
ActionsDAG::NodeRawConstPtrs filter_nodes;
2440+
if (analysis_result.hasRowPolicyFilter())
2441+
{
2442+
auto & row_level_filter = analysis_result.row_policy_info;
2443+
filter_nodes.push_back(&row_level_filter->actions.findInOutputs(row_level_filter->column_name));
2444+
}
24832445
if (analysis_result.hasPrewhere())
24842446
{
24852447
auto & prewhere_info = analysis_result.prewhere_info;
24862448
filter_nodes.push_back(&prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name));
2487-
2488-
if (prewhere_info->row_level_filter)
2489-
filter_nodes.push_back(&prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name));
24902449
}
24912450
if (analysis_result.hasWhere())
24922451
{
@@ -2677,10 +2636,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
26772636
if (max_streams == 0)
26782637
max_streams = 1;
26792638

2680-
auto & prewhere_info = analysis_result.prewhere_info;
2639+
if (analysis_result.row_policy_info && (!input_pipe && storage && storage->supportsPrewhere()))
2640+
query_info.row_level_filter = analysis_result.row_policy_info;
26812641

2682-
if (prewhere_info)
2683-
query_info.prewhere_info = prewhere_info;
2642+
if (analysis_result.prewhere_info)
2643+
query_info.prewhere_info = analysis_result.prewhere_info;
26842644

26852645
bool optimize_read_in_order = analysis_result.optimize_read_in_order;
26862646
bool optimize_aggregation_in_order = analysis_result.optimize_read_in_order && !query_analyzer->useGroupingSetKey();

src/Interpreters/InterpreterSelectQuery.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -220,7 +220,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
220220
ExpressionAnalysisResult analysis_result;
221221
/// For row-level security.
222222
RowPolicyFilterPtr row_policy_filter;
223-
FilterDAGInfoPtr filter_info;
223+
FilterDAGInfoPtr row_policy_info;
224224

225225
/// For additional_filter setting.
226226
FilterDAGInfoPtr additional_filter_info;

0 commit comments

Comments
 (0)