Skip to content

Commit a504c7c

Browse files
arhimondraditi-pandit
authored andcommitted
[native] Improve semi join parallelism
Run build side in parallel similar to other joins
1 parent 1bc7da4 commit a504c7c

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,8 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, StreamPreferredProper
868868
parentPreferences.constrainTo(node.getSource().getOutputVariables()).withDefaultParallelism(session));
869869

870870
// this filter source consumes the input completely, so we do not pass through parent preferences
871-
PlanWithProperties filteringSource = planAndEnforce(node.getFilteringSource(), singleStream(), singleStream());
871+
StreamPreferredProperties filteringPreference = nativeExecution ? defaultParallelism(session) : singleStream();
872+
PlanWithProperties filteringSource = planAndEnforce(node.getFilteringSource(), filteringPreference, filteringPreference);
872873

873874
return rebaseAndDeriveProperties(node, ImmutableList.of(source, filteringSource));
874875
}

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeJoinQueries.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
package com.facebook.presto.nativeworker;
1515

1616
import com.facebook.presto.Session;
17+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
1718
import com.facebook.presto.testing.QueryRunner;
1819
import com.facebook.presto.tests.AbstractTestQueryFramework;
20+
import com.google.common.collect.ImmutableMap;
1921
import org.testng.annotations.DataProvider;
2022
import org.testng.annotations.Test;
2123

@@ -28,6 +30,13 @@
2830
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx;
2931
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPartitionedNation;
3032
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion;
33+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
34+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
35+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.semiJoin;
36+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
37+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
38+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
39+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE;
3140

3241
public abstract class AbstractTestNativeJoinQueries
3342
extends AbstractTestQueryFramework
@@ -47,6 +56,12 @@ protected void createTables()
4756
createBucketedCustomer(queryRunner);
4857
}
4958

59+
@Override
60+
protected FeaturesConfig createFeaturesConfig()
61+
{
62+
return new FeaturesConfig().setNativeExecutionEnabled(true);
63+
}
64+
5065
@Test(dataProvider = "joinTypeProvider")
5166
public void testInnerJoin(Session joinTypeSession)
5267
{
@@ -67,6 +82,35 @@ public void testBucketedInnerJoin(Session joinTypeSession)
6782
"WHERE b.name=c.name AND \"$bucket\" = 5");
6883
}
6984

85+
@Test
86+
public void testSemiJoinPlan()
87+
{
88+
String sql = "SELECT orderkey FROM orders WHERE orderdate IN (SELECT shipdate FROM lineitem)";
89+
assertPlan(
90+
partitionedJoin(),
91+
sql,
92+
anyTree(
93+
semiJoin("orderdate", "shipdate", "orderkey_1",
94+
exchange(REMOTE_STREAMING, REPARTITION,
95+
tableScan("orders", ImmutableMap.of(
96+
"orderkey", "orderkey",
97+
"orderdate", "orderdate"))),
98+
exchange(REMOTE_STREAMING, REPARTITION,
99+
tableScan("lineitem", ImmutableMap.of(
100+
"shipdate", "shipdate"))))));
101+
assertPlan(
102+
broadcastJoin(),
103+
sql,
104+
anyTree(
105+
semiJoin("orderdate", "shipdate", "orderkey_1",
106+
tableScan("orders", ImmutableMap.of(
107+
"orderkey", "orderkey",
108+
"orderdate", "orderdate")),
109+
exchange(REMOTE_STREAMING, REPLICATE,
110+
tableScan("lineitem", ImmutableMap.of(
111+
"shipdate", "shipdate"))))));
112+
}
113+
70114
@Test(dataProvider = "joinTypeProvider")
71115
public void testSemiJoin(Session joinTypeSession)
72116
{

0 commit comments

Comments
 (0)