Skip to content

Commit 11d5900

Browse files
Merge pull request ClickHouse#79417 from amosbird/projection-index-2
Add `_part_starting_offset` virtual column and key condition support for offset-based querying
2 parents 13a8559 + f41142b commit 11d5900

28 files changed

+486
-248
lines changed

docs/en/engines/table-engines/mergetree-family/mergetree.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,8 +966,11 @@ ClickHouse versions 22.3 through 22.7 use a different cache configuration, see [
966966

967967
- `_part` — Name of a part.
968968
- `_part_index` — Sequential index of the part in the query result.
969+
- `_part_starting_offset` — Cumulative starting row of the part in the query result.
970+
- `_part_offset` — Number of row in the part.
969971
- `_partition_id` — Name of a partition.
970972
- `_part_uuid` — Unique part identifier (if enabled MergeTree setting `assign_part_uuids`).
973+
- `_part_data_version` — Data version of part (either min block number or mutation version).
971974
- `_partition_value` — Values (a tuple) of a `partition by` expression.
972975
- `_sample_factor` — Sample factor (from the query).
973976
- `_block_number` — Block number of the row, it is persisted on merges when `allow_experimental_block_number_column` is set to true.

src/Interpreters/MutationsInterpreter.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,10 +1283,9 @@ void MutationsInterpreter::Source::read(
12831283
plan,
12841284
*data,
12851285
storage_snapshot,
1286-
part,
1286+
RangesInDataPart(part),
12871287
alter_conversions,
12881288
nullptr,
1289-
0,
12901289
required_columns,
12911290
nullptr,
12921291
apply_deleted_mask_,

src/Processors/QueryPlan/Optimizations/actionsDAGUtils.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,4 +434,66 @@ void applyActionsToSortDescription(
434434
description.resize(prefix_size);
435435
}
436436

437+
std::optional<std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *>> resolveMatchedInputs(
438+
const MatchedTrees::Matches & matches,
439+
const std::unordered_set<const ActionsDAG::Node *> & allowed_inputs,
440+
const ActionsDAG::NodeRawConstPtrs & nodes)
441+
{
442+
struct Frame
443+
{
444+
const ActionsDAG::Node * node;
445+
size_t next_child_to_visit = 0;
446+
};
447+
448+
std::stack<Frame> stack;
449+
std::unordered_set<const ActionsDAG::Node *> visited;
450+
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> new_inputs;
451+
452+
for (const auto * node : nodes)
453+
{
454+
if (visited.contains(node))
455+
continue;
456+
457+
stack.push({.node = node});
458+
459+
while (!stack.empty())
460+
{
461+
auto & frame = stack.top();
462+
463+
if (frame.next_child_to_visit == 0)
464+
{
465+
auto jt = matches.find(frame.node);
466+
if (jt != matches.end())
467+
{
468+
const auto & match = jt->second;
469+
if (match.node && !match.monotonicity && allowed_inputs.contains(match.node))
470+
{
471+
visited.insert(frame.node);
472+
new_inputs[frame.node] = match.node;
473+
stack.pop();
474+
continue;
475+
}
476+
}
477+
}
478+
479+
if (frame.next_child_to_visit < frame.node->children.size())
480+
{
481+
stack.push({.node = frame.node->children[frame.next_child_to_visit]});
482+
++frame.next_child_to_visit;
483+
continue;
484+
}
485+
486+
/// Not a match and there is no matched child.
487+
if (frame.node->type == ActionsDAG::ActionType::INPUT)
488+
return std::nullopt;
489+
490+
/// Not a match, but all children matched.
491+
visited.insert(frame.node);
492+
stack.pop();
493+
}
494+
}
495+
496+
return new_inputs;
497+
}
498+
437499
}

src/Processors/QueryPlan/Optimizations/actionsDAGUtils.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,20 @@ void applyActionsToSortDescription(
6363
const ActionsDAG & dag,
6464
/// Ignore this one output of DAG. Used for FilterStep where filter column is removed.
6565
const ActionsDAG::Node * output_to_skip = nullptr);
66+
67+
/// Traverses a set of root nodes (`nodes`) and attempts to replace them (and their subtrees)
68+
/// with matched input nodes from the `matches` map, under the condition that:
69+
/// - The match is non-monotonic (`!match.monotonicity`)
70+
/// - The matched node is allowed (present in `allowed_inputs`)
71+
///
72+
/// This function performs a depth-first traversal to ensure all nodes and their dependencies
73+
/// can be substituted by valid inputs. If any node ultimately resolves to an INPUT node that
74+
/// has no valid match in `matches`, the function returns `std::nullopt` to indicate that
75+
/// a consistent input substitution map cannot be constructed.
76+
///
77+
/// The primary use case is to construct the input substitution map required by `ActionsDAG::foldActionsByProjection`.
78+
std::optional<std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *>> resolveMatchedInputs(
79+
const MatchedTrees::Matches & matches,
80+
const std::unordered_set<const ActionsDAG::Node *> & allowed_inputs,
81+
const ActionsDAG::NodeRawConstPtrs & nodes);
6682
}

