Skip to content

Commit daf3bd6

Browse files
committed
Small TVF analyzer/planning changes
1 parent 7f2e5c5 commit daf3bd6

File tree

11 files changed

+274
-91
lines changed

11 files changed

+274
-91
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Field.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,6 @@ public Field(Optional<NodeLocation> nodeLocation, Optional<QualifiedName> relati
8686
this.aliased = aliased;
8787
}
8888

89-
public static Field newUnqualified(Optional<String> name, Type type)
90-
{
91-
requireNonNull(name, "name is null");
92-
requireNonNull(type, "type is null");
93-
94-
return new Field(Optional.empty(), Optional.empty(), name, type, false, Optional.empty(), Optional.empty(), false);
95-
}
96-
9789
public Optional<NodeLocation> getNodeLocation()
9890
{
9991
return nodeLocation;

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1491,7 +1491,7 @@ private void verifyRequiredColumns(TableFunctionInvocation node, Map<String, Lis
14911491
// the scope is recorded, because table arguments are already analyzed
14921492
Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation());
14931493
columns.stream()
1494-
.filter(column -> column < 0 || column >= inputScope.getRelationType().getAllFieldCount()) // hidden columns can be required as well as visible columns
1494+
.filter(column -> column < 0 || column >= inputScope.getRelationType().getVisibleFieldCount())
14951495
.findFirst()
14961496
.ifPresent(column -> {
14971497
throw new SemanticException(TABLE_FUNCTION_IMPLEMENTATION_ERROR, "Invalid index: %s of required column from table argument %s", column, name);

presto-main-base/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@
143143
import static java.lang.String.format;
144144
import static java.util.Objects.requireNonNull;
145145

146-
class QueryPlanner
146+
public class QueryPlanner
147147
{
148148
private final Analysis analysis;
149149
private final VariableAllocator variableAllocator;
@@ -524,7 +524,7 @@ private PlanBuilder project(PlanBuilder subPlan, Iterable<Expression> expression
524524
*
525525
* @return the new subplan and a mapping of each expression to the symbol representing the coercion or an existing symbol if a coercion wasn't needed
526526
*/
527-
private PlanAndMappings coerce(PlanBuilder subPlan, List<Expression> expressions, Analysis analysis, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, Metadata metadata)
527+
public PlanAndMappings coerce(PlanBuilder subPlan, List<Expression> expressions, Analysis analysis, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, Metadata metadata)
528528
{
529529
Assignments.Builder assignments = Assignments.builder();
530530
assignments.putAll(subPlan.getRoot().getOutputVariables().stream().collect(toImmutableMap(Function.identity(), Function.identity())));
@@ -1346,7 +1346,7 @@ private RowExpression rowExpression(Expression expression, SqlPlannerContext con
13461346
context.getTranslatorContext());
13471347
}
13481348

1349-
private static List<Expression> toSymbolReferences(List<VariableReferenceExpression> variables)
1349+
public static List<Expression> toSymbolReferences(List<VariableReferenceExpression> variables)
13501350
{
13511351
return variables.stream()
13521352
.map(variable -> new SymbolReference(
@@ -1355,6 +1355,11 @@ private static List<Expression> toSymbolReferences(List<VariableReferenceExpress
13551355
.collect(toImmutableList());
13561356
}
13571357

1358+
public static SymbolReference toSymbolReference(VariableReferenceExpression variable)
1359+
{
1360+
return new SymbolReference(variable.getSourceLocation().map(location -> new NodeLocation(location.getLine(), location.getColumn())), variable.getName());
1361+
}
1362+
13581363
public static class PlanAndMappings
13591364
{
13601365
private final PlanBuilder subPlan;

presto-main-base/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ protected RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node
346346
outputVariablesBuilder.build(),
347347
sources.stream().map(RelationPlan::getRoot).collect(toImmutableList()),
348348
inputRelationsProperties,
349+
functionAnalysis.getCopartitioningLists(),
349350
new TableFunctionHandle(functionAnalysis.getConnectorId(), functionAnalysis.getConnectorTableFunctionHandle(), functionAnalysis.getTransactionHandle()));
350351

351352
return new RelationPlan(root, scope, outputVariables);

presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ public PlanNode visitTableFunction(TableFunctionNode node, RewriteContext<Void>
490490
node.getOutputVariables(),
491491
node.getSources(),
492492
node.getTableArgumentProperties(),
493+
node.getCopartitioningLists(),
493494
node.getHandle());
494495
}
495496

presto-main-base/src/main/java/com/facebook/presto/sql/planner/plan/SimplePlanRewriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public C get()
6161
return userContext;
6262
}
6363

64+
public SimplePlanRewriter<C> getNodeRewriter()
65+
{
66+
return nodeRewriter;
67+
}
68+
6469
/**
6570
* Invoke the rewrite logic recursively on children of the given node and swap it
6671
* out with an identical copy with the rewritten children

presto-main-base/src/main/java/com/facebook/presto/sql/planner/plan/TableFunctionNode.java

Lines changed: 125 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
import com.facebook.presto.spi.relation.VariableReferenceExpression;
2323
import com.fasterxml.jackson.annotation.JsonCreator;
2424
import com.fasterxml.jackson.annotation.JsonProperty;
25+
import com.google.common.collect.ImmutableList;
26+
import com.google.common.collect.ImmutableMap;
2527
import com.google.errorprone.annotations.Immutable;
2628

29+
import java.util.Collection;
2730
import java.util.List;
2831
import java.util.Map;
2932
import java.util.Optional;
3033

3134
import static com.google.common.base.Preconditions.checkArgument;
35+
import static com.google.common.collect.ImmutableList.toImmutableList;
3236
import static java.util.Objects.requireNonNull;
3337

3438
@Immutable
@@ -40,6 +44,7 @@ public class TableFunctionNode
4044
private final List<VariableReferenceExpression> outputVariables;
4145
private final List<PlanNode> sources;
4246
private final List<TableArgumentProperties> tableArgumentProperties;
47+
private final List<List<String>> copartitioningLists;
4348
private final TableFunctionHandle handle;
4449

4550
@JsonCreator
@@ -50,9 +55,10 @@ public TableFunctionNode(
5055
@JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables,
5156
@JsonProperty("sources") List<PlanNode> sources,
5257
@JsonProperty("tableArgumentProperties") List<TableArgumentProperties> tableArgumentProperties,
58+
@JsonProperty("copartitioningLists") List<List<String>> copartitioningLists,
5359
@JsonProperty("handle") TableFunctionHandle handle)
5460
{
55-
this(Optional.empty(), id, Optional.empty(), name, arguments, outputVariables, sources, tableArgumentProperties, handle);
61+
this(Optional.empty(), id, Optional.empty(), name, arguments, outputVariables, sources, tableArgumentProperties, copartitioningLists, handle);
5662
}
5763

5864
public TableFunctionNode(
@@ -64,14 +70,18 @@ public TableFunctionNode(
6470
List<VariableReferenceExpression> outputVariables,
6571
List<PlanNode> sources,
6672
List<TableArgumentProperties> tableArgumentProperties,
73+
List<List<String>> copartitioningLists,
6774
TableFunctionHandle handle)
6875
{
6976
super(sourceLocation, id, statsEquivalentPlanNode);
7077
this.name = requireNonNull(name, "name is null");
71-
this.arguments = requireNonNull(arguments, "arguments is null");
72-
this.outputVariables = requireNonNull(outputVariables, "outputVariables is null");
73-
this.sources = requireNonNull(sources, "sources is null");
74-
this.tableArgumentProperties = requireNonNull(tableArgumentProperties, "tableArgumentProperties is null");
78+
this.arguments = ImmutableMap.copyOf(arguments);
79+
this.outputVariables = ImmutableList.copyOf(outputVariables);
80+
this.sources = ImmutableList.copyOf(sources);
81+
this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties);
82+
this.copartitioningLists = copartitioningLists.stream()
83+
.map(ImmutableList::copyOf)
84+
.collect(toImmutableList());
7585
this.handle = requireNonNull(handle, "handle is null");
7686
}
7787

@@ -87,8 +97,23 @@ public Map<String, Argument> getArguments()
8797
return arguments;
8898
}
8999

90-
@JsonProperty
100+
@Override
91101
public List<VariableReferenceExpression> getOutputVariables()
102+
{
103+
ImmutableList.Builder<VariableReferenceExpression> variables = ImmutableList.builder();
104+
variables.addAll(outputVariables);
105+
106+
tableArgumentProperties.stream()
107+
.map(TableArgumentProperties::getPassThroughSpecification)
108+
.map(PassThroughSpecification::getColumns)
109+
.flatMap(Collection::stream)
110+
.map(PassThroughColumn::getOutputVariables)
111+
.forEach(variables::add);
112+
113+
return variables.build();
114+
}
115+
116+
public List<VariableReferenceExpression> getProperOutputs()
92117
{
93118
return outputVariables;
94119
}
@@ -99,6 +124,12 @@ public List<TableArgumentProperties> getTableArgumentProperties()
99124
return tableArgumentProperties;
100125
}
101126

127+
@JsonProperty
128+
public List<List<String>> getCopartitioningLists()
129+
{
130+
return copartitioningLists;
131+
}
132+
102133
@JsonProperty
103134
public TableFunctionHandle getHandle()
104135
{
@@ -122,35 +153,47 @@ public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
122153
public PlanNode replaceChildren(List<PlanNode> newSources)
123154
{
124155
checkArgument(sources.size() == newSources.size(), "wrong number of new children");
125-
return new TableFunctionNode(getId(), name, arguments, outputVariables, newSources, tableArgumentProperties, handle);
156+
return new TableFunctionNode(getId(), name, arguments, outputVariables, newSources, tableArgumentProperties, copartitioningLists, handle);
126157
}
127158

128159
@Override
129160
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
130161
{
131-
return new TableFunctionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, name, arguments, outputVariables, sources, tableArgumentProperties, handle);
162+
return new TableFunctionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, name, arguments, outputVariables, sources, tableArgumentProperties, copartitioningLists, handle);
132163
}
133164

134165
public static class TableArgumentProperties
135166
{
167+
private final String argumentName;
136168
private final boolean rowSemantics;
137169
private final boolean pruneWhenEmpty;
138-
private final boolean passThroughColumns;
170+
private final PassThroughSpecification passThroughSpecification;
171+
private final List<VariableReferenceExpression> requiredColumns;
139172
private final Optional<DataOrganizationSpecification> specification;
140173

141174
@JsonCreator
142175
public TableArgumentProperties(
176+
@JsonProperty("argumentName") String argumentName,
143177
@JsonProperty("rowSemantics") boolean rowSemantics,
144178
@JsonProperty("pruneWhenEmpty") boolean pruneWhenEmpty,
145-
@JsonProperty("passThroughColumns") boolean passThroughColumns,
179+
@JsonProperty("passThroughSpecification") PassThroughSpecification passThroughSpecification,
180+
@JsonProperty("requiredColumns") List<VariableReferenceExpression> requiredColumns,
146181
@JsonProperty("specification") Optional<DataOrganizationSpecification> specification)
147182
{
183+
this.argumentName = requireNonNull(argumentName, "argumentName is null");
148184
this.rowSemantics = rowSemantics;
149185
this.pruneWhenEmpty = pruneWhenEmpty;
150-
this.passThroughColumns = passThroughColumns;
186+
this.passThroughSpecification = requireNonNull(passThroughSpecification, "passThroughSpecification is null");
187+
this.requiredColumns = ImmutableList.copyOf(requiredColumns);
151188
this.specification = requireNonNull(specification, "specification is null");
152189
}
153190

191+
@JsonProperty
192+
public String getArgumentName()
193+
{
194+
return argumentName;
195+
}
196+
154197
@JsonProperty
155198
public boolean isRowSemantics()
156199
{
@@ -164,15 +207,83 @@ public boolean isPruneWhenEmpty()
164207
}
165208

166209
@JsonProperty
167-
public boolean isPassThroughColumns()
210+
public PassThroughSpecification getPassThroughSpecification()
211+
{
212+
return passThroughSpecification;
213+
}
214+
215+
@JsonProperty
216+
public List<VariableReferenceExpression> getRequiredColumns()
168217
{
169-
return passThroughColumns;
218+
return requiredColumns;
170219
}
171220

172221
@JsonProperty
173-
public Optional<DataOrganizationSpecification> specification()
222+
public Optional<DataOrganizationSpecification> getSpecification()
174223
{
175224
return specification;
176225
}
177226
}
227+
228+
/**
229+
* Specifies how columns from source tables are passed through to the output of a table function.
230+
* This class manages both explicitly declared pass-through columns and partitioning columns
231+
* that must be preserved in the output.
232+
*/
233+
public static class PassThroughSpecification
234+
{
235+
private final boolean declaredAsPassThrough;
236+
private final List<PassThroughColumn> columns;
237+
238+
@JsonCreator
239+
public PassThroughSpecification(
240+
@JsonProperty("declaredAsPassThrough") boolean declaredAsPassThrough,
241+
@JsonProperty("columns") List<PassThroughColumn> columns)
242+
{
243+
this.declaredAsPassThrough = declaredAsPassThrough;
244+
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
245+
checkArgument(
246+
declaredAsPassThrough || this.columns.stream().allMatch(PassThroughColumn::isPartitioningColumn),
247+
"non-partitioning pass-through column for non-pass-through source of a table function");
248+
}
249+
250+
@JsonProperty
251+
public boolean isDeclaredAsPassThrough()
252+
{
253+
return declaredAsPassThrough;
254+
}
255+
256+
@JsonProperty
257+
public List<PassThroughColumn> getColumns()
258+
{
259+
return columns;
260+
}
261+
}
262+
263+
public static class PassThroughColumn
264+
{
265+
private final VariableReferenceExpression outputVariables;
266+
private final boolean isPartitioningColumn;
267+
268+
@JsonCreator
269+
public PassThroughColumn(
270+
@JsonProperty("outputVariables") VariableReferenceExpression outputVariables,
271+
@JsonProperty("partitioningColumn") boolean isPartitioningColumn)
272+
{
273+
this.outputVariables = requireNonNull(outputVariables, "symbol is null");
274+
this.isPartitioningColumn = isPartitioningColumn;
275+
}
276+
277+
@JsonProperty
278+
public VariableReferenceExpression getOutputVariables()
279+
{
280+
return outputVariables;
281+
}
282+
283+
@JsonProperty
284+
public boolean isPartitioningColumn()
285+
{
286+
return isPartitioningColumn;
287+
}
288+
}
178289
}

presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,8 @@ public void installPlugin(Plugin plugin)
772772
@Override
773773
public void createCatalog(String catalogName, String connectorName, Map<String, String> properties)
774774
{
775-
throw new UnsupportedOperationException();
775+
nodeManager.addCurrentNodeConnector(new ConnectorId(catalogName));
776+
connectorManager.createConnection(catalogName, connectorName, properties);
776777
}
777778

778779
@Override

0 commit comments

Comments
 (0)