Skip to content

Commit 99e49d8

Browse files
committed
topn lazy materialization supports using index filter
1 parent 6deea82 commit 99e49d8

File tree

8 files changed

+217
-17
lines changed

8 files changed

+217
-17
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,27 +63,20 @@ public Plan visit(Plan plan, CascadesContext context) {
6363
child.accept(this, context);
6464
}
6565

66-
Optional<Slot> opt = checkAllSlotFromChildren(plan);
67-
if (opt.isPresent()) {
68-
List<Slot> childrenOutput = plan.children().stream().flatMap(p -> p.getOutput().stream()).collect(
69-
Collectors.toList());
70-
throw new AnalysisException("A expression contains slot not from children\n"
71-
+ "Slot: " + opt.get() + " Children Output:" + childrenOutput + "\n"
72-
+ "Plan: " + plan.treeString() + "\n");
73-
}
66+
checkAllSlotFromChildren(plan);
7467
return plan;
7568
}
7669

7770
/**
7871
* Check that every slot comes from the children.
7972
*/
80-
public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
73+
public static void checkAllSlotFromChildren(Plan plan) {
8174
if (plan.arity() == 0) {
82-
return Optional.empty();
75+
return;
8376
}
8477
// skip the check for aggregates, which may exist in multiple phases
8578
if (plan instanceof Aggregate) {
86-
return Optional.empty();
79+
return;
8780
}
8881

8982
Supplier<BitSet> childrenOutputIds = LazyCompute.of(() -> {
@@ -97,22 +90,20 @@ public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
9790
});
9891

9992
for (Expression expression : plan.getExpressions()) {
100-
AtomicReference<Slot> invalidSlot = new AtomicReference<>();
10193
expression.anyMatch(e -> {
10294
if (e instanceof Slot) {
10395
Slot slot = (Slot) e;
10496
if (slot.getName().startsWith("mv") || slot instanceof SlotNotFromChildren) {
10597
return false;
10698
}
10799
if (!childrenOutputIds.get().get(slot.getExprId().asInt())) {
108-
invalidSlot.set(slot);
109-
return true;
100+
throw new AnalysisException("A expression contains slot not from children\n"
101+
+ "Slot: " + slot + " Children Output:" + childrenOutputIds.get() + "\n"
102+
+ "Plan: " + plan.treeString() + "\n");
110103
}
111104
}
112105
return false;
113106
});
114-
return Optional.ofNullable(invalidSlot.get());
115107
}
116-
return Optional.empty();
117108
}
118109
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import com.google.common.collect.BiMap;
4242
import com.google.common.collect.HashBiMap;
4343
import com.google.common.collect.ImmutableList;
44+
import org.apache.logging.log4j.LogManager;
45+
import org.apache.logging.log4j.Logger;
4446

4547
import java.util.ArrayList;
4648
import java.util.HashMap;
@@ -65,6 +67,7 @@ public class LazyMaterializeTopN extends PlanPostProcessor {
6567
when we create materializeNode for the first union child, set hasMaterialized=true
6668
to avoid generating materializeNode for the union's other children
6769
*/
70+
private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class);
6871
private boolean hasMaterialized = false;
6972

7073
@Override
@@ -77,6 +80,7 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
7780
}
7881
return result;
7982
} catch (Exception e) {
83+
LOG.warn("lazy materialize topn failed", e);
8084
return topN;
8185
}
8286
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
2828
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
2929
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
30+
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
3031
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
3132
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan;
3233
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
@@ -39,13 +40,17 @@
3940
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
4041
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
4142
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
43+
import org.apache.doris.qe.SessionVariable;
4244

4345
import com.google.common.collect.ImmutableList;
46+
import com.google.common.collect.Lists;
4447

4548
import java.util.ArrayList;
4649
import java.util.HashMap;
4750
import java.util.List;
4851
import java.util.Map;
52+
import java.util.Optional;
53+
import java.util.stream.Collectors;
4954

