Skip to content

Commit 56279fc

Browse files
authored
fix: Improve sort merge join rule to do merge join when one side is sorted (#26361)
## Description The current [MergeJoinForSortedInputOptimizer](https://github.com/prestodb/presto/blob/master/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java) rule does a sort merge join only when both inputs are sorted. In this PR, I made the following changes: * Make the optimizer work only for native execution, as Java does not have merge join support on the worker side * Have the optimizer work for all types of joins rather than just inner joins, as Velox supports all types of merge joins * Do a sort merge join if one side is sorted, adding a sort on the other side, when the query is running in Presto on Spark * In `GroupedExecutionTagger`, do not fail for a merge join node when grouped execution is not available for Presto on Spark. This is because Presto on Spark spawns as many tasks as there are partitions of data. When one side is bucketed, even if the other side is not bucketed, the join gets as many tasks as there are buckets, so it is still equivalent to a bucket-by-bucket execution ## Motivation and Context Enable sort merge join in more cases ## Impact Improve performance ## Test Plan Unit tests and local end-to-end tests ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with their default values), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). 
- [ ] Adequate tests were added if applicable. - [ ] CI passed. - [ ] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores). ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == General Changes * Improve ``MergeJoinForSortedInputOptimizer`` to do sort merge join when one side of the input is sorted ```
1 parent dd96fe1 commit 56279fc

File tree

10 files changed

+528
-71
lines changed

10 files changed

+528
-71
lines changed

presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.presto.Session;
1717
import com.facebook.presto.spi.plan.JoinType;
18+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
1819
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
1920
import com.facebook.presto.testing.QueryRunner;
2021
import com.facebook.presto.tests.AbstractTestQueryFramework;
@@ -56,6 +57,12 @@ protected QueryRunner createQueryRunner()
5657
Optional.empty());
5758
}
5859

60+
@Override
61+
protected FeaturesConfig createFeaturesConfig()
62+
{
63+
return new FeaturesConfig().setNativeExecutionEnabled(true);
64+
}
65+
5966
@Test
6067
public void testJoinType()
6168
{
@@ -83,19 +90,19 @@ public void testJoinType()
8390
assertPlan(
8491
mergeJoinEnabled(),
8592
"select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
86-
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, false));
93+
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true));
8794

8895
// Right join
8996
assertPlan(
9097
mergeJoinEnabled(),
9198
"select * from test_join_customer_join_type right join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
92-
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, false));
99+
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, true));
93100

94101
// Outer join
95102
assertPlan(
96103
mergeJoinEnabled(),
97104
"select * from test_join_customer_join_type full join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
98-
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, false));
105+
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, true));
99106
}
100107
finally {
101108
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_join_type");

presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java

Lines changed: 423 additions & 0 deletions
Large diffs are not rendered by default.

presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.OptionalInt;
4040

4141
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
42-
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
4342
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
4443
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PLAN_ERROR;
4544
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT;
@@ -58,13 +57,15 @@ class GroupedExecutionTagger
5857
private final Metadata metadata;
5958
private final NodePartitioningManager nodePartitioningManager;
6059
private final boolean groupedExecutionEnabled;
60+
private final boolean isPrestoOnSpark;
6161

62-
public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager)
62+
public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean groupedExecutionEnabled, boolean isPrestoOnSpark)
6363
{
6464
this.session = requireNonNull(session, "session is null");
6565
this.metadata = requireNonNull(metadata, "metadata is null");
6666
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
67-
this.groupedExecutionEnabled = isGroupedExecutionEnabled(session);
67+
this.groupedExecutionEnabled = groupedExecutionEnabled;
68+
this.isPrestoOnSpark = isPrestoOnSpark;
6869
}
6970

7071
@Override
@@ -166,6 +167,15 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitMergeJoin(MergeJoi
166167
// TODO: This will break the other use case for merge join operating on sorted tables, which requires grouped execution for correctness.
167168
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
168169
}
170+
171+
if (isPrestoOnSpark) {
172+
GroupedExecutionTagger.GroupedExecutionProperties mergeJoinLeft = node.getLeft().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null);
173+
GroupedExecutionTagger.GroupedExecutionProperties mergeJoinRight = node.getRight().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null);
174+
if (mergeJoinLeft.currentNodeCapable || mergeJoinRight.currentNodeCapable) {
175+
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
176+
}
177+
}
178+
169179
throw new PrestoException(
170180
INVALID_PLAN_ERROR,
171181
format("When grouped execution can't be enabled, merge join plan is not valid." +

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class PlanFragmenter
5252
private final QueryManagerConfig config;
5353
private final PlanChecker distributedPlanChecker;
5454
private final PlanChecker singleNodePlanChecker;
55+
private final boolean isPrestoOnSpark;
5556

5657
@Inject
5758
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
@@ -61,6 +62,7 @@ public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitionin
6162
this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
6263
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager);
6364
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager);
65+
this.isPrestoOnSpark = featuresConfig.isPrestoSparkExecutionEnvironment();
6466
}
6567

6668
public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
@@ -90,7 +92,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, Pl
9092
PlanNode root = SimplePlanRewriter.rewriteWith(fragmenter, plan.getRoot(), properties);
9193

9294
SubPlan subPlan = fragmenter.buildRootFragment(root, properties);
93-
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning());
95+
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning(), isPrestoOnSpark);
9496
}
9597

9698
private static class Fragmenter

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
4747
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
4848
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
49+
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
4950
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
5051
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
5152
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
@@ -92,12 +93,13 @@ public static SubPlan finalizeSubPlan(
9293
Session session,
9394
boolean noExchange,
9495
WarningCollector warningCollector,
95-
PartitioningHandle partitioningHandle)
96+
PartitioningHandle partitioningHandle,
97+
boolean isPrestoOnSpark)
9698
{
9799
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
98100
if (!noExchange && !isSingleNodeExecutionEnabled(session)) {
99101
// grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED
100-
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
102+
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager, isPrestoOnSpark);
101103
}
102104

103105
checkState(subPlan.getFragment().getId().getId() != ROOT_FRAGMENT_ID || !isForceSingleNodeOutput(session) || subPlan.getFragment().getPartitioning().isSingleNode(), "Root of PlanFragment is not single node");
@@ -148,10 +150,10 @@ private static void sanityCheckFragmentedPlan(
148150
149151
* TODO: We should introduce "query section" and make recoverability analysis done at query section level.
150152
*/
151-
private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager)
153+
private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean isPrestoOnSpark)
152154
{
153155
PlanFragment fragment = subPlan.getFragment();
154-
GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null);
156+
GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, isGroupedExecutionEnabled(session), isPrestoOnSpark), null);
155157
if (properties.isSubTreeUseful()) {
156158
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
157159
&& new HashSet<>(properties.getCapableTableScanNodes()).containsAll(fragment.getTableScanSchedulingOrder());
@@ -185,7 +187,7 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
185187
ImmutableList.Builder<SubPlan> result = ImmutableList.builder();
186188
boolean containsTableFinishNode = containsTableFinishNode(fragment);
187189
for (SubPlan child : subPlan.getChildren()) {
188-
result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager));
190+
result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager, isPrestoOnSpark));
189191
}
190192
return new SubPlan(fragment, result.build());
191193
}

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,7 @@ public PlanOptimizers(
955955
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation
956956
// Should be placed after AddExchanges, but before AddLocalExchange
957957
// To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange
958-
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()),
958+
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled(), featuresConfig.isPrestoSparkExecutionEnvironment()),
959959
new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));
960960

961961
// Optimizers above this don't understand local exchanges, so be careful moving this.

0 commit comments

Comments
 (0)