Skip to content

Commit 9b19f4a

Browse files
committed
Add experimental planner support for sort merge join
When enabled, the query planner prefers sort merge join for eligible joins. A SortNode is added if input is not already sorted. This can be enabled with the ``prefer_sort_merge_join`` session property (hidden) or the ``experimental.optimizer.prefer-sort-merge-join`` configuration property.
1 parent 8392ce7 commit 9b19f4a

File tree

8 files changed

+304
-1
lines changed

8 files changed

+304
-1
lines changed

presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ public final class SystemSessionProperties
255255
public static final String MAX_STAGE_COUNT_FOR_EAGER_SCHEDULING = "max_stage_count_for_eager_scheduling";
256256
public static final String HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD = "hyperloglog_standard_error_warning_threshold";
257257
public static final String PREFER_MERGE_JOIN_FOR_SORTED_INPUTS = "prefer_merge_join_for_sorted_inputs";
258+
public static final String PREFER_SORT_MERGE_JOIN = "prefer_sort_merge_join";
258259
public static final String SEGMENTED_AGGREGATION_ENABLED = "segmented_aggregation_enabled";
259260
public static final String USE_HISTORY_BASED_PLAN_STATISTICS = "use_history_based_plan_statistics";
260261
public static final String TRACK_HISTORY_BASED_PLAN_STATISTICS = "track_history_based_plan_statistics";
@@ -1386,6 +1387,11 @@ public SystemSessionProperties(
13861387
"To make it work, the connector needs to guarantee and expose the data properties of the underlying table.",
13871388
featuresConfig.isPreferMergeJoinForSortedInputs(),
13881389
true),
1390+
booleanProperty(
1391+
PREFER_SORT_MERGE_JOIN,
1392+
"Prefer sort merge join for all joins. A SortNode is added if input is not already sorted.",
1393+
featuresConfig.isPreferSortMergeJoin(),
1394+
true),
13891395
booleanProperty(
13901396
SEGMENTED_AGGREGATION_ENABLED,
13911397
"Enable segmented aggregation.",
@@ -2881,6 +2887,11 @@ public static boolean preferMergeJoinForSortedInputs(Session session)
28812887
return session.getSystemProperty(PREFER_MERGE_JOIN_FOR_SORTED_INPUTS, Boolean.class);
28822888
}
28832889

