Skip to content

Commit c85ea00

Browse files
authored
[feat](topn lazy materialize)using index topn lazy (#59572)
### What problem does this PR solve? if slot a is operative slots, but it is only used in a filter node which is parent of olapScan, this slot will be materialized by materialization node when `set topn_lazy_materialization_using_index = true;` * materialization(materializedSlots=[a, b], lazy=[c]) * ->topn(b) * ->filter(a=1, output=(rowid, a, b)) * ->materializeOlapScan(rowid, lazy=[c], T[a,b,c]) * => * materialization(materializedSlots=[b], lazy=[a, c]) * ->topn(b) * ->project(rowid, b) * ->filter(a=1, output=(rowid, a, b)) * ->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c])
1 parent f2d789b commit c85ea00

File tree

7 files changed

+215
-23
lines changed

7 files changed

+215
-23
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@
3232
import com.google.common.base.Preconditions;
3333

3434
import java.util.BitSet;
35-
import java.util.List;
36-
import java.util.Optional;
37-
import java.util.concurrent.atomic.AtomicReference;
3835
import java.util.function.Supplier;
39-
import java.util.stream.Collectors;
4036

4137
/**
4238
* validator plan.
@@ -63,27 +59,20 @@ public Plan visit(Plan plan, CascadesContext context) {
6359
child.accept(this, context);
6460
}
6561

66-
Optional<Slot> opt = checkAllSlotFromChildren(plan);
67-
if (opt.isPresent()) {
68-
List<Slot> childrenOutput = plan.children().stream().flatMap(p -> p.getOutput().stream()).collect(
69-
Collectors.toList());
70-
throw new AnalysisException("A expression contains slot not from children\n"
71-
+ "Slot: " + opt.get() + " Children Output:" + childrenOutput + "\n"
72-
+ "Plan: " + plan.treeString() + "\n");
73-
}
62+
checkAllSlotFromChildren(plan);
7463
return plan;
7564
}
7665

7766
/**
7867
* Check all slot must from children.
7968
*/
80-
public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
69+
public static void checkAllSlotFromChildren(Plan plan) {
8170
if (plan.arity() == 0) {
82-
return Optional.empty();
71+
return;
8372
}
8473
// agg exist multi-phase
8574
if (plan instanceof Aggregate) {
86-
return Optional.empty();
75+
return;
8776
}
8877

8978
Supplier<BitSet> childrenOutputIds = LazyCompute.of(() -> {
@@ -97,22 +86,20 @@ public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
9786
});
9887

9988
for (Expression expression : plan.getExpressions()) {
100-
AtomicReference<Slot> invalidSlot = new AtomicReference<>();
10189
expression.anyMatch(e -> {
10290
if (e instanceof Slot) {
10391
Slot slot = (Slot) e;
10492
if (slot.getName().startsWith("mv") || slot instanceof SlotNotFromChildren) {
10593
return false;
10694
}
10795
if (!childrenOutputIds.get().get(slot.getExprId().asInt())) {
108-
invalidSlot.set(slot);
109-
return true;
96+
throw new AnalysisException("A expression contains slot not from children\n"
97+
+ "Slot: " + slot + " Children Output:" + childrenOutputIds.get() + "\n"
98+
+ "Plan: " + plan.treeString() + "\n");
11099
}
111100
}
112101
return false;
113102
});
114-
return Optional.ofNullable(invalidSlot.get());
115103
}
116-
return Optional.empty();
117104
}
118105
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import com.google.common.collect.BiMap;
4242
import com.google.common.collect.HashBiMap;
4343
import com.google.common.collect.ImmutableList;
44+
import org.apache.logging.log4j.LogManager;
45+
import org.apache.logging.log4j.Logger;
4446

4547
import java.util.ArrayList;
4648
import java.util.HashMap;
@@ -65,6 +67,7 @@ public class LazyMaterializeTopN extends PlanPostProcessor {
6567
when we create materializeNode for the first union child, set hasMaterialized=true
6668
to avoid generating materializeNode for other union's children
6769
*/
70+
private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class);
6871
private boolean hasMaterialized = false;
6972

7073
@Override
@@ -77,6 +80,7 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
7780
}
7881
return result;
7982
} catch (Exception e) {
83+
LOG.warn("lazy materialize topn failed", e);
8084
return topN;
8185
}
8286
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
2828
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
2929
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
30+
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
3031
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
3132
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan;
3233
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
@@ -39,20 +40,24 @@
3940
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
4041
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
4142
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
43+
import org.apache.doris.qe.SessionVariable;
4244

4345
import com.google.common.collect.ImmutableList;
46+
import com.google.common.collect.Lists;
4447

4548
import java.util.ArrayList;
4649
import java.util.HashMap;
4750
import java.util.List;
4851
import java.util.Map;
52+
import java.util.Optional;
53+
import java.util.stream.Collectors;
4954

