Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
import com.google.common.base.Preconditions;

import java.util.BitSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* validator plan.
Expand All @@ -63,27 +59,20 @@ public Plan visit(Plan plan, CascadesContext context) {
child.accept(this, context);
}

Optional<Slot> opt = checkAllSlotFromChildren(plan);
if (opt.isPresent()) {
List<Slot> childrenOutput = plan.children().stream().flatMap(p -> p.getOutput().stream()).collect(
Collectors.toList());
throw new AnalysisException("A expression contains slot not from children\n"
+ "Slot: " + opt.get() + " Children Output:" + childrenOutput + "\n"
+ "Plan: " + plan.treeString() + "\n");
}
checkAllSlotFromChildren(plan);
return plan;
}

/**
* Check all slot must from children.
*/
public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
public static void checkAllSlotFromChildren(Plan plan) {
if (plan.arity() == 0) {
return Optional.empty();
return;
}
// agg exist multi-phase
if (plan instanceof Aggregate) {
return Optional.empty();
return;
}

Supplier<BitSet> childrenOutputIds = LazyCompute.of(() -> {
Expand All @@ -97,22 +86,20 @@ public static Optional<Slot> checkAllSlotFromChildren(Plan plan) {
});

for (Expression expression : plan.getExpressions()) {
AtomicReference<Slot> invalidSlot = new AtomicReference<>();
expression.anyMatch(e -> {
if (e instanceof Slot) {
Slot slot = (Slot) e;
if (slot.getName().startsWith("mv") || slot instanceof SlotNotFromChildren) {
return false;
}
if (!childrenOutputIds.get().get(slot.getExprId().asInt())) {
invalidSlot.set(slot);
return true;
throw new AnalysisException("A expression contains slot not from children\n"
+ "Slot: " + slot + " Children Output:" + childrenOutputIds.get() + "\n"
+ "Plan: " + plan.treeString() + "\n");
}
}
return false;
});
return Optional.ofNullable(invalidSlot.get());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -65,6 +67,7 @@ public class LazyMaterializeTopN extends PlanPostProcessor {
when we create materializeNode for the first union child, set hasMaterialized=true
to avoid generating materializeNode for other union's children
*/
private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class);
private boolean hasMaterialized = false;

@Override
Expand All @@ -77,6 +80,7 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
}
return result;
} catch (Exception e) {
LOG.warn("lazy materialize topn failed", e);
return topN;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
Expand All @@ -39,20 +40,24 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* prune lazy materialized slot
prune lazy materialized slot
*/
public class LazySlotPruning extends DefaultPlanRewriter<LazySlotPruning.Context> {
/**
* Context
Context
*/
public static class Context {
private PhysicalRelation scan;
Expand Down Expand Up @@ -98,6 +103,48 @@ public Plan visit(Plan plan, Context context) {
return plan;
}

@Override
public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Context context) {
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
/*
materialization(materializedSlots=[a, b], lazy=[c])
->topn(b)
->filter(a=1, output=(rowid, a, b))
->materializeOlapScan(rowid, lazy=[c], T[a,b,c])
=>
materialization(materializedSlots=[b], lazy=[a, c])
->topn(b)
->project(rowid, b)
->filter(a=1, output=(rowid, a, b))
->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c])
*/
List<Slot> lazySlotsToScan = new ArrayList<>();
boolean lazySlotsChanged = false;

for (Slot slot : context.lazySlots) {
if (!filter.getInputSlots().contains(slot)) {
lazySlotsToScan.add(slot);
} else {
lazySlotsChanged = true;
}
}
if (lazySlotsChanged) {
Context contextForScan = context.withLazySlots(lazySlotsToScan);
filter = (PhysicalFilter<? extends Plan>) filter.withChildren(
filter.child().accept(this, contextForScan));
filter = (PhysicalFilter<? extends Plan>) filter
.copyStatsAndGroupIdFrom(filter).resetLogicalProperties();
List<Slot> filterOutput = Lists.newArrayList(filter.getOutput());
filterOutput.removeAll(filter.getInputSlots());
return new PhysicalProject<>(
filterOutput.stream().map(s -> (SlotReference) s).collect(Collectors.toList()),
Optional.empty(), null,
filter.getPhysicalProperties(), filter.getStats(), filter);
}
}
return visit(filter, context);
}

@Override
public Plan visitPhysicalOlapScan(PhysicalOlapScan scan, Context context) {
if (scan.getOutput().containsAll(context.lazySlots)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -71,6 +74,24 @@ public ProbeContext(SlotReference slot) {
}
}

@Override
public Optional<MaterializeSource> visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
ProbeContext context) {
if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) {
// agg table do not support lazy materialize
OlapTable table = ((PhysicalOlapScan) filter.child()).getTable();
if (KeysType.AGG_KEYS.equals(table.getKeysType())) {
return Optional.empty();
}
if (filter.getInputSlots().contains(context.slot)) {
return Optional.of(new MaterializeSource((Relation) filter.child(), context.slot));
} else {
return filter.child().accept(this, context);
}
}
return this.visit(filter, context);
}

@Override
public Optional<MaterializeSource> visit(Plan plan, ProbeContext context) {
if (plan.getInputSlots().contains(context.slot)) {
Expand Down Expand Up @@ -195,7 +216,7 @@ public Optional<MaterializeSource> visitPhysicalProject(
} else {
// projectExpr is alias
Alias alias = (Alias) projectExpr;
if (alias.child() instanceof SlotReference) {
if (alias.child() instanceof SlotReference && !SessionVariable.getTopNLazyMaterializationUsingIndex()) {
ProbeContext childContext = new ProbeContext((SlotReference) alias.child());
return project.child().accept(this, childContext);
} else {
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,11 @@ public enum IgnoreSplitType {
varType = VariableAnnotation.EXPERIMENTAL)
public int topNLazyMaterializationThreshold = 1024;

@VariableMgr.VarAttr(name = "topn_lazy_materialization_using_index", needForward = true,
fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL)
public boolean topNLazyMaterializationUsingIndex = false;

@VariableMgr.VarAttr(name = ENABLE_PRUNE_NESTED_COLUMN, needForward = true,
fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL,
Expand All @@ -1618,6 +1623,14 @@ public static int getTopNLazyMaterializationThreshold() {
}
}

public static boolean getTopNLazyMaterializationUsingIndex() {
if (ConnectContext.get() != null) {
return ConnectContext.get().getSessionVariable().topNLazyMaterializationUsingIndex;
} else {
return VariableMgr.getDefaultSessionVariable().topNLazyMaterializationUsingIndex;
}
}

@VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true)
private boolean disableInvertedIndexV1ForVaraint = true;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !plan --
PhysicalResultSink
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
----PhysicalLazyMaterialize[materializedSlots:(t1.username) lazySlots:(t1.addr,t1.age,t1.user_id)]
------PhysicalTopN[MERGE_SORT]
--------PhysicalDistribute[DistributionSpecGather]
----------PhysicalTopN[LOCAL_SORT]
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t1, t1.username]
--------------filter((t1.user_id = 1))
----------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.age,t1.addr)]

-- !exec --
1 a 10 cd

-- !plan2 --
PhysicalResultSink
--PhysicalProject[t2.addr, t2.age, t2.user_id, t2.username]
----PhysicalLazyMaterialize[materializedSlots:(t2.username) lazySlots:(t2.addr,t2.age,t2.user_id)]
------PhysicalTopN[MERGE_SORT]
--------PhysicalDistribute[DistributionSpecGather]
----------PhysicalTopN[LOCAL_SORT]
------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.username = t2.username)) otherCondition=() build RFs:RF0 username->[username];RF1 username->[username]
----------------PhysicalProject[t1.username]
------------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
----------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username]
------------------filter((t2.user_id > 0))
--------------------PhysicalLazyMaterializeOlapScan[t2 lazySlots:(t2.age,t2.addr)]