2890+
public static boolean preferSortMergeJoin(Session session)
2891+
{
2892+
return session.getSystemProperty(PREFER_SORT_MERGE_JOIN, Boolean.class);
2893+
}
2894+
28842895
public static boolean isSegmentedAggregationEnabled(Session session)
28852896
{
28862897
return session.getSystemProperty(SEGMENTED_AGGREGATION_ENABLED, Boolean.class);

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ public class FeaturesConfig
226226

227227
private boolean streamingForPartialAggregationEnabled;
228228
private boolean preferMergeJoinForSortedInputs;
229+
private boolean preferSortMergeJoin;
229230
private boolean segmentedAggregationEnabled;
230231

231232
private int maxStageCountForEagerScheduling = 25;
@@ -2232,6 +2233,19 @@ public FeaturesConfig setPreferMergeJoinForSortedInputs(boolean preferMergeJoinF
22322233
return this;
22332234
}
22342235

2236+
public boolean isPreferSortMergeJoin()
2237+
{
2238+
return preferSortMergeJoin;
2239+
}
2240+
2241+
@Config("experimental.optimizer.prefer-sort-merge-join")
2242+
@ConfigDescription("Prefer sort merge join for all joins. A SortNode is added if input is not already sorted.")
2243+
public FeaturesConfig setPreferSortMergeJoin(boolean preferSortMergeJoin)
2244+
{
2245+
this.preferSortMergeJoin = preferSortMergeJoin;
2246+
return this;
2247+
}
2248+
22352249
public boolean isSegmentedAggregationEnabled()
22362250
{
22372251
return segmentedAggregationEnabled;
@@ -2970,6 +2984,7 @@ public boolean isInEqualityJoinPushdownEnabled()
29702984
{
29712985
return inEqualityJoinPushdownEnabled;
29722986
}
2987+
29732988
public boolean isPrestoSparkExecutionEnvironment()
29742989
{
29752990
return prestoSparkExecutionEnvironment;

presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
4242
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
43+
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
4344
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PLAN_ERROR;
4445
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT;
4546
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_REWINDABLE_SPLIT_SOURCE;
@@ -161,6 +162,10 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitMergeJoin(MergeJoi
161162
left.totalLifespans,
162163
left.recoveryEligible && right.recoveryEligible);
163164
}
165+
if (preferSortMergeJoin(session)) {
166+
// TODO: This will break the other use case for merge join operating on sorted tables, which requires grouped execution for correctness.
167+
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
168+
}
164169
throw new PrestoException(
165170
INVALID_PLAN_ERROR,
166171
format("When grouped execution can't be enabled, merge join plan is not valid." +

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@
186186
import com.facebook.presto.sql.planner.optimizations.SetFlatteningOptimizer;
187187
import com.facebook.presto.sql.planner.optimizations.ShardJoins;
188188
import com.facebook.presto.sql.planner.optimizations.SimplifyPlanWithEmptyInput;
189+
import com.facebook.presto.sql.planner.optimizations.SortMergeJoinOptimizer;
189190
import com.facebook.presto.sql.planner.optimizations.StatsRecordingPlanOptimizer;
190191
import com.facebook.presto.sql.planner.optimizations.TransformQuantifiedComparisonApplyToLateralJoin;
191192
import com.facebook.presto.sql.planner.optimizations.UnaliasSymbolReferences;
@@ -927,7 +928,8 @@ public PlanOptimizers(
927928
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation
928929
// Should be placed after AddExchanges, but before AddLocalExchange
929930
// To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange
930-
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));
931+
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()),
932+
new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));
931933

932934
// Optimizers above this don't understand local exchanges, so be careful moving this.
933935
builder.add(new AddLocalExchanges(metadata, featuresConfig.isNativeExecutionEnabled()));

presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.facebook.presto.spi.plan.JoinNode;
3030
import com.facebook.presto.spi.plan.LimitNode;
3131
import com.facebook.presto.spi.plan.MarkDistinctNode;
32+
import com.facebook.presto.spi.plan.MergeJoinNode;
3233
import com.facebook.presto.spi.plan.OrderingScheme;
3334
import com.facebook.presto.spi.plan.OutputNode;
3435
import com.facebook.presto.spi.plan.Partitioning;
@@ -78,6 +79,7 @@
7879
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
7980
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
8081
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
82+
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
8183
import static com.facebook.presto.common.type.BigintType.BIGINT;
8284
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
8385
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
@@ -887,6 +889,17 @@ public PlanWithProperties visitSpatialJoin(SpatialJoinNode node, StreamPreferred
887889
return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
888890
}
889891

