Skip to content

Commit 5c36ee5

Browse files
committed
Add planner support to prevent mixed CPP/Java execution
1 parent 3d3f541 commit 5c36ee5

File tree

11 files changed

+1719
-17
lines changed

11 files changed

+1719
-17
lines changed

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.facebook.presto.sql.planner.iterative.rule.EvaluateZeroLimit;
4646
import com.facebook.presto.sql.planner.iterative.rule.EvaluateZeroSample;
4747
import com.facebook.presto.sql.planner.iterative.rule.ExtractSpatialJoins;
48+
import com.facebook.presto.sql.planner.iterative.rule.ExtractSystemTableFilterRuleSet;
4849
import com.facebook.presto.sql.planner.iterative.rule.GatherAndMergeWindows;
4950
import com.facebook.presto.sql.planner.iterative.rule.ImplementBernoulliSampleAsFilter;
5051
import com.facebook.presto.sql.planner.iterative.rule.ImplementFilteredAggregations;
@@ -937,13 +938,13 @@ public PlanOptimizers(
937938
// Optimizers above this do not need to care about aggregations with the type other than SINGLE
938939
// This optimizer must be run after all exchange-related optimizers
939940
builder.add(new IterativeOptimizer(
940-
metadata,
941-
ruleStats,
942-
statsCalculator,
943-
costCalculator,
944-
ImmutableSet.of(
945-
new PushPartialAggregationThroughJoin(),
946-
new PushPartialAggregationThroughExchange(metadata.getFunctionAndTypeManager(), featuresConfig.isNativeExecutionEnabled()))),
941+
metadata,
942+
ruleStats,
943+
statsCalculator,
944+
costCalculator,
945+
ImmutableSet.of(
946+
new PushPartialAggregationThroughJoin(),
947+
new PushPartialAggregationThroughExchange(metadata.getFunctionAndTypeManager(), featuresConfig.isNativeExecutionEnabled()))),
947948
// MergePartialAggregationsWithFilter should immediately follow PushPartialAggregationThroughExchange
948949
new MergePartialAggregationsWithFilter(metadata.getFunctionAndTypeManager()),
949950
new IterativeOptimizer(
@@ -982,6 +983,14 @@ public PlanOptimizers(
982983
// Pass after connector optimizer, as it relies on connector optimizer to identify empty input tables and convert them to empty ValuesNode
983984
builder.add(new SimplifyPlanWithEmptyInput());
984985

986+
builder.add(
987+
new IterativeOptimizer(
988+
metadata,
989+
ruleStats,
990+
statsCalculator,
991+
costCalculator,
992+
new ExtractSystemTableFilterRuleSet(metadata.getFunctionAndTypeManager()).rules()));
993+
985994
// DO NOT add optimizers that change the plan shape (computations) after this point
986995

987996
// Precomputed hashes - this assumes that partitioning will not change
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.planner.iterative.rule;
15+
16+
import com.facebook.presto.matching.Capture;
17+
import com.facebook.presto.matching.Captures;
18+
import com.facebook.presto.matching.Pattern;
19+
import com.facebook.presto.metadata.FunctionAndTypeManager;
20+
import com.facebook.presto.spi.plan.FilterNode;
21+
import com.facebook.presto.spi.plan.PartitioningScheme;
22+
import com.facebook.presto.spi.plan.PlanNode;
23+
import com.facebook.presto.spi.plan.ProjectNode;
24+
import com.facebook.presto.spi.plan.TableScanNode;
25+
import com.facebook.presto.sql.planner.PlannerUtils;
26+
import com.facebook.presto.sql.planner.iterative.Rule;
27+
import com.facebook.presto.sql.planner.plan.ExchangeNode;
28+
import com.google.common.collect.ImmutableList;
29+
import com.google.common.collect.ImmutableSet;
30+
31+
import java.util.Optional;
32+
import java.util.Set;
33+
34+
import static com.facebook.presto.matching.Capture.newCapture;
35+
import static com.facebook.presto.sql.planner.plan.Patterns.exchange;
36+
import static com.facebook.presto.sql.planner.plan.Patterns.filter;
37+
import static com.facebook.presto.sql.planner.plan.Patterns.project;
38+
import static com.facebook.presto.sql.planner.plan.Patterns.source;
39+
import static com.facebook.presto.sql.planner.plan.Patterns.tableScan;
40+
import static com.facebook.presto.sql.relational.RowExpressionUtils.containsNonCoordinatorEligibleCallExpression;
41+
import static java.util.Objects.requireNonNull;
42+
43+
/**
44+
* RuleSet for extracting system table filters when they contain non-coordinator-eligible functions (e.g., CPP functions).
45+
* This ensures that system table scans happen on the coordinator while CPP functions execute on workers.
46+
*
47+
* Patterns handled:
48+
* 1. Exchange -> Project -> Filter -> TableScan (system) => Project -> Filter -> Exchange -> TableScan
49+
* 2. Exchange -> Project -> TableScan (system) => Project -> Exchange -> TableScan
50+
* 3. Exchange -> Filter -> TableScan (system) => Filter -> Exchange -> TableScan
51+
*/
52+
public class ExtractSystemTableFilterRuleSet
53+
{
54+
private final FunctionAndTypeManager functionAndTypeManager;
55+
56+
public ExtractSystemTableFilterRuleSet(FunctionAndTypeManager functionAndTypeManager)
57+
{
58+
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
59+
}
60+
61+
public Set<Rule<?>> rules()
62+
{
63+
return ImmutableSet.of(
64+
new ProjectFilterScanRule(),
65+
new ProjectScanRule(),
66+
new FilterScanRule());
67+
}
68+
69+
private abstract class SystemTableFilterRule<T extends PlanNode>
70+
implements Rule<T>
71+
{
72+
protected final Capture<TableScanNode> tableScanCapture = newCapture();
73+
74+
protected boolean containsFunctionsIneligibleOnCoordinator(Optional<FilterNode> filterNode, Optional<ProjectNode> projectNode)
75+
{
76+
boolean hasIneligiblePredicates = filterNode
77+
.map(filter -> containsNonCoordinatorEligibleCallExpression(functionAndTypeManager, filter.getPredicate()))
78+
.orElse(false);
79+
80+
boolean hasIneligibleProjections = projectNode
81+
.map(project -> project.getAssignments().getExpressions().stream()
82+
.anyMatch(expression -> containsNonCoordinatorEligibleCallExpression(functionAndTypeManager, expression)))
83+
.orElse(false);
84+
85+
return hasIneligiblePredicates || hasIneligibleProjections;
86+
}
87+
}
88+
89+
private final class ProjectFilterScanRule
90+
extends SystemTableFilterRule<ExchangeNode>
91+
{
92+
private final Capture<ExchangeNode> exchangeCapture = newCapture();
93+
private final Capture<ProjectNode> projectCapture = newCapture();
94+
private final Capture<FilterNode> filterCapture = newCapture();
95+
96+
@Override
97+
public Pattern<ExchangeNode> getPattern()
98+
{
99+
return exchange()
100+
.capturedAs(exchangeCapture)
101+
.with(source().matching(
102+
project()
103+
.capturedAs(projectCapture)
104+
.with(source().matching(
105+
filter()
106+
.capturedAs(filterCapture)
107+
.with(source().matching(
108+
tableScan()
109+
.capturedAs(tableScanCapture)
110+
.matching(PlannerUtils::containsSystemTableScan)))))));
111+
}
112+
113+
@Override
114+
public Result apply(ExchangeNode node, Captures captures, Context context)
115+
{
116+
TableScanNode tableScanNode = captures.get(tableScanCapture);
117+
ExchangeNode exchangeNode = captures.get(exchangeCapture);
118+
ProjectNode projectNode = captures.get(projectCapture);
119+
FilterNode filterNode = captures.get(filterCapture);
120+
121+
if (!containsFunctionsIneligibleOnCoordinator(Optional.of(filterNode), Optional.of(projectNode))) {
122+
return Result.empty();
123+
}
124+
125+
// The exchange's output variables must match what the filter expects
126+
// Since the filter was originally between project and table scan, it expects
127+
// the table scan's output variables
128+
PartitioningScheme newPartitioningScheme = new PartitioningScheme(
129+
exchangeNode.getPartitioningScheme().getPartitioning(),
130+
tableScanNode.getOutputVariables(),
131+
exchangeNode.getPartitioningScheme().getHashColumn(),
132+
exchangeNode.getPartitioningScheme().isScaleWriters(),
133+
exchangeNode.getPartitioningScheme().isReplicateNullsAndAny(),
134+
exchangeNode.getPartitioningScheme().getEncoding(),
135+
exchangeNode.getPartitioningScheme().getBucketToPartition());
136+
137+
// Create new exchange with table scan as source
138+
ExchangeNode newExchange = new ExchangeNode(
139+
exchangeNode.getSourceLocation(),
140+
context.getIdAllocator().getNextId(),
141+
exchangeNode.getType(),
142+
exchangeNode.getScope(),
143+
newPartitioningScheme,
144+
ImmutableList.of(tableScanNode),
145+
ImmutableList.of(tableScanNode.getOutputVariables()),
146+
exchangeNode.isEnsureSourceOrdering(),
147+
exchangeNode.getOrderingScheme());
148+
149+
// Recreate filter with exchange as source
150+
FilterNode newFilter = new FilterNode(
151+
filterNode.getSourceLocation(),
152+
context.getIdAllocator().getNextId(),
153+
newExchange,
154+
filterNode.getPredicate());
155+
156+
// Recreate project with filter as source
157+
ProjectNode newProject = new ProjectNode(
158+
projectNode.getSourceLocation(),
159+
context.getIdAllocator().getNextId(),
160+
newFilter,
161+
projectNode.getAssignments(),
162+
projectNode.getLocality());
163+
164+
return Result.ofPlanNode(newProject);
165+
}
166+
}
167+
168+
private final class ProjectScanRule
169+
extends SystemTableFilterRule<ExchangeNode>
170+
{
171+
private final Capture<ExchangeNode> exchangeCapture = newCapture();
172+
private final Capture<ProjectNode> projectCapture = newCapture();
173+
174+
@Override
175+
public Pattern<ExchangeNode> getPattern()
176+
{
177+
return exchange()
178+
.capturedAs(exchangeCapture)
179+
.with(source().matching(
180+
project()
181+
.capturedAs(projectCapture)
182+
.with(source().matching(
183+
tableScan()
184+
.capturedAs(tableScanCapture)
185+
.matching(PlannerUtils::containsSystemTableScan)))));
186+
}
187+
188+
@Override
189+
public Result apply(ExchangeNode node, Captures captures, Context context)
190+
{
191+
TableScanNode tableScanNode = captures.get(tableScanCapture);
192+
ExchangeNode exchangeNode = captures.get(exchangeCapture);
193+
ProjectNode projectNode = captures.get(projectCapture);
194+
195+
if (!containsFunctionsIneligibleOnCoordinator(Optional.empty(), Optional.of(projectNode))) {
196+
return Result.empty();
197+
}
198+
199+
// Update partitioning scheme to match table scan outputs
200+
PartitioningScheme newPartitioningScheme = new PartitioningScheme(
201+
exchangeNode.getPartitioningScheme().getPartitioning(),
202+
tableScanNode.getOutputVariables(),
203+
exchangeNode.getPartitioningScheme().getHashColumn(),
204+
exchangeNode.getPartitioningScheme().isScaleWriters(),
205+
exchangeNode.getPartitioningScheme().isReplicateNullsAndAny(),
206+
exchangeNode.getPartitioningScheme().getEncoding(),
207+
exchangeNode.getPartitioningScheme().getBucketToPartition());
208+
209+
// Create new exchange with table scan as source
210+
ExchangeNode newExchange = new ExchangeNode(
211+
exchangeNode.getSourceLocation(),
212+
context.getIdAllocator().getNextId(),
213+
exchangeNode.getType(),
214+
exchangeNode.getScope(),
215+
newPartitioningScheme,
216+
ImmutableList.of(tableScanNode),
217+
ImmutableList.of(tableScanNode.getOutputVariables()),
218+
exchangeNode.isEnsureSourceOrdering(),
219+
exchangeNode.getOrderingScheme());
220+
221+
// Recreate project with exchange as source
222+
ProjectNode newProject = new ProjectNode(
223+
projectNode.getSourceLocation(),
224+
context.getIdAllocator().getNextId(),
225+
newExchange,
226+
projectNode.getAssignments(),
227+
projectNode.getLocality());
228+
229+
return Result.ofPlanNode(newProject);
230+
}
231+
}
232+
233+
private final class FilterScanRule
234+
extends SystemTableFilterRule<ExchangeNode>
235+
{
236+
private final Capture<ExchangeNode> exchangeCapture = newCapture();
237+
private final Capture<FilterNode> filterCapture = newCapture();
238+
239+
@Override
240+
public Pattern<ExchangeNode> getPattern()
241+
{
242+
return exchange()
243+
.capturedAs(exchangeCapture)
244+
.with(source().matching(
245+
filter()
246+
.capturedAs(filterCapture)
247+
.with(source().matching(
248+
tableScan()
249+
.capturedAs(tableScanCapture)
250+
.matching(PlannerUtils::containsSystemTableScan)))));
251+
}
252+
253+
@Override
254+
public Result apply(ExchangeNode node, Captures captures, Context context)
255+
{
256+
TableScanNode tableScanNode = captures.get(tableScanCapture);
257+
ExchangeNode exchangeNode = captures.get(exchangeCapture);
258+
FilterNode filterNode = captures.get(filterCapture);
259+
260+
if (!containsFunctionsIneligibleOnCoordinator(Optional.of(filterNode), Optional.empty())) {
261+
return Result.empty();
262+
}
263+
264+
// Update partitioning scheme to match table scan outputs
265+
PartitioningScheme newPartitioningScheme = new PartitioningScheme(
266+
exchangeNode.getPartitioningScheme().getPartitioning(),
267+
tableScanNode.getOutputVariables(),
268+
exchangeNode.getPartitioningScheme().getHashColumn(),
269+
exchangeNode.getPartitioningScheme().isScaleWriters(),
270+
exchangeNode.getPartitioningScheme().isReplicateNullsAndAny(),
271+
exchangeNode.getPartitioningScheme().getEncoding(),
272+
exchangeNode.getPartitioningScheme().getBucketToPartition());
273+
274+
// Create new exchange with table scan as source
275+
ExchangeNode newExchange = new ExchangeNode(
276+
exchangeNode.getSourceLocation(),
277+
context.getIdAllocator().getNextId(),
278+
exchangeNode.getType(),
279+
exchangeNode.getScope(),
280+
newPartitioningScheme,
281+
ImmutableList.of(tableScanNode),
282+
ImmutableList.of(tableScanNode.getOutputVariables()),
283+
exchangeNode.isEnsureSourceOrdering(),
284+
exchangeNode.getOrderingScheme());
285+
286+
// Recreate filter with exchange as source
287+
FilterNode newFilter = new FilterNode(
288+
filterNode.getSourceLocation(),
289+
context.getIdAllocator().getNextId(),
290+
newExchange,
291+
filterNode.getPredicate());
292+
293+
return Result.ofPlanNode(newFilter);
294+
}
295+
}
296+
}

presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PickTableLayout.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@
5959
import static com.facebook.presto.metadata.TableLayoutResult.computeEnforced;
6060
import static com.facebook.presto.spi.relation.DomainTranslator.BASIC_COLUMN_EXTRACTOR;
6161
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
62+
import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan;
6263
import static com.facebook.presto.sql.planner.iterative.rule.PreconditionRules.checkRulesAreFiredBeforeAddExchangesRule;
6364
import static com.facebook.presto.sql.planner.plan.Patterns.filter;
6465
import static com.facebook.presto.sql.planner.plan.Patterns.source;
6566
import static com.facebook.presto.sql.planner.plan.Patterns.tableScan;
67+
import static com.facebook.presto.sql.relational.RowExpressionUtils.containsNonCoordinatorEligibleCallExpression;
6668
import static com.google.common.base.Preconditions.checkArgument;
6769
import static com.google.common.collect.ImmutableSet.toImmutableSet;
6870
import static com.google.common.collect.Sets.intersection;
@@ -271,6 +273,16 @@ private static PlanNode pushPredicateIntoTableScan(
271273
new FunctionResolution(metadata.getFunctionAndTypeManager().getFunctionAndTypeResolver()),
272274
metadata.getFunctionAndTypeManager());
273275
RowExpression deterministicPredicate = logicalRowExpressions.filterDeterministicConjuncts(predicate);
276+
// If the predicate contains non-Java expressions, we cannot prune partitions over system tables.
277+
RowExpression ineligiblePredicate = TRUE_CONSTANT;
278+
if (containsSystemTableScan(node)) {
279+
ineligiblePredicate = logicalRowExpressions.filterConjuncts(
280+
deterministicPredicate,
281+
expression -> containsNonCoordinatorEligibleCallExpression(metadata.getFunctionAndTypeManager(), expression));
282+
deterministicPredicate = logicalRowExpressions.filterConjuncts(
283+
deterministicPredicate,
284+
expression -> !containsNonCoordinatorEligibleCallExpression(metadata.getFunctionAndTypeManager(), expression));
285+
}
274286
DomainTranslator.ExtractionResult<VariableReferenceExpression> decomposedPredicate = domainTranslator.fromPredicate(
275287
session.toConnectorSession(),
276288
deterministicPredicate,
@@ -339,7 +351,8 @@ private static PlanNode pushPredicateIntoTableScan(
339351
RowExpression resultingPredicate = logicalRowExpressions.combineConjuncts(
340352
domainTranslator.toPredicate(layout.getUnenforcedConstraint().transform(assignments::get)),
341353
logicalRowExpressions.filterNonDeterministicConjuncts(predicate),
342-
decomposedPredicate.getRemainingExpression());
354+
decomposedPredicate.getRemainingExpression(),
355+
ineligiblePredicate);
343356

344357
if (!TRUE_CONSTANT.equals(resultingPredicate)) {
345358
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), tableScan, resultingPredicate);

0 commit comments

Comments
 (0)