src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp

Lines changed: 4 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -335,67 +335,11 @@ std::optional<ActionsDAG> analyzeAggregateProjection(
335335

336336
/// Here we want to match query keys with projection keys.
337337
/// Query key can be any expression depending on projection keys.
338+
auto new_inputs = resolveMatchedInputs(matches, proj_key_nodes, query_key_nodes);
339+
if (!new_inputs)
340+
return {};
338341

339-
struct Frame
340-
{
341-
const ActionsDAG::Node * node;
342-
size_t next_child_to_visit = 0;
343-
};
344-
345-
std::stack<Frame> stack;
346-
std::unordered_set<const ActionsDAG::Node *> visited;
347-
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> new_inputs;
348-
349-
for (const auto * key_node : query_key_nodes)
350-
{
351-
if (visited.contains(key_node))
352-
continue;
353-
354-
stack.push({.node = key_node});
355-
356-
while (!stack.empty())
357-
{
358-
auto & frame = stack.top();
359-
360-
if (frame.next_child_to_visit == 0)
361-
{
362-
auto jt = matches.find(frame.node);
363-
if (jt != matches.end())
364-
{
365-
auto & match = jt->second;
366-
if (match.node && !match.monotonicity && proj_key_nodes.contains(match.node))
367-
{
368-
visited.insert(frame.node);
369-
new_inputs[frame.node] = match.node;
370-
stack.pop();
371-
continue;
372-
}
373-
}
374-
}
375-
376-
if (frame.next_child_to_visit < frame.node->children.size())
377-
{
378-
stack.push({.node = frame.node->children[frame.next_child_to_visit]});
379-
++frame.next_child_to_visit;
380-
continue;
381-
}
382-
383-
/// Not a match and there is no matched child.
384-
if (frame.node->type == ActionsDAG::ActionType::INPUT)
385-
{
386-
// LOG_TRACE(getLogger("optimizeUseProjections"), "Cannot find match for {}", frame.node->result_name);
387-
return {};
388-
}
389-
390-
/// Not a match, but all children matched.
391-
visited.insert(frame.node);
392-
stack.pop();
393-
}
394-
}
395-
396-
// LOG_TRACE(getLogger("optimizeUseProjections"), "Folding actions by projection");
397-
398-
auto proj_dag = ActionsDAG::foldActionsByProjection(new_inputs, query_key_nodes);
342+
auto proj_dag = ActionsDAG::foldActionsByProjection(*new_inputs, query_key_nodes);
399343
appendAggregateFunctions(proj_dag, aggregates, *matched_aggregates);
400344
return proj_dag;
401345
}