892+
@Override
893+
public PlanWithProperties visitMergeJoin(MergeJoinNode node, StreamPreferredProperties parentPreferences)
894+
{
895+
if (preferSortMergeJoin(session)) {
896+
PlanWithProperties probe = planAndEnforce(node.getLeft(), singleStream(), singleStream());
897+
PlanWithProperties build = planAndEnforce(node.getRight(), singleStream(), singleStream());
898+
return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
899+
}
900+
return super.visitMergeJoin(node, parentPreferences);
901+
}
902+
890903
@Override
891904
public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProperties parentPreferences)
892905
{
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.planner.optimizations;
15+
16+
import com.facebook.presto.Session;
17+
import com.facebook.presto.metadata.Metadata;
18+
import com.facebook.presto.spi.VariableAllocator;
19+
import com.facebook.presto.spi.WarningCollector;
20+
import com.facebook.presto.spi.plan.EquiJoinClause;
21+
import com.facebook.presto.spi.plan.JoinNode;
22+
import com.facebook.presto.spi.plan.JoinType;
23+
import com.facebook.presto.spi.plan.MergeJoinNode;
24+
import com.facebook.presto.spi.plan.Ordering;
25+
import com.facebook.presto.spi.plan.OrderingScheme;
26+
import com.facebook.presto.spi.plan.PlanNode;
27+
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
28+
import com.facebook.presto.spi.plan.SortNode;
29+
import com.facebook.presto.spi.relation.VariableReferenceExpression;
30+
import com.facebook.presto.sql.planner.TypeProvider;
31+
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
32+
import com.google.common.collect.ImmutableList;
33+
34+
import java.util.List;
35+
import java.util.Optional;
36+
37+
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
38+
import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST;
39+
import static com.google.common.collect.ImmutableList.toImmutableList;
40+
import static java.util.Objects.requireNonNull;
41+
42+
public class SortMergeJoinOptimizer
43+
implements PlanOptimizer
44+
{
45+
private final Metadata metadata;
46+
private final boolean nativeExecution;
47+
private boolean isEnabledForTesting;
48+
49+
public SortMergeJoinOptimizer(Metadata metadata, boolean nativeExecution)
50+
{
51+
this.metadata = requireNonNull(metadata, "metadata is null");
52+
this.nativeExecution = nativeExecution;
53+
}
54+
55+
@Override
56+
public void setEnabledForTesting(boolean isSet)
57+
{
58+
isEnabledForTesting = isSet;
59+
}
60+
61+
@Override
62+
public boolean isEnabled(Session session)
63+
{
64+
// TODO: Consider group execution and single node execution.
65+
return isEnabledForTesting || preferSortMergeJoin(session);
66+
}
67+
68+
@Override
69+
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider type, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
70+
{
71+
requireNonNull(plan, "plan is null");
72+
requireNonNull(session, "session is null");
73+
requireNonNull(variableAllocator, "variableAllocator is null");
74+
requireNonNull(idAllocator, "idAllocator is null");
75+
76+
if (isEnabled(session)) {
77+
Rewriter rewriter = new SortMergeJoinOptimizer.Rewriter(idAllocator, metadata, session);
78+
PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null);
79+
return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged());
80+
}
81+
return PlanOptimizerResult.optimizerResult(plan, false);
82+
}
83+
84+
/**
85+
* @param joinNode
86+
* @return returns true if merge join is supported for the given join node.
87+
*/
88+
public boolean isMergeJoinEligible(JoinNode joinNode)
89+
{
90+
return (joinNode.getType() == JoinType.INNER || joinNode.getType() == JoinType.LEFT || joinNode.getType() == JoinType.RIGHT)
91+
&& !joinNode.isCrossJoin();
92+
}
93+
94+
private class Rewriter
95+
extends SimplePlanRewriter<Void>
96+
{
97+
private final PlanNodeIdAllocator idAllocator;
98+
private final Metadata metadata;
99+
private final Session session;
100+
private boolean planChanged;
101+
102+
private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session)
103+
{
104+
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
105+
this.metadata = requireNonNull(metadata, "metadata is null");
106+
this.session = requireNonNull(session, "session is null");
107+
}
108+
109+
public boolean isPlanChanged()
110+
{
111+
return planChanged;
112+
}
113+
114+
@Override
115+
public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
116+
{
117+
if (!isMergeJoinEligible(node)) {
118+
return node;
119+
}
120+
121+
PlanNode left = node.getLeft();
122+
PlanNode right = node.getRight();
123+
124+
List<VariableReferenceExpression> leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList());
125+
126+
if (!isPlanOutputSortedByColumns(left, leftJoinColumns)) {
127+
List<Ordering> leftOrdering = node.getCriteria().stream()
128+
.map(criterion -> new Ordering(criterion.getLeft(), ASC_NULLS_FIRST))
129+
.collect(toImmutableList());
130+
left = new SortNode(
131+
Optional.empty(),
132+
idAllocator.getNextId(),
133+
left,
134+
new OrderingScheme(leftOrdering),
135+
true,
136+
ImmutableList.of());
137+
}
138+
139+
List<VariableReferenceExpression> rightJoinColumns = node.getCriteria().stream()
140+
.map(EquiJoinClause::getRight)
141+
.collect(toImmutableList());
142+
if (!isPlanOutputSortedByColumns(right, rightJoinColumns)) {
143+
List<Ordering> rightOrdering = node.getCriteria().stream()
144+
.map(criterion -> new Ordering(criterion.getRight(), ASC_NULLS_FIRST))
145+
.collect(toImmutableList());
146+
right = new SortNode(
147+
Optional.empty(),
148+
idAllocator.getNextId(),
149+
right,
150+
new OrderingScheme(rightOrdering),
151+
true,
152+
ImmutableList.of());
153+
}
154+
155+
planChanged = true;
156+
return new MergeJoinNode(
157+
Optional.empty(),
158+
node.getId(),
159+
node.getType(),
160+
left,
161+
right,
162+
node.getCriteria(),
163+
node.getOutputVariables(),
164+
node.getFilter(),
165+
node.getLeftHashVariable(),
166+
node.getRightHashVariable());
167+
}
168+
169+
private boolean isPlanOutputSortedByColumns(PlanNode plan, List<VariableReferenceExpression> columns)
170+
{
171+
StreamPropertyDerivations.StreamProperties properties = StreamPropertyDerivations.derivePropertiesRecursively(plan, metadata, session, nativeExecution);
172+
173+
// Check if partitioning columns (bucketed-by columns [B]) are a subset of join columns [J]
174+
// B = subset (J)
175+
if (!verifyStreamProperties(properties, columns)) {
176+
return false;
177+
}
178+
179+
// Check if the output of the subplan is ordered by the join columns
180+
return !LocalProperties.match(properties.getLocalProperties(), LocalProperties.sorted(columns, ASC_NULLS_FIRST)).get(0).isPresent();
181+
}
182+
183+
private boolean verifyStreamProperties(StreamPropertyDerivations.StreamProperties streamProperties, List<VariableReferenceExpression> joinColumns)
184+
{
185+
if (!streamProperties.getPartitioningColumns().isPresent()) {
186+
return false;
187+
}
188+
List<VariableReferenceExpression> partitioningColumns = streamProperties.getPartitioningColumns().get();
189+
return partitioningColumns.size() <= joinColumns.size() && joinColumns.containsAll(partitioningColumns);
190+
}
191+
}
192+
}

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public void testDefaults()
194194
.setMaxStageCountForEagerScheduling(25)
195195
.setHyperloglogStandardErrorWarningThreshold(0.004)
196196
.setPreferMergeJoinForSortedInputs(false)
197+
.setPreferSortMergeJoin(false)
197198
.setSegmentedAggregationEnabled(false)
198199
.setQueryAnalyzerTimeout(new Duration(3, MINUTES))
199200
.setQuickDistinctLimitEnabled(false)
@@ -408,6 +409,7 @@ public void testExplicitPropertyMappings()
408409
.put("execution-policy.max-stage-count-for-eager-scheduling", "123")
409410
.put("hyperloglog-standard-error-warning-threshold", "0.02")
410411
.put("optimizer.prefer-merge-join-for-sorted-inputs", "true")
412+
.put("experimental.optimizer.prefer-sort-merge-join", "true")
411413
.put("optimizer.segmented-aggregation-enabled", "true")
412414
.put("planner.query-analyzer-timeout", "10s")
413415
.put("optimizer.quick-distinct-limit-enabled", "true")
@@ -619,6 +621,7 @@ public void testExplicitPropertyMappings()
619621
.setMaxStageCountForEagerScheduling(123)
620622
.setHyperloglogStandardErrorWarningThreshold(0.02)
621623
.setPreferMergeJoinForSortedInputs(true)
624+
.setPreferSortMergeJoin(true)
622625
.setSegmentedAggregationEnabled(true)
623626
.setQueryAnalyzerTimeout(new Duration(10, SECONDS))
624627
.setQuickDistinctLimitEnabled(true)

0 commit comments

Comments
 (0)