5055
/**
5156
* prune lazy materialized slot
@@ -98,6 +103,48 @@ public Plan visit(Plan plan, Context context) {
98103
return plan;
99104
}
100105

106+
@Override
107+
public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Context context) {
108+
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
109+
/**
110+
* materialization(materializedSlots=[a, b], lazy=[c])
111+
* ->topn(b)
112+
* ->filter(a=1, output=(rowid, a, b))
113+
* ->materializeOlapScan(rowid, lazy=[c], T[a,b,c])
114+
* =>
115+
* materialization(materializedSlots=[b], lazy=[a, c])
116+
* ->topn(b)
117+
* ->project(rowid, b)
118+
* ->filter(a=1, output=(rowid, a, b))
119+
* ->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c])
120+
*/
121+
List<Slot> lazySlotsToScan = new ArrayList<>();
122+
boolean lazySlotsChanged = false;
123+
124+
for (Slot slot : context.lazySlots) {
125+
if (!filter.getInputSlots().contains(slot)) {
126+
lazySlotsToScan.add(slot);
127+
} else {
128+
lazySlotsChanged = true;
129+
}
130+
}
131+
if (lazySlotsChanged) {
132+
Context contextForScan = context.withLazySlots(lazySlotsToScan);
133+
filter = (PhysicalFilter<? extends Plan>) filter.withChildren(
134+
filter.child().accept(this, contextForScan));
135+
filter = (PhysicalFilter<? extends Plan>) filter
136+
.copyStatsAndGroupIdFrom(filter).resetLogicalProperties();
137+
List<Slot> filterOutput = Lists.newArrayList(filter.getOutput());
138+
filterOutput.removeAll(filter.getInputSlots());
139+
return new PhysicalProject<>(
140+
filterOutput.stream().map(s -> (SlotReference) s).collect(Collectors.toList()),
141+
Optional.empty(), null,
142+
filter.getPhysicalProperties(), filter.getStats(), filter);
143+
}
144+
}
145+
return visit(filter, context);
146+
}
147+
101148
@Override
102149
public Plan visitPhysicalOlapScan(PhysicalOlapScan scan, Context context) {
103150
if (scan.getOutput().containsAll(context.lazySlots)) {

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import org.apache.doris.nereids.trees.expressions.NamedExpression;
2929
import org.apache.doris.nereids.trees.expressions.SlotReference;
3030
import org.apache.doris.nereids.trees.plans.Plan;
31+
import org.apache.doris.nereids.trees.plans.algebra.Relation;
3132
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
33+
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
3234
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
3335
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
3436
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
3537
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
3638
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
3739
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
40+
import org.apache.doris.qe.SessionVariable;
3841

3942
import com.google.common.collect.ImmutableSet;
4043
import org.apache.logging.log4j.LogManager;
@@ -43,6 +46,7 @@
4346
import java.util.Map;
4447
import java.util.Optional;
4548
import java.util.Set;
49+
import java.util.stream.Collectors;
4650

4751
/**
4852
* visitor to probe the slots which can perform lazy materialization
@@ -71,6 +75,24 @@ public ProbeContext(SlotReference slot) {
7175
}
7276
}
7377

78+
@Override
79+
public Optional<MaterializeSource> visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
80+
ProbeContext context) {
81+
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
82+
// agg tables do not support lazy materialization
83+
OlapTable table = ((PhysicalOlapScan) filter.child()).getTable();
84+
if (KeysType.AGG_KEYS.equals(table.getKeysType())) {
85+
return Optional.empty();
86+
}
87+
if (filter.getInputSlots().contains(context.slot)) {
88+
return Optional.of(new MaterializeSource((Relation) filter.child(), context.slot));
89+
} else {
90+
return filter.child().accept(this, context);
91+
}
92+
}
93+
return this.visit(filter, context);
94+
}
95+
7496
@Override
7597
public Optional<MaterializeSource> visit(Plan plan, ProbeContext context) {
7698
if (plan.getInputSlots().contains(context.slot)) {
@@ -195,7 +217,7 @@ public Optional<MaterializeSource> visitPhysicalProject(
195217
} else {
196218
// projectExpr is alias
197219
Alias alias = (Alias) projectExpr;
198-
if (alias.child() instanceof SlotReference) {
220+
if (alias.child() instanceof SlotReference && !SessionVariable.getTopNLazyMaterializationUsingIndex()) {
199221
ProbeContext childContext = new ProbeContext((SlotReference) alias.child());
200222
return project.child().accept(this, childContext);
201223
} else {

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.doris.nereids.trees.expressions.SlotReference;
2121
import org.apache.doris.nereids.trees.plans.algebra.Relation;
2222

23+
import java.util.ArrayList;
24+
import java.util.List;
25+
2326
/**
2427
the table and slot used to perform lazy materialization
2528
*/

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,6 +1598,11 @@ public enum IgnoreSplitType {
15981598
varType = VariableAnnotation.EXPERIMENTAL)
15991599
public int topNLazyMaterializationThreshold = 1024;
16001600

1601+
@VariableMgr.VarAttr(name = "topn_lazy_materialization_using_index", needForward = true,
1602+
fuzzy = false,
1603+
varType = VariableAnnotation.EXPERIMENTAL)
1604+
public boolean topNLazyMaterializationUsingIndex = false;
1605+
16011606
@VariableMgr.VarAttr(name = ENABLE_PRUNE_NESTED_COLUMN, needForward = true,
16021607
fuzzy = false,
16031608
varType = VariableAnnotation.EXPERIMENTAL,
@@ -1618,6 +1623,14 @@ public static int getTopNLazyMaterializationThreshold() {
16181623
}
16191624
}
16201625

1626+
public static boolean getTopNLazyMaterializationUsingIndex() {
1627+
if (ConnectContext.get() != null) {
1628+
return ConnectContext.get().getSessionVariable().topNLazyMaterializationUsingIndex;
1629+
} else {
1630+
return VariableMgr.getDefaultSessionVariable().topNLazyMaterializationUsingIndex;
1631+
}
1632+
}
1633+
16211634
@VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true)
16221635
private boolean disableInvertedIndexV1ForVaraint = true;
16231636

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !plan --
3+
PhysicalResultSink
4+
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
5+
----PhysicalLazyMaterialize[materializedSlots:(t1.username) lazySlots:(t1.addr,t1.age,t1.user_id)]
6+
------PhysicalTopN[MERGE_SORT]
7+
--------PhysicalDistribute[DistributionSpecGather]
8+
----------PhysicalTopN[LOCAL_SORT]
9+
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t1, t1.username]
10+
--------------filter((t1.user_id = 1))
11+
----------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.age,t1.addr)]
12+
13+
-- !exec --
14+
1 a 10 cd
15+
16+
-- !plan2 --
17+
PhysicalResultSink
18+
--PhysicalProject[t2.addr, t2.age, t2.user_id, t2.username]
19+
----PhysicalLazyMaterialize[materializedSlots:(t2.username) lazySlots:(t2.addr,t2.age,t2.user_id)]
20+
------PhysicalTopN[MERGE_SORT]
21+
--------PhysicalDistribute[DistributionSpecGather]
22+
----------PhysicalTopN[LOCAL_SORT]
23+
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
24+
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.username = t2.username)) otherCondition=() build RFs:RF0 username->[username];RF1 username->[username]
25+
----------------PhysicalProject[t1.username]
26+
------------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
27+
----------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
28+
------------------filter((t2.user_id > 0))
29+
--------------------PhysicalLazyMaterializeOlapScan[t2 lazySlots:(t2.age,t2.addr)]
30+
31+
-- !exe2 --
32+
1 a 10 cd 1 a 10 cd
33+
34+
-- !plan_no_effect --
35+
PhysicalResultSink
36+
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
37+
----PhysicalLazyMaterialize[materializedSlots:(t1.user_id) lazySlots:(t1.addr,t1.age,t1.username)]
38+
------PhysicalTopN[MERGE_SORT]
39+
--------PhysicalDistribute[DistributionSpecGather]
40+
----------PhysicalTopN[LOCAL_SORT]
41+
------------filter((t1.user_id > 0))
42+
--------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.username,t1.age,t1.addr)]
43+
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("topNLazyMaterializationUsingIndex.groovy") {
19+
sql """
20+
drop table if exists t1;
21+
CREATE TABLE t1
22+
(
23+
`user_id` LARGEINT NOT NULL,
24+
`username` VARCHAR(50) NOT NULL,
25+
age int,
26+
addr VARCHAR(50) NOT NULL
27+
)
28+
duplicate KEY(user_id, username)
29+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
30+
PROPERTIES (
31+
"replication_allocation" = "tag.location.default: 1");
32+
33+
insert into t1 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');
34+
35+
36+
drop table if exists t2;
37+
CREATE TABLE t2
38+
(
39+
`user_id` LARGEINT NOT NULL,
40+
`username` VARCHAR(50) NOT NULL,
41+
age int,
42+
addr VARCHAR(50) NOT NULL
43+
)
44+
duplicate KEY(user_id, username)
45+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
46+
PROPERTIES (
47+
"replication_allocation" = "tag.location.default: 1");
48+
49+
insert into t2 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');
50+
51+
set topn_lazy_materialization_using_index = true;
52+
SET detail_shape_nodes='PhysicalProject';
53+
"""
54+
qt_plan """
55+
explain shape plan
56+
select * from t1 where user_id = 1 order by username limit 1;
57+
"""
58+
qt_exec """
59+
select * from t1 where user_id = 1 order by username limit 1;
60+
"""
61+
62+
qt_plan2 """
63+
explain shape plan
64+
select t2.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
65+
"""
66+
67+
qt_exe2 """
68+
select t2.*, t1.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
69+
"""
70+
71+
qt_plan_no_effect """
72+
explain shape plan
73+
select * from t1 where
74+
user_id > 0 order by user_id limit 1;
75+
"""
76+
77+
}

0 commit comments

Comments
 (0)