Skip to content

Commit 1f0f8fa

Browse files
Merge pull request ClickHouse#78429 from amosbird/projection-index-1
Projection Index Step 1: Support _part_offset in normal projections
2 parents bb5a592 + 055406d commit 1f0f8fa

29 files changed

+1640
-88
lines changed

ci/jobs/scripts/check_style/aspell-ignore/en/aspell-dict.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2235,6 +2235,7 @@ mebibytes
22352235
memtable
22362236
memtables
22372237
mergeTreeIndex
2238+
mergeTreeProjection
22382239
mergeable
22392240
mergetree
22402241
messageID

docs/en/sql-reference/statements/alter/projection.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,49 @@ As mentioned before, we could review the `system.query_log` table. On the `proje
137137
SELECT query, projections FROM system.query_log WHERE query_id='<query_id>'
138138
```
139139

140+
## Normal projection with `_part_offset` field {#normal-projection-with-part-offset-field}
141+
142+
Creating a table with a normal projection that utilizes the `_part_offset` field:
143+
144+
```sql
145+
CREATE TABLE events
146+
(
147+
`event_time` DateTime,
148+
`event_id` UInt64,
149+
`user_id` UInt64,
150+
`huge_string` String,
151+
PROJECTION order_by_user_id
152+
(
153+
SELECT
154+
_part_offset
155+
ORDER BY user_id
156+
)
157+
)
158+
ENGINE = MergeTree()
159+
ORDER BY (event_id);
160+
```
161+
162+
Inserting some sample data:
163+
164+
```sql
165+
INSERT INTO events SELECT * FROM generateRandom() LIMIT 100000;
166+
```
167+
168+
### Using `_part_offset` as a secondary index {#normal-projection-secondary-index}
169+
170+
The `_part_offset` field preserves its value through merges and mutations, making it valuable for secondary indexing. We can leverage this in queries:
171+
172+
```sql
173+
SELECT
174+
count()
175+
FROM events
176+
WHERE (_part, _part_offset) IN (
177+
SELECT _part, _part_offset
178+
FROM events
179+
WHERE user_id = 42
180+
)
181+
```
182+
140183
# Manipulating Projections
141184

