Skip to content

Commit 54edf15

Browse files
committed
Analyze and plan for call distributed procedure statement
1 parent 2d2b03d commit 54edf15

File tree

9 files changed

+523
-46
lines changed

9 files changed

+523
-46
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ public class Analysis
157157

158158
private final Map<NodeRef<QuerySpecification>, List<GroupingOperation>> groupingOperations = new LinkedHashMap<>();
159159

160+
// for call distributed procedure
161+
private Optional<QualifiedObjectName> procedureName;
162+
private Optional<Object[]> procedureArguments;
163+
private Optional<TableHandle> callTarget = Optional.empty();
164+
private Optional<Query> currentQuery = Optional.empty();
165+
160166
// for create table
161167
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
162168
private Map<String, Expression> createTableProperties = ImmutableMap.of();
@@ -635,6 +641,36 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
635641
return createTableDestination;
636642
}
637643

644+
public Optional<QualifiedObjectName> getProcedureName()
645+
{
646+
return procedureName;
647+
}
648+
649+
public void setProcedureName(Optional<QualifiedObjectName> procedureName)
650+
{
651+
this.procedureName = procedureName;
652+
}
653+
654+
public Optional<Object[]> getProcedureArguments()
655+
{
656+
return procedureArguments;
657+
}
658+
659+
public void setProcedureArguments(Optional<Object[]> procedureArguments)
660+
{
661+
this.procedureArguments = procedureArguments;
662+
}
663+
664+
public Optional<TableHandle> getCallTarget()
665+
{
666+
return callTarget;
667+
}
668+
669+
public void setCallTarget(TableHandle callTarget)
670+
{
671+
this.callTarget = Optional.of(callTarget);
672+
}
673+
638674
public Optional<TableHandle> getAnalyzeTarget()
639675
{
640676
return analyzeTarget;
@@ -984,6 +1020,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
9841020
return currentQuerySpecification;
9851021
}
9861022

1023+
public void setCurrentQuery(Query query)
1024+
{
1025+
this.currentQuery = Optional.of(query);
1026+
}
1027+
1028+
public Optional<Query> getCurrentQuery()
1029+
{
1030+
return this.currentQuery;
1031+
}
1032+
9871033
public Map<FunctionKind, Set<String>> getInvokedFunctions()
9881034
{
9891035
Map<FunctionKind, Set<String>> functionMap = new HashMap<>();

presto-main/src/main/java/com/facebook/presto/execution/CallTask.java

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,53 @@ public ListenableFuture<?> execute(Call call, TransactionManager transactionMana
8585
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
8686
Procedure procedure = metadata.getProcedureRegistry().resolve(connectorId, toSchemaTableName(procedureName));
8787

88+
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
89+
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, parameterLookup);
90+
91+
// validate arguments
92+
MethodType methodType = procedure.getMethodHandle().type();
93+
for (int i = 0; i < procedure.getArguments().size(); i++) {
94+
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
95+
String name = procedure.getArguments().get(i).getName();
96+
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
97+
}
98+
}
99+
100+
// insert session argument
101+
List<Object> arguments = new ArrayList<>();
102+
Iterator<Object> valuesIterator = asList(values).iterator();
103+
for (Class<?> type : methodType.parameterList()) {
104+
if (ConnectorSession.class.isAssignableFrom(type)) {
105+
arguments.add(session.toConnectorSession(connectorId));
106+
}
107+
else {
108+
arguments.add(valuesIterator.next());
109+
}
110+
}
111+
112+
try {
113+
procedure.getMethodHandle().invokeWithArguments(arguments);
114+
}
115+
catch (Throwable t) {
116+
if (t instanceof InterruptedException) {
117+
Thread.currentThread().interrupt();
118+
}
119+
throwIfInstanceOf(t, PrestoException.class);
120+
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
121+
}
122+
123+
return immediateFuture(null);
124+
}
125+
126+
public static Object toTypeObjectValue(Session session, Type type, Object value)
127+
{
128+
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
129+
writeNativeValue(type, blockBuilder, value);
130+
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
131+
}
132+
133+
public static Object[] extractParameterValuesInOrder(Call call, Procedure procedure, Metadata metadata, Session session, Map<NodeRef<Parameter>, Expression> parameterLookup)
134+
{
88135
// map declared argument names to positions
89136
Map<String, Integer> positions = new HashMap<>();
90137
for (int i = 0; i < procedure.getArguments().size(); i++) {
@@ -131,7 +178,6 @@ else if (i < procedure.getArguments().size()) {
131178

132179
// get argument values
133180
Object[] values = new Object[procedure.getArguments().size()];
134-
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
135181
for (Entry<String, CallArgument> entry : names.entrySet()) {
136182
CallArgument callArgument = entry.getValue();
137183
int index = positions.get(entry.getKey());
@@ -156,45 +202,6 @@ else if (i < procedure.getArguments().size()) {
156202
}
157203
}
158204

159-
// validate arguments
160-
MethodType methodType = procedure.getMethodHandle().type();
161-
for (int i = 0; i < procedure.getArguments().size(); i++) {
162-
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
163-
String name = procedure.getArguments().get(i).getName();
164-
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
165-
}
166-
}
167-
168-
// insert session argument
169-
List<Object> arguments = new ArrayList<>();
170-
Iterator<Object> valuesIterator = asList(values).iterator();
171-
for (Class<?> type : methodType.parameterList()) {
172-
if (ConnectorSession.class.isAssignableFrom(type)) {
173-
arguments.add(session.toConnectorSession(connectorId));
174-
}
175-
else {
176-
arguments.add(valuesIterator.next());
177-
}
178-
}
179-
180-
try {
181-
procedure.getMethodHandle().invokeWithArguments(arguments);
182-
}
183-
catch (Throwable t) {
184-
if (t instanceof InterruptedException) {
185-
Thread.currentThread().interrupt();
186-
}
187-
throwIfInstanceOf(t, PrestoException.class);
188-
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
189-
}
190-
191-
return immediateFuture(null);
192-
}
193-
194-
private static Object toTypeObjectValue(Session session, Type type, Object value)
195-
{
196-
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
197-
writeNativeValue(type, blockBuilder, value);
198-
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
205+
return values;
199206
}
200207
}

presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.sql.analyzer;
1515

1616
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.QualifiedObjectName;
1718
import com.facebook.presto.metadata.Metadata;
1819
import com.facebook.presto.spi.WarningCollector;
1920
import com.facebook.presto.spi.function.FunctionHandle;
@@ -108,11 +109,20 @@ public Analysis analyze(Statement statement, boolean isDescribe)
108109
}
109110

110111
public Analysis analyzeSemantic(Statement statement, boolean isDescribe)
112+
{
113+
return analyzeSemantic(statement, Optional.empty(), isDescribe);
114+
}
115+
116+
public Analysis analyzeSemantic(
117+
Statement statement,
118+
Optional<QualifiedObjectName> procedureName,
119+
boolean isDescribe)
111120
{
112121
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, accessControl, warningCollector);
113122
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
114123

115124
metadataExtractor.populateMetadataHandle(session, rewrittenStatement, analysis.getMetadataHandle());
125+
analysis.setProcedureName(procedureName);
116126
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
117127
analyzer.analyze(rewrittenStatement, Optional.empty());
118128
analyzeForUtilizedColumns(analysis, analysis.getStatement());

presto-main/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryAnalyzer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public QueryAnalysis analyze(AnalyzerContext analyzerContext, PreparedQuery prep
9090
session.getWarningCollector(),
9191
Optional.of(metadataExtractorExecutor));
9292