-- !exe2 --
1 a 10 cd 1 a 10 cd

-- !plan_no_effect --
PhysicalResultSink
--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username]
----PhysicalLazyMaterialize[materializedSlots:(t1.user_id) lazySlots:(t1.addr,t1.age,t1.username)]
------PhysicalTopN[MERGE_SORT]
--------PhysicalDistribute[DistributionSpecGather]
----------PhysicalTopN[LOCAL_SORT]
------------filter((t1.user_id > 0))
--------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.username,t1.age,t1.addr)]

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("topNLazyMaterializationUsingIndex.groovy") {
sql """
drop table if exists t1;
CREATE TABLE t1
(
`user_id` LARGEINT NOT NULL,
`username` VARCHAR(50) NOT NULL,
age int,
addr VARCHAR(50) NOT NULL
)
duplicate KEY(user_id, username)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");

insert into t1 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');


drop table if exists t2;
CREATE TABLE t2
(
`user_id` LARGEINT NOT NULL,
`username` VARCHAR(50) NOT NULL,
age int,
addr VARCHAR(50) NOT NULL
)
duplicate KEY(user_id, username)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");

insert into t2 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq');

set topn_lazy_materialization_using_index = true;
SET detail_shape_nodes='PhysicalProject';
"""
qt_plan """
explain shape plan
select * from t1 where user_id = 1 order by username limit 1;
"""
qt_exec """
select * from t1 where user_id = 1 order by username limit 1;
"""

qt_plan2 """
explain shape plan
select t2.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
"""

qt_exe2 """
select t2.*, t1.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1;
"""

qt_plan_no_effect """
explain shape plan
select * from t1 where
user_id > 0 order by user_id limit 1;
"""

}