142185
The following operations with [projections](/engines/table-engines/mergetree-family/mergetree.md/#projections) are available:
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
---
2+
description: 'Represents the contents of some projection in MergeTree tables.
3+
It can be used for introspection.'
4+
sidebar_label: 'mergeTreeProjection'
5+
sidebar_position: 77
6+
slug: /sql-reference/table-functions/mergeTreeProjection
7+
title: 'mergeTreeProjection'
8+
---
9+
10+
# mergeTreeProjection Table Function
11+
12+
Represents the contents of some projection in MergeTree tables. It can be used for introspection.
13+
14+
```sql
15+
mergeTreeProjection(database, table, projection)
16+
```
17+
18+
**Arguments**
19+
20+
- `database`- The database name to read projection from.
21+
- `table`- The table name to read projection from.
22+
- `projection` - The projection to read from.
23+
24+
**Returned Value**
25+
26+
A table object with columns provided by given projection.
27+
28+
## Usage Example {#usage-example}
29+
30+
```sql
31+
CREATE TABLE test
32+
(
33+
`user_id` UInt64,
34+
`item_id` UInt64,
35+
PROJECTION order_by_item_id
36+
(
37+
SELECT _part_offset
38+
ORDER BY item_id
39+
)
40+
)
41+
ENGINE = MergeTree
42+
ORDER BY user_id;
43+
44+
INSERT INTO test SELECT number, 100 - number FROM numbers(5);
45+
```
46+
47+
```sql
48+
SELECT *, _part_offset FROM mergeTreeProjection(currentDatabase(), test, order_by_item_id);
49+
```
50+
51+
```text
52+
┌─item_id─┬─_parent_part_offset─┬─_part_offset─┐
53+
1. │ 96 │ 4 │ 0 │
54+
2. │ 97 │ 3 │ 1 │
55+
3. │ 98 │ 2 │ 2 │
56+
4. │ 99 │ 1 │ 3 │
57+
5. │ 100 │ 0 │ 4 │
58+
└─────────┴─────────────────────┴──────────────┘
59+
```
60+
61+
```sql
62+
DESCRIBE mergeTreeProjection(currentDatabase(), test, order_by_item_id) SETTINGS describe_compact_output = 1;
63+
```
64+
65+
```text
66+
┌─name────────────────┬─type───┐
67+
1. │ item_id │ UInt64 │
68+
2. │ _parent_part_offset │ UInt64 │
69+
└─────────────────────┴────────┘
70+
```

src/Interpreters/MutationsInterpreter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,8 @@ void MutationsInterpreter::Source::read(
12851285
storage_snapshot,
12861286
part,
12871287
alter_conversions,
1288+
nullptr,
1289+
0,
12881290
required_columns,
12891291
nullptr,
12901292
apply_deleted_mask_,

src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
2-
#include <Processors/QueryPlan/Optimizations/projectionsCommon.h>
1+
#include <Core/Settings.h>
32
#include <Processors/QueryPlan/ExpressionStep.h>
43
#include <Processors/QueryPlan/FilterStep.h>
4+
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
5+
#include <Processors/QueryPlan/Optimizations/projectionsCommon.h>
56
#include <Processors/QueryPlan/ReadFromMergeTree.h>
6-
#include <Processors/QueryPlan/UnionStep.h>
77
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
8+
#include <Processors/QueryPlan/UnionStep.h>
89
#include <Processors/Sources/NullSource.h>
9-
#include <Common/logger_useful.h>
10-
#include <Core/Settings.h>
10+
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
1111
#include <Storages/ProjectionsDescription.h>
1212
#include <Storages/SelectQueryInfo.h>
13-
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
14-
15-
#include <algorithm>
1613

1714
namespace DB
1815
{
@@ -72,18 +69,6 @@ static std::optional<ActionsDAG> makeMaterializingDAG(const Block & proj_header,
7269
return dag;
7370
}
7471

75-
static bool hasAllRequiredColumns(const ProjectionDescription * projection, const Names & required_columns)
76-
{
77-
for (const auto & col : required_columns)
78-
{
79-
if (!projection->sample_block.has(col))
80-
return false;
81-
}
82-
83-
return true;
84-
}
85-
86-
8772
std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
8873
{
8974
const auto & frame = stack.back();
@@ -100,8 +85,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
10085
{
10186
iter = std::next(iter);
10287

103-
if (!typeid_cast<FilterStep *>(iter->node->step.get()) &&
104-
!typeid_cast<ExpressionStep *>(iter->node->step.get()))
88+
if (!typeid_cast<FilterStep *>(iter->node->step.get()) && !typeid_cast<ExpressionStep *>(iter->node->step.get()))
10589
break;
10690
}
10791

@@ -124,7 +108,8 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
124108
auto it = std::find_if(
125109
normal_projections.begin(),
126110
normal_projections.end(),
127-
[&](const auto * projection) { return projection->name == context->getSettingsRef()[Setting::preferred_optimize_projection_name].value; });
111+
[&](const auto * projection)
112+
{ return projection->name == context->getSettingsRef()[Setting::preferred_optimize_projection_name].value; });
128113

129114
if (it != normal_projections.end())
130115
{
@@ -133,20 +118,52 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
133118
normal_projections.push_back(preferred_projection);
134119
}
135120

121+
Names required_columns = reading->getAllColumnNames();
122+
123+
/// If `with_parent_part_offset` is true and the required columns include `_part_offset`,
124+
/// we need to remap it to `_parent_part_offset`. This ensures that the projection's
125+
/// ActionsDAG reads from the correct column and generates `_part_offset` in the output.
126+
bool with_parent_part_offset = std::any_of(
127+
normal_projections.begin(), normal_projections.end(), [](const auto & projection) { return projection->with_parent_part_offset; });
128+
bool need_parent_part_offset = false;
129+
if (with_parent_part_offset)
130+
{
131+
for (auto & name : required_columns)
132+
{
133+
if (name == "_part_offset")
134+
{
135+
name = "_parent_part_offset";
136+
need_parent_part_offset = true;
137+
}
138+
}
139+
}
140+
136141
QueryDAG query;
137142
{
138143
auto & child = iter->node->children[iter->next_child - 1];
139144
if (!query.build(*child))
140145
return {};
141146

147+
if (need_parent_part_offset)
148+
{
149+
ActionsDAG rename_dag;
150+
const auto * node = &rename_dag.addInput("_parent_part_offset", std::make_shared<DataTypeUInt64>());
151+
node = &rename_dag.addAlias(*node, "_part_offset");
152+
rename_dag.getOutputs() = {node};
153+
154+
if (query.dag)
155+
query.dag = ActionsDAG::merge(std::move(rename_dag), *std::move(query.dag));
156+
else
157+
query.dag = std::move(rename_dag);
158+
}
159+
142160
if (query.dag)
143161
query.dag->removeUnusedActions();
144162
}
145163

146164
std::list<NormalProjectionCandidate> candidates;
147165
NormalProjectionCandidate * best_candidate = nullptr;
148166

149-
const Names & required_columns = reading->getAllColumnNames();
150167
const auto & query_info = reading->getQueryInfo();
151168
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
152169

@@ -168,9 +185,21 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
168185

169186
auto logger = getLogger("optimizeUseNormalProjections");
170187

188+
auto projection_virtuals = reading->getMergeTreeData().getProjectionVirtualsPtr();
189+
auto has_all_required_columns = [&](const ProjectionDescription * projection)
190+
{
191+
for (const auto & col : required_columns)
192+
{
193+
if (!projection->sample_block.has(col) && !projection_virtuals->has(col))
194+
return false;
195+
}
196+
197+
return true;
198+
};
199+
171200
for (const auto * projection : normal_projections)
172201
{
173-
if (!hasAllRequiredColumns(projection, required_columns))
202+
if (!has_all_required_columns(projection))
174203
continue;
175204

176205
auto & candidate = candidates.emplace_back();
@@ -237,7 +266,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
237266
query_info_copy.prewhere_info = nullptr;
238267

239268
auto projection_reading = reader.readFromParts(
240-
/*parts=*/ {},
269+
/*parts=*/{},
241270
reading->getMutationsSnapshot()->cloneEmpty(),
242271
required_columns,
243272
proj_snapshot,
@@ -280,15 +309,11 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
280309
if (query.filter_node)
281310
{
282311
expr_or_filter_node.step = std::make_unique<FilterStep>(
283-
projection_reading_node.step->getOutputHeader(),
284-
std::move(*query.dag),
285-
query.filter_node->result_name,
286-
true);
312+
projection_reading_node.step->getOutputHeader(), std::move(*query.dag), query.filter_node->result_name, true);
287313
}
288314
else
289-
expr_or_filter_node.step = std::make_unique<ExpressionStep>(
290-
projection_reading_node.step->getOutputHeader(),
291-
std::move(*query.dag));
315+
expr_or_filter_node.step
316+
= std::make_unique<ExpressionStep>(projection_reading_node.step->getOutputHeader(), std::move(*query.dag));
292317

293318
expr_or_filter_node.children.push_back(&projection_reading_node);
294319
next_node = &expr_or_filter_node;

0 commit comments

Comments
 (0)