src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,17 +229,22 @@ bool analyzeProjectionCandidate(
229229
const PartitionIdToMaxBlockPtr & max_added_blocks,
230230
const ActionsDAG * dag)
231231
{
232-
MergeTreeData::DataPartsVector projection_parts;
233-
MergeTreeData::DataPartsVector normal_parts;
232+
RangesInDataParts projection_parts;
233+
RangesInDataParts normal_parts;
234234

235235
for (const auto & part_with_ranges : parts_with_ranges)
236236
{
237237
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
238238
auto it = created_projections.find(candidate.projection->name);
239239
if (it != created_projections.end() && !it->second->is_broken)
240-
projection_parts.push_back(it->second);
240+
{
241+
projection_parts.push_back(
242+
RangesInDataPart(it->second, part_with_ranges.part_index_in_query, part_with_ranges.part_starting_offset_in_query));
243+
}
241244
else
242-
normal_parts.push_back(part_with_ranges.data_part);
245+
{
246+
normal_parts.push_back(part_with_ranges);
247+
}
243248
}
244249

245250
if (projection_parts.empty())

src/Processors/QueryPlan/PartsSplitter.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ class RangesInDataPartsBuilder
251251
ranges_in_data_parts.emplace_back(
252252
initial_ranges_in_data_parts[part_index].data_part,
253253
initial_ranges_in_data_parts[part_index].part_index_in_query,
254+
initial_ranges_in_data_parts[part_index].part_starting_offset_in_query,
254255
MarkRanges{mark_range});
255256
part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index;
256257
return;

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,6 @@ size_t countPartitions(const RangesInDataParts & parts_with_ranges)
9090
return countPartitions(parts_with_ranges, get_partition_id);
9191
}
9292

93-
size_t countPartitions(const MergeTreeData::DataPartsVector & prepared_parts)
94-
{
95-
auto get_partition_id = [](const MergeTreeData::DataPartPtr data_part) { return data_part->info.getPartitionId(); };
96-
return countPartitions(prepared_parts, get_partition_id);
97-
}
98-
9993
bool restoreDAGInputs(ActionsDAG & dag, const NameSet & inputs)
10094
{
10195
std::unordered_set<const ActionsDAG::Node *> outputs(dag.getOutputs().begin(), dag.getOutputs().end());
@@ -336,7 +330,7 @@ void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, c
336330
}
337331

338332
ReadFromMergeTree::ReadFromMergeTree(
339-
MergeTreeData::DataPartsVector parts_,
333+
RangesInDataParts parts_,
340334
MergeTreeData::MutationsSnapshotPtr mutations_,
341335
Names all_column_names_,
342336
const MergeTreeData & data_,
@@ -1245,7 +1239,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
12451239
}
12461240

12471241
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
1248-
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
1242+
new_parts.emplace_back(
1243+
part.data_part, part.part_index_in_query, part.part_starting_offset_in_query, std::move(ranges_to_get_from_part));
12491244
}
12501245

12511246
split_parts_and_ranges.emplace_back(std::move(new_parts));
@@ -1488,7 +1483,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
14881483
RangesInDataParts new_parts;
14891484

14901485
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
1491-
new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges);
1486+
new_parts.emplace_back(
1487+
part_it->data_part, part_it->part_index_in_query, part_it->part_starting_offset_in_query, part_it->ranges);
14921488

14931489
if (new_parts.empty())
14941490
continue;
@@ -1631,7 +1627,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool
16311627
return selectRangesToRead(prepared_parts, find_exact_ranges);
16321628
}
16331629

1634-
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges) const
1630+
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(RangesInDataParts parts, bool find_exact_ranges) const
16351631
{
16361632
return selectRangesToRead(
16371633
std::move(parts),
@@ -1653,7 +1649,7 @@ static void buildIndexes(
16531649
std::optional<ReadFromMergeTree::Indexes> & indexes,
16541650
const ActionsDAG * filter_actions_dag,
16551651
const MergeTreeData & data,
1656-
const MergeTreeData::DataPartsVector & parts,
1652+
const RangesInDataParts & parts,
16571653
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot,
16581654
[[maybe_unused]] const std::optional<VectorSearchParameters> & vector_search_parameters,
16591655
const ContextPtr & context,
@@ -1684,8 +1680,15 @@ static void buildIndexes(
16841680
indexes->partition_pruner.emplace(metadata_snapshot, filter_dag, context, false /* strict */);
16851681
}
16861682

1687-
indexes->part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_dag.predicate, context);
1688-
MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_dag.predicate, context);
1683+
indexes->part_values
1684+
= MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_dag.predicate, context);
1685+
1686+
/// Perform virtual column key analysis only when no corresponding physical columns exist.
1687+
const auto & columns = metadata_snapshot->getColumns();
1688+
if (!columns.has("_part_offset") && !columns.has("_part"))
1689+
MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_dag.predicate, context);
1690+
if (!columns.has("_part_offset") && !columns.has("_part_starting_offset"))
1691+
MergeTreeDataSelectExecutor::buildKeyConditionFromTotalOffset(indexes->total_offset_condition, filter_dag.predicate, context);
16891692