5055
/**
51-
* prune lazy materialized slot
56+
prune lazy materialized slot
5257
*/
5358
public class LazySlotPruning extends DefaultPlanRewriter<LazySlotPruning.Context> {
5459
/**
55-
* Context
60+
Context
5661
*/
5762
public static class Context {
5863
private PhysicalRelation scan;
@@ -98,6 +103,48 @@ public Plan visit(Plan plan, Context context) {
98103
return plan;
99104
}
100105

106+
@Override
107+
public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Context context) {
108+
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
109+
/*
110+
materialization(materializedSlots=[a, b], lazy=[c])
111+
->topn(b)
112+
->filter(a=1, output=(rowid, a, b))
113+
->materializeOlapScan(rowid, lazy=[c], T[a,b,c])
114+
=>
115+
materialization(materializedSlots=[b], lazy=[a, c])
116+
->topn(b)
117+
->project(rowid, b)
118+
->filter(a=1, output=(rowid, a, b))
119+
->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c])
120+
*/
121+
List<Slot> lazySlotsToScan = new ArrayList<>();
122+
boolean lazySlotsChanged = false;
123+
124+
for (Slot slot : context.lazySlots) {
125+
if (!filter.getInputSlots().contains(slot)) {
126+
lazySlotsToScan.add(slot);
127+
} else {
128+
lazySlotsChanged = true;
129+
}
130+
}
131+
if (lazySlotsChanged) {
132+
Context contextForScan = context.withLazySlots(lazySlotsToScan);
133+
filter = (PhysicalFilter<? extends Plan>) filter.withChildren(
134+
filter.child().accept(this, contextForScan));
135+
filter = (PhysicalFilter<? extends Plan>) filter
136+
.copyStatsAndGroupIdFrom(filter).resetLogicalProperties();
137+
List<Slot> filterOutput = Lists.newArrayList(filter.getOutput());
138+
filterOutput.removeAll(filter.getInputSlots());
139+
return new PhysicalProject<>(
140+
filterOutput.stream().map(s -> (SlotReference) s).collect(Collectors.toList()),
141+
Optional.empty(), null,
142+
filter.getPhysicalProperties(), filter.getStats(), filter);
143+
}
144+
}
145+
return visit(filter, context);
146+
}
147+
101148
@Override
102149
public Plan visitPhysicalOlapScan(PhysicalOlapScan scan, Context context) {
103150
if (scan.getOutput().containsAll(context.lazySlots)) {

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import org.apache.doris.nereids.trees.expressions.NamedExpression;
2929
import org.apache.doris.nereids.trees.expressions.SlotReference;
3030
import org.apache.doris.nereids.trees.plans.Plan;
31+
import org.apache.doris.nereids.trees.plans.algebra.Relation;
3132
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
33+
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
3234
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
3335
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
3436
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
3537
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
3638
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
3739
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
40+
import org.apache.doris.qe.SessionVariable;
3841

3942
import com.google.common.collect.ImmutableSet;
4043
import org.apache.logging.log4j.LogManager;
@@ -71,6 +74,24 @@ public ProbeContext(SlotReference slot) {
7174
}
7275
}
7376

77+
@Override
78+
public Optional<MaterializeSource> visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
79+
ProbeContext context) {
80+
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
81+
// agg table do not support lazy materialize
82+
OlapTable table = ((PhysicalOlapScan) filter.child()).getTable();
83+
if (KeysType.AGG_KEYS.equals(table.getKeysType())) {
84+
return Optional.empty();
85+
}
86+
if (filter.getInputSlots().contains(context.slot)) {
87+
return Optional.of(new MaterializeSource((Relation) filter.child(), context.slot));
88+
} else {
89+
return filter.child().accept(this, context);
90+
}
91+
}
92+
return this.visit(filter, context);
93+
}
94+
7495
@Override
7596
public Optional<MaterializeSource> visit(Plan plan, ProbeContext context) {
7697
if (plan.getInputSlots().contains(context.slot)) {
@@ -195,7 +216,7 @@ public Optional<MaterializeSource> visitPhysicalProject(
195216
} else {
196217
// projectExpr is alias
197218
Alias alias = (Alias) projectExpr;
198-
if (alias.child() instanceof SlotReference) {
219+
if (alias.child() instanceof SlotReference && !SessionVariable.getTopNLazyMaterializationUsingIndex()) {
199220
ProbeContext childContext = new ProbeContext((SlotReference) alias.child());
200221
return project.child().accept(this, childContext);
201222
} else {

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,6 +1611,11 @@ public enum IgnoreSplitType {
16111611
varType = VariableAnnotation.EXPERIMENTAL)
16121612
public int topNLazyMaterializationThreshold = 1024;
16131613

1614+
@VariableMgr.VarAttr(name = "topn_lazy_materialization_using_index", needForward = true,
1615+
fuzzy = false,
1616+
varType = VariableAnnotation.EXPERIMENTAL)
1617+
public boolean topNLazyMaterializationUsingIndex = false;
1618+
16141619
@VariableMgr.VarAttr(name = ENABLE_PRUNE_NESTED_COLUMN, needForward = true,
16151620
fuzzy = false,
16161621
varType = VariableAnnotation.EXPERIMENTAL,
@@ -1631,6 +1636,14 @@ public static int getTopNLazyMaterializationThreshold() {
16311636
}
16321637
}
16331638

1639+
public static boolean getTopNLazyMaterializationUsingIndex() {
1640+
if (ConnectContext.get() != null) {
1641+
return ConnectContext.get().getSessionVariable().topNLazyMaterializationUsingIndex;
1642+
} else {
1643+
return VariableMgr.getDefaultSessionVariable().topNLazyMaterializationUsingIndex;
1644+
}
1645+
}
1646+
16341647
@VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true)
16351648
private boolean disableInvertedIndexV1ForVaraint = true;
16361649

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !plan --
3+
PhysicalResultSink
4+
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
5+
----PhysicalLazyMaterialize[materializedSlots:(t1.username) lazySlots:(t1.addr,t1.age,t1.user_id)]
6+
------PhysicalTopN[MERGE_SORT]
7+
--------PhysicalDistribute[DistributionSpecGather]
8+
----------PhysicalTopN[LOCAL_SORT]
9+
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t1, t1.username]
10+
--------------filter((t1.user_id = 1))
11+
----------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.age,t1.addr)]
12+
13+
-- !exec --
14+
1 a 10 cd
15+
16+
-- !plan2 --
17+
PhysicalResultSink
18+
--PhysicalProject[t2.addr, t2.age, t2.user_id, t2.username]
19+
----PhysicalLazyMaterialize[materializedSlots:(t2.username) lazySlots:(t2.addr,t2.age,t2.user_id)]
20+
------PhysicalTopN[MERGE_SORT]
21+
--------PhysicalDistribute[DistributionSpecGather]
22+
----------PhysicalTopN[LOCAL_SORT]
23+
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
24+
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.username = t2.username)) otherCondition=() build RFs:RF0 username->[username];RF1 username->[username]
25+
----------------PhysicalProject[t1.username]
26+
------------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
27+
----------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
28+
------------------filter((t2.user_id > 0))
29+
--------------------PhysicalLazyMaterializeOlapScan[t2 lazySlots:(t2.age,t2.addr)]
30+
31+
-- !exe2 --
32+
1 a 10 cd 1 a 10 cd
33+
34+
-- !plan_no_effect --
35+
PhysicalResultSink
36+
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
37+
----PhysicalLazyMaterialize[materializedSlots:(t1.user_id) lazySlots:(t1.addr,t1.age,t1.username)]
38+
------PhysicalTopN[MERGE_SORT]
39+
--------PhysicalDistribute[DistributionSpecGather]
40+
----------PhysicalTopN[LOCAL_SORT]
41+
------------filter((t1.user_id > 0))
42+
--------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.username,t1.age,t1.addr)]
43+
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("topNLazyMaterializationUsingIndex.groovy") {
19+
sql """
20+
drop table if exists t1;
21+
CREATE TABLE t1
22+
(
23+
`user_id` LARGEINT NOT NULL,
24+
`username` VARCHAR(50) NOT NULL,
25+
age int,
26+
addr VARCHAR(50) NOT NULL
27+
)
28+
duplicate KEY(user_id, username)
29+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
30+
PROPERTIES (
31+
"replication_allocation" = "tag.location.default: 1");
32+
33+
insert into t1 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');
34+
35+
36+
drop table if exists t2;
37+
CREATE TABLE t2
38+
(
39+
`user_id` LARGEINT NOT NULL,
40+
`username` VARCHAR(50) NOT NULL,
41+
age int,
42+
addr VARCHAR(50) NOT NULL
43+
)
44+
duplicate KEY(user_id, username)
45+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
46+
PROPERTIES (
47+
"replication_allocation" = "tag.location.default: 1");
48+
49+
insert into t2 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');
50+
51+
set topn_lazy_materialization_using_index = true;
52+
SET detail_shape_nodes='PhysicalProject';
53+
"""
54+
qt_plan """
55+
explain shape plan
56+
select * from t1 where user_id = 1 order by username limit 1;
57+
"""
58+
qt_exec """
59+
select * from t1 where user_id = 1 order by username limit 1;
60+
"""
61+
62+
qt_plan2 """
63+
explain shape plan
64+
select t2.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
65+
"""
66+
67+
qt_exe2 """
68+
select t2.*, t1.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
69+
"""
70+
71+
qt_plan_no_effect """
72+
explain shape plan
73+
select * from t1 where
74+
user_id > 0 order by user_id limit 1;
75+
"""
76+
77+
}

0 commit comments

Comments
 (0)