Skip to content

Commit 9c82b1e

Browse files
committed
Broadcast join if build estimation is small and from HBO
1 parent bd3ff3b commit 9c82b1e

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.facebook.presto.spi.plan.PlanNode;
2727
import com.facebook.presto.spi.plan.TableScanNode;
2828
import com.facebook.presto.spi.plan.ValuesNode;
29+
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
2930
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
3031
import com.facebook.presto.sql.planner.iterative.Lookup;
3132
import com.facebook.presto.sql.planner.iterative.Rule;
@@ -126,7 +127,10 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
126127
if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
127128
// TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
128129
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
129-
return getOnlyElement(possibleJoinNodes.stream().filter(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED)).map(x -> x.getPlanNode()).collect(toImmutableList()));
130+
JoinNode broadcastJoin = (JoinNode) getOnlyElement(possibleJoinNodes.stream().filter(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED)).map(x -> x.getPlanNode()).collect(toImmutableList()));
131+
if (context.getStatsProvider().getStats(broadcastJoin.getBuild()).getSourceInfo() instanceof HistoryBasedSourceInfo) {
132+
return broadcastJoin;
133+
}
130134
}
131135
if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) {
132136
return getSizeBasedJoin(joinNode, context);

presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineSemiJoinDistributionType.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.facebook.presto.matching.Captures;
3737
import com.facebook.presto.matching.Pattern;
3838
import com.facebook.presto.spi.plan.PlanNode;
39+
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
3940
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
4041
import com.facebook.presto.sql.planner.iterative.Rule;
4142
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
@@ -48,11 +49,14 @@
4849
import static com.facebook.presto.SystemSessionProperties.getJoinDistributionType;
4950
import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
5051
import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
52+
import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
5153
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
5254
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.getSourceTablesSizeInBytes;
5355
import static com.facebook.presto.sql.planner.plan.Patterns.semiJoin;
5456
import static com.facebook.presto.sql.planner.plan.SemiJoinNode.DistributionType.PARTITIONED;
5557
import static com.facebook.presto.sql.planner.plan.SemiJoinNode.DistributionType.REPLICATED;
58+
import static com.google.common.collect.ImmutableList.toImmutableList;
59+
import static com.google.common.collect.Iterables.getOnlyElement;
5660
import static java.util.Objects.requireNonNull;
5761

5862
/**
@@ -124,6 +128,12 @@ private PlanNode getCostBasedDistributionType(SemiJoinNode node, Context context
124128
possibleJoinNodes.add(getSemiJoinNodeWithCost(node.withDistributionType(PARTITIONED), context));
125129

126130
if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents())) {
131+
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((SemiJoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
132+
SemiJoinNode broadcastJoin = (SemiJoinNode) getOnlyElement(possibleJoinNodes.stream().filter(result -> ((SemiJoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED)).map(x -> x.getPlanNode()).collect(toImmutableList()));
133+
if (context.getStatsProvider().getStats(broadcastJoin.getBuild()).getSourceInfo() instanceof HistoryBasedSourceInfo) {
134+
return broadcastJoin;
135+
}
136+
}
127137
if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) {
128138
return getSizeBaseDistributionType(node, context);
129139
}
@@ -155,7 +165,7 @@ private boolean canReplicate(SemiJoinNode node, Context context)
155165
PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide);
156166
double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide);
157167
return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes()
158-
|| (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
168+
|| (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
159169
&& getSourceTablesSizeInBytes(buildSide, context) <= joinMaxBroadcastTableSize.toBytes());
160170
}
161171

0 commit comments

Comments
 (0)