16901693
indexes->use_skip_indexes = settings[Setting::use_skip_indexes];
16911694
if (query_info.isFinal() && !settings[Setting::use_skip_indexes_if_final])
@@ -1834,7 +1837,7 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
18341837
}
18351838

18361839
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
1837-
MergeTreeData::DataPartsVector parts,
1840+
RangesInDataParts parts,
18381841
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
18391842
const std::optional<VectorSearchParameters> & vector_search_parameters,
18401843
const StorageMetadataPtr & metadata_snapshot,
@@ -1867,7 +1870,17 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
18671870
const Names & primary_key_column_names = primary_key.column_names;
18681871

18691872
if (!indexes)
1870-
buildIndexes(indexes, query_info_.filter_actions_dag.get(), data, parts, mutations_snapshot, vector_search_parameters, context_, query_info_, metadata_snapshot, log);
1873+
buildIndexes(
1874+
indexes,
1875+
query_info_.filter_actions_dag.get(),
1876+
data,
1877+
parts,
1878+
mutations_snapshot,
1879+
vector_search_parameters,
1880+
context_,
1881+
query_info_,
1882+
metadata_snapshot,
1883+
log);
18711884

18721885
if (indexes->part_values && indexes->part_values->empty())
18731886
return std::make_shared<AnalysisResult>(std::move(result));
@@ -1890,6 +1903,9 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
18901903
if (indexes->part_offset_condition)
18911904
LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString());
18921905

1906+
if (indexes->total_offset_condition)
1907+
LOG_DEBUG(log, "Total offset condition: {}", indexes->total_offset_condition->toString());
1908+
18931909
if (indexes->key_condition.alwaysFalse())
18941910
return std::make_shared<AnalysisResult>(std::move(result));
18951911

@@ -1924,7 +1940,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
19241940
return std::make_shared<AnalysisResult>(std::move(result));
19251941

19261942
for (const auto & part : parts)
1927-
total_marks_pk += part->index_granularity->getMarksCountWithoutFinal();
1943+
total_marks_pk += part.data_part->index_granularity->getMarksCountWithoutFinal();
19281944
parts_before_pk = parts.size();
19291945

19301946
auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);
@@ -1934,6 +1950,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
19341950
context_,
19351951
indexes->key_condition,
19361952
indexes->part_offset_condition,
1953+
indexes->total_offset_condition,
19371954
indexes->skip_indexes,
19381955
reader_settings,
19391956
log,
@@ -1945,12 +1962,11 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
19451962

19461963
MergeTreeDataSelectExecutor::filterPartsByQueryConditionCache(result.parts_with_ranges, query_info_, context_, log);
19471964

1948-
if (indexes->use_skip_indexes && !indexes->skip_indexes.useful_indices.empty() && query_info_.isFinal() && settings[Setting::use_skip_indexes_if_final_exact_mode])
1965+
if (indexes->use_skip_indexes && !indexes->skip_indexes.useful_indices.empty() && query_info_.isFinal()
1966+
&& settings[Setting::use_skip_indexes_if_final_exact_mode])
19491967
{
1950-
result.parts_with_ranges = findPKRangesForFinalAfterSkipIndex(primary_key,
1951-
metadata_snapshot->getSortingKey(),
1952-
result.parts_with_ranges,
1953-
log);
1968+
result.parts_with_ranges
1969+
= findPKRangesForFinalAfterSkipIndex(primary_key, metadata_snapshot->getSortingKey(), result.parts_with_ranges, log);
19541970
add_index_stat_row_for_pk_expand = true;
19551971
}
19561972
}
@@ -2135,7 +2151,7 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
21352151
{
21362152
std::unordered_map<String, size_t> partition_rows;
21372153
for (const auto & part : prepared_parts)
2138-
partition_rows[part->info.getPartitionId()] += part->rows_count;
2154+
partition_rows[part.data_part->info.getPartitionId()] += part.data_part->rows_count;
21392155
size_t sum_rows = 0;
21402156
size_t max_rows = 0;
21412157
for (const auto & [_, rows] : partition_rows)

0 commit comments

Comments
 (0)