diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java index 40777e26faaf76..926e12cd279ee9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java @@ -32,11 +32,7 @@ import com.google.common.base.Preconditions; import java.util.BitSet; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * validator plan. @@ -63,27 +59,20 @@ public Plan visit(Plan plan, CascadesContext context) { child.accept(this, context); } - Optional opt = checkAllSlotFromChildren(plan); - if (opt.isPresent()) { - List childrenOutput = plan.children().stream().flatMap(p -> p.getOutput().stream()).collect( - Collectors.toList()); - throw new AnalysisException("A expression contains slot not from children\n" - + "Slot: " + opt.get() + " Children Output:" + childrenOutput + "\n" - + "Plan: " + plan.treeString() + "\n"); - } + checkAllSlotFromChildren(plan); return plan; } /** * Check all slot must from children. */ - public static Optional checkAllSlotFromChildren(Plan plan) { + public static void checkAllSlotFromChildren(Plan plan) { if (plan.arity() == 0) { - return Optional.empty(); + return; } // agg exist multi-phase if (plan instanceof Aggregate) { - return Optional.empty(); + return; } Supplier childrenOutputIds = LazyCompute.of(() -> { @@ -97,7 +86,6 @@ public static Optional checkAllSlotFromChildren(Plan plan) { }); for (Expression expression : plan.getExpressions()) { - AtomicReference invalidSlot = new AtomicReference<>(); expression.anyMatch(e -> { if (e instanceof Slot) { Slot slot = (Slot) e; @@ -105,14 +93,13 @@ public static Optional checkAllSlotFromChildren(Plan plan) { return false; } if (!childrenOutputIds.get().get(slot.getExprId().asInt())) { - invalidSlot.set(slot); - return true; + throw new AnalysisException("A expression contains slot not from children\n" + + "Slot: " + slot + " Children Output:" + childrenOutputIds.get() + "\n" + + "Plan: " + plan.treeString() + "\n"); } } return false; }); - return Optional.ofNullable(invalidSlot.get()); } - return Optional.empty(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java index ba6425b1cceeab..b3512835bd8d7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java @@ -41,6 +41,8 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; @@ -65,6 +67,7 @@ public class LazyMaterializeTopN extends PlanPostProcessor { when we create materializeNode for the first union child, set hasMaterialized=true to avoid generating materializeNode for other union's children */ + private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class); private boolean hasMaterialized = false; @Override @@ -77,6 +80,7 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { } return result; } catch (Exception e) { + LOG.warn("lazy materialize topn failed", e); return topN; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java index 65b571116470dc..d15766f55a3140 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan; @@ -39,20 +40,24 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.qe.SessionVariable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; /** - * prune lazy materialized slot + prune lazy materialized slot */ public class LazySlotPruning extends DefaultPlanRewriter { /** - * Context + Context */ public static class Context { private PhysicalRelation scan; @@ -98,6 +103,48 @@ public Plan visit(Plan plan, Context context) { return plan; } + @Override + public Plan visitPhysicalFilter(PhysicalFilter filter, Context context) { + if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) { + /* + materialization(materializedSlots=[a, b], lazy=[c]) + ->topn(b) + ->filter(a=1, output=(rowid, a, b)) + ->materializeOlapScan(rowid, lazy=[c], T[a,b,c]) + => + materialization(materializedSlots=[b], lazy=[a, c]) + ->topn(b) + ->project(rowid, b) + ->filter(a=1, output=(rowid, a, b)) + ->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c]) + */ + List lazySlotsToScan = new ArrayList<>(); + boolean lazySlotsChanged = false; + + for (Slot slot : context.lazySlots) { + if (!filter.getInputSlots().contains(slot)) { + lazySlotsToScan.add(slot); + } else { + lazySlotsChanged = true; + } + } + if (lazySlotsChanged) { + Context contextForScan = context.withLazySlots(lazySlotsToScan); + filter = (PhysicalFilter) filter.withChildren( + filter.child().accept(this, contextForScan)); + filter = (PhysicalFilter) filter + .copyStatsAndGroupIdFrom(filter).resetLogicalProperties(); + List filterOutput = Lists.newArrayList(filter.getOutput()); + filterOutput.removeAll(filter.getInputSlots()); + return new PhysicalProject<>( + filterOutput.stream().map(s -> (SlotReference) s).collect(Collectors.toList()), + Optional.empty(), null, + filter.getPhysicalProperties(), filter.getStats(), filter); + } + } + return visit(filter, context); + } + @Override public Plan visitPhysicalOlapScan(PhysicalOlapScan scan, Context context) { if (scan.getOutput().containsAll(context.lazySlots)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java index c6ff7709a7de2f..b6b4b31d8b6b34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java @@ -28,13 +28,16 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Relation; import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; +import org.apache.doris.qe.SessionVariable; import com.google.common.collect.ImmutableSet; import org.apache.logging.log4j.LogManager; @@ -71,6 +74,24 @@ public ProbeContext(SlotReference slot) { } } + @Override + public Optional visitPhysicalFilter(PhysicalFilter filter, + ProbeContext context) { + if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) { + // agg table do not support lazy materialize + OlapTable table = ((PhysicalOlapScan) filter.child()).getTable(); + if (KeysType.AGG_KEYS.equals(table.getKeysType())) { + return Optional.empty(); + } + if (filter.getInputSlots().contains(context.slot)) { + return Optional.of(new MaterializeSource((Relation) filter.child(), context.slot)); + } else { + return filter.child().accept(this, context); + } + } + return this.visit(filter, context); + } + @Override public Optional visit(Plan plan, ProbeContext context) { if (plan.getInputSlots().contains(context.slot)) { @@ -195,7 +216,7 @@ public Optional visitPhysicalProject( } else { // projectExpr is alias Alias alias = (Alias) projectExpr; - if (alias.child() instanceof SlotReference) { + if (alias.child() instanceof SlotReference && !SessionVariable.getTopNLazyMaterializationUsingIndex()) { ProbeContext childContext = new ProbeContext((SlotReference) alias.child()); return project.child().accept(this, childContext); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 11df0e53eb109c..502f402766c125 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1598,6 +1598,11 @@ public enum IgnoreSplitType { varType = VariableAnnotation.EXPERIMENTAL) public int topNLazyMaterializationThreshold = 1024; + @VariableMgr.VarAttr(name = "topn_lazy_materialization_using_index", needForward = true, + fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL) + public boolean topNLazyMaterializationUsingIndex = false; + @VariableMgr.VarAttr(name = ENABLE_PRUNE_NESTED_COLUMN, needForward = true, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, @@ -1618,6 +1623,14 @@ public static int getTopNLazyMaterializationThreshold() { } } + public static boolean getTopNLazyMaterializationUsingIndex() { + if (ConnectContext.get() != null) { + return ConnectContext.get().getSessionVariable().topNLazyMaterializationUsingIndex; + } else { + return VariableMgr.getDefaultSessionVariable().topNLazyMaterializationUsingIndex; + } + } + @VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true) private boolean disableInvertedIndexV1ForVaraint = true; diff --git a/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out b/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out new file mode 100644 index 00000000000000..d966e52f11e140 --- /dev/null +++ b/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !plan -- +PhysicalResultSink +--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username] +----PhysicalLazyMaterialize[materializedSlots:(t1.username) lazySlots:(t1.addr,t1.age,t1.user_id)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t1, t1.username] +--------------filter((t1.user_id = 1)) +----------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.age,t1.addr)] + +-- !exec -- +1 a 10 cd + +-- !plan2 -- +PhysicalResultSink +--PhysicalProject[t2.addr, t2.age, t2.user_id, t2.username] +----PhysicalLazyMaterialize[materializedSlots:(t2.username) lazySlots:(t2.addr,t2.age,t2.user_id)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username] +--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.username = t2.username)) otherCondition=() build RFs:RF0 username->[username];RF1 username->[username] +----------------PhysicalProject[t1.username] +------------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +----------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username] +------------------filter((t2.user_id > 0)) +--------------------PhysicalLazyMaterializeOlapScan[t2 lazySlots:(t2.age,t2.addr)] + +-- !exe2 -- +1 a 10 cd 1 a 10 cd + +-- !plan_no_effect -- +PhysicalResultSink +--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username] +----PhysicalLazyMaterialize[materializedSlots:(t1.user_id) lazySlots:(t1.addr,t1.age,t1.username)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------filter((t1.user_id > 0)) +--------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.username,t1.age,t1.addr)] + diff --git a/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy b/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy new file mode 100644 index 00000000000000..8b839adfac79ae --- /dev/null +++ b/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("topNLazyMaterializationUsingIndex.groovy") { + sql """ + drop table if exists t1; + CREATE TABLE t1 + ( + `user_id` LARGEINT NOT NULL, + `username` VARCHAR(50) NOT NULL, + age int, + addr VARCHAR(50) NOT NULL + ) + duplicate KEY(user_id, username) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + insert into t1 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq'); + + + drop table if exists t2; + CREATE TABLE t2 + ( + `user_id` LARGEINT NOT NULL, + `username` VARCHAR(50) NOT NULL, + age int, + addr VARCHAR(50) NOT NULL + ) + duplicate KEY(user_id, username) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + insert into t2 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq'); + + set topn_lazy_materialization_using_index = true; + SET detail_shape_nodes='PhysicalProject'; + """ + qt_plan """ + explain shape plan + select * from t1 where user_id = 1 order by username limit 1; + """ + qt_exec """ + select * from t1 where user_id = 1 order by username limit 1; + """ + + qt_plan2 """ + explain shape plan + select t2.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1; + """ + + qt_exe2 """ + select t2.*, t1.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1; + """ + + qt_plan_no_effect """ + explain shape plan + select * from t1 where + user_id > 0 order by user_id limit 1; + """ + +}