93-
Analysis analysis = analyzer.analyzeSemantic(((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(), false);
93+
Analysis analysis = analyzer.analyzeSemantic(
94+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(),
95+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getDistributedProcedureName(),
96+
false);
9497
return new BuiltInQueryAnalysis(analysis);
9598
}
9699

presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.facebook.presto.spi.function.FunctionKind;
4949
import com.facebook.presto.spi.function.Signature;
5050
import com.facebook.presto.spi.function.SqlFunction;
51+
import com.facebook.presto.spi.procedure.DistributedProcedure;
5152
import com.facebook.presto.spi.relation.DomainTranslator;
5253
import com.facebook.presto.spi.relation.RowExpression;
5354
import com.facebook.presto.spi.security.AccessControl;
@@ -198,6 +199,7 @@
198199
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
199200
import static com.facebook.presto.common.type.UnknownType.UNKNOWN;
200201
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
202+
import static com.facebook.presto.execution.CallTask.extractParameterValuesInOrder;
201203
import static com.facebook.presto.metadata.MetadataUtil.createQualifiedObjectName;
202204
import static com.facebook.presto.metadata.MetadataUtil.toSchemaTableName;
203205
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
@@ -249,6 +251,7 @@
249251
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISMATCHED_COLUMN_ALIASES;
250252
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISMATCHED_SET_COLUMN_TYPES;
251253
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_ATTRIBUTE;
254+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_CATALOG;
252255
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_COLUMN;
253256
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_MATERIALIZED_VIEW;
254257
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_SCHEMA;
@@ -341,7 +344,7 @@ public Scope analyze(Node node, Scope outerQueryScope)
341344

342345
public Scope analyze(Node node, Optional<Scope> outerQueryScope)
343346
{
344-
return new Visitor(outerQueryScope, warningCollector).process(node, Optional.empty());
347+
return new Visitor(metadata, session, outerQueryScope, warningCollector).process(node, Optional.empty());
345348
}
346349

347350
/**
@@ -352,11 +355,19 @@ public Scope analyze(Node node, Optional<Scope> outerQueryScope)
352355
private class Visitor
353356
extends DefaultTraversalVisitor<Scope, Optional<Scope>>
354357
{
358+
private final Metadata metadata;
359+
private final Session session;
355360
private final Optional<Scope> outerQueryScope;
356361
private final WarningCollector warningCollector;
357362

358-
private Visitor(Optional<Scope> outerQueryScope, WarningCollector warningCollector)
363+
private Visitor(
364+
Metadata metadata,
365+
Session session,
366+
Optional<Scope> outerQueryScope,
367+
WarningCollector warningCollector)
359368
{
369+
this.metadata = requireNonNull(metadata, "metadata is null");
370+
this.session = requireNonNull(session, "session is null");
360371
this.outerQueryScope = requireNonNull(outerQueryScope, "outerQueryScope is null");
361372
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
362373
}
@@ -1056,9 +1067,47 @@ protected Scope visitRevoke(Revoke node, Optional<Scope> scope)
10561067
}
10571068

10581069
@Override
1059-
protected Scope visitCall(Call node, Optional<Scope> scope)
1070+
protected Scope visitCall(Call call, Optional<Scope> scope)
10601071
{
1061-
return createAndAssignScope(node, scope);
1072+
if (analysis.isDescribe()) {
1073+
return createAndAssignScope(call, scope);
1074+
}
1075+
QualifiedObjectName procedureName = createQualifiedObjectName(session, call, call.getName());
1076+
ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName())
1077+
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
1078+
DistributedProcedure procedure = metadata.getProcedureRegistry().resolveDistributed(connectorId, toSchemaTableName(procedureName));
1079+
1080+
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, analysis.getParameters());
1081+
ImmutableList.Builder<String> partNamesBuilder = ImmutableList.builder();
1082+
List<String> partNames = partNamesBuilder.add(procedure.getSchema(values))
1083+
.add(procedure.getTableName(values))
1084+
.build();
1085+
QualifiedName qualifiedName = QualifiedName.of(partNames);
1086+
QualifiedObjectName tableName = createQualifiedObjectName(session, call, qualifiedName);
1087+
1088+
analysis.setUpdateType("CALL");
1089+
analysis.setProcedureArguments(Optional.of(values));
1090+
1091+
String filter = procedure.getFilter(values);
1092+
Expression filterExpression = sqlParser.createExpression(filter);
1093+
QuerySpecification querySpecification = new QuerySpecification(
1094+
selectList(new AllColumns()),
1095+
Optional.of(new Table(qualifiedName)),
1096+
Optional.of(filterExpression),
1097+
Optional.empty(),
1098+
Optional.empty(),
1099+
Optional.empty(),
1100+
Optional.empty(),
1101+
Optional.empty());
1102+
Query query = new Query(Optional.empty(), querySpecification, Optional.empty(), Optional.empty(), Optional.empty());
1103+
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
1104+
analyzer.analyze(query, scope);
1105+
analysis.setCurrentQuery(query);
1106+
1107+
TableHandle tableHandle = metadata.getHandleVersion(session, tableName, Optional.empty())
1108+
.orElseThrow(() -> (new SemanticException(MISSING_TABLE, call, "Table '%s' does not exist", tableName)));
1109+
analysis.setCallTarget(tableHandle);
1110+
return createAndAssignScope(call, scope, Field.newUnqualified(Optional.empty(), "rows", BIGINT));
10621111
}
10631112

10641113
private void validateProperties(List<Property> properties, Optional<Scope> scope)

0 commit comments

Comments
 (0)