Skip to content

Commit 898574c

Browse files
committed
Analyze and plan for call distributed procedure statement
1 parent 48a2c5f commit 898574c

File tree

13 files changed

+604
-48
lines changed

13 files changed

+604
-48
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ public class Analysis
157157

158158
private final Map<NodeRef<QuerySpecification>, List<GroupingOperation>> groupingOperations = new LinkedHashMap<>();
159159

160+
// for call distributed procedure
161+
private Optional<QualifiedObjectName> procedureName;
162+
private Optional<Object[]> procedureArguments;
163+
private Optional<TableHandle> callTarget = Optional.empty();
164+
private Optional<QuerySpecification> targetQuery = Optional.empty();
165+
160166
// for create table
161167
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
162168
private Map<String, Expression> createTableProperties = ImmutableMap.of();
@@ -635,6 +641,36 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
635641
return createTableDestination;
636642
}
637643

644+
public Optional<QualifiedObjectName> getProcedureName()
645+
{
646+
return procedureName;
647+
}
648+
649+
public void setProcedureName(Optional<QualifiedObjectName> procedureName)
650+
{
651+
this.procedureName = procedureName;
652+
}
653+
654+
public Optional<Object[]> getProcedureArguments()
655+
{
656+
return procedureArguments;
657+
}
658+
659+
public void setProcedureArguments(Optional<Object[]> procedureArguments)
660+
{
661+
this.procedureArguments = procedureArguments;
662+
}
663+
664+
public Optional<TableHandle> getCallTarget()
665+
{
666+
return callTarget;
667+
}
668+
669+
public void setCallTarget(TableHandle callTarget)
670+
{
671+
this.callTarget = Optional.of(callTarget);
672+
}
673+
638674
public Optional<TableHandle> getAnalyzeTarget()
639675
{
640676
return analyzeTarget;
@@ -984,6 +1020,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
9841020
return currentQuerySpecification;
9851021
}
9861022

1023+
public void setTargetQuery(QuerySpecification targetQuery)
1024+
{
1025+
this.targetQuery = Optional.of(targetQuery);
1026+
}
1027+
1028+
public Optional<QuerySpecification> getTargetQuery()
1029+
{
1030+
return this.targetQuery;
1031+
}
1032+
9871033
public Map<FunctionKind, Set<String>> getInvokedFunctions()
9881034
{
9891035
Map<FunctionKind, Set<String>> functionMap = new HashMap<>();

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/SemanticErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public enum SemanticErrorCode
8888

8989
SAMPLE_PERCENTAGE_OUT_OF_RANGE,
9090

91+
PROCEDURE_NOT_FOUND,
9192
INVALID_PROCEDURE_ARGUMENTS,
9293

9394
INVALID_SESSION_PROPERTY,

presto-main/src/main/java/com/facebook/presto/execution/CallTask.java

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,53 @@ public ListenableFuture<?> execute(Call call, TransactionManager transactionMana
8585
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
8686
Procedure procedure = metadata.getProcedureRegistry().resolve(connectorId, toSchemaTableName(procedureName));
8787

88+
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
89+
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, parameterLookup);
90+
91+
// validate arguments
92+
MethodType methodType = procedure.getMethodHandle().type();
93+
for (int i = 0; i < procedure.getArguments().size(); i++) {
94+
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
95+
String name = procedure.getArguments().get(i).getName();
96+
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
97+
}
98+
}
99+
100+
// insert session argument
101+
List<Object> arguments = new ArrayList<>();
102+
Iterator<Object> valuesIterator = asList(values).iterator();
103+
for (Class<?> type : methodType.parameterList()) {
104+
if (ConnectorSession.class.isAssignableFrom(type)) {
105+
arguments.add(session.toConnectorSession(connectorId));
106+
}
107+
else {
108+
arguments.add(valuesIterator.next());
109+
}
110+
}
111+
112+
try {
113+
procedure.getMethodHandle().invokeWithArguments(arguments);
114+
}
115+
catch (Throwable t) {
116+
if (t instanceof InterruptedException) {
117+
Thread.currentThread().interrupt();
118+
}
119+
throwIfInstanceOf(t, PrestoException.class);
120+
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
121+
}
122+
123+
return immediateFuture(null);
124+
}
125+
126+
public static Object toTypeObjectValue(Session session, Type type, Object value)
127+
{
128+
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
129+
writeNativeValue(type, blockBuilder, value);
130+
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
131+
}
132+
133+
public static Object[] extractParameterValuesInOrder(Call call, Procedure procedure, Metadata metadata, Session session, Map<NodeRef<Parameter>, Expression> parameterLookup)
134+
{
88135
// map declared argument names to positions
89136
Map<String, Integer> positions = new HashMap<>();
90137
for (int i = 0; i < procedure.getArguments().size(); i++) {
@@ -131,7 +178,6 @@ else if (i < procedure.getArguments().size()) {
131178

132179
// get argument values
133180
Object[] values = new Object[procedure.getArguments().size()];
134-
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
135181
for (Entry<String, CallArgument> entry : names.entrySet()) {
136182
CallArgument callArgument = entry.getValue();
137183
int index = positions.get(entry.getKey());
@@ -156,45 +202,6 @@ else if (i < procedure.getArguments().size()) {
156202
}
157203
}
158204

159-
// validate arguments
160-
MethodType methodType = procedure.getMethodHandle().type();
161-
for (int i = 0; i < procedure.getArguments().size(); i++) {
162-
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
163-
String name = procedure.getArguments().get(i).getName();
164-
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
165-
}
166-
}
167-
168-
// insert session argument
169-
List<Object> arguments = new ArrayList<>();
170-
Iterator<Object> valuesIterator = asList(values).iterator();
171-
for (Class<?> type : methodType.parameterList()) {
172-
if (ConnectorSession.class.isAssignableFrom(type)) {
173-
arguments.add(session.toConnectorSession(connectorId));
174-
}
175-
else {
176-
arguments.add(valuesIterator.next());
177-
}
178-
}
179-
180-
try {
181-
procedure.getMethodHandle().invokeWithArguments(arguments);
182-
}
183-
catch (Throwable t) {
184-
if (t instanceof InterruptedException) {
185-
Thread.currentThread().interrupt();
186-
}
187-
throwIfInstanceOf(t, PrestoException.class);
188-
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
189-
}
190-
191-
return immediateFuture(null);
192-
}
193-
194-
private static Object toTypeObjectValue(Session session, Type type, Object value)
195-
{
196-
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
197-
writeNativeValue(type, blockBuilder, value);
198-
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
205+
return values;
199206
}
200207
}

presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,30 @@ public MetadataManager(
165165
ColumnPropertyManager columnPropertyManager,
166166
AnalyzePropertyManager analyzePropertyManager,
167167
TransactionManager transactionManager)
168+
{
169+
this(
170+
functionAndTypeManager,
171+
blockEncodingSerde,
172+
sessionPropertyManager,
173+
schemaPropertyManager,
174+
tablePropertyManager,
175+
columnPropertyManager,
176+
analyzePropertyManager,
177+
transactionManager,
178+
new ProcedureRegistry(functionAndTypeManager));
179+
}
180+
181+
@VisibleForTesting
182+
public MetadataManager(
183+
FunctionAndTypeManager functionAndTypeManager,
184+
BlockEncodingSerde blockEncodingSerde,
185+
SessionPropertyManager sessionPropertyManager,
186+
SchemaPropertyManager schemaPropertyManager,
187+
TablePropertyManager tablePropertyManager,
188+
ColumnPropertyManager columnPropertyManager,
189+
AnalyzePropertyManager analyzePropertyManager,
190+
TransactionManager transactionManager,
191+
IProcedureRegistry procedureRegistry)
168192
{
169193
this(
170194
createTestingViewCodec(functionAndTypeManager),
@@ -176,7 +200,7 @@ public MetadataManager(
176200
analyzePropertyManager,
177201
transactionManager,
178202
functionAndTypeManager,
179-
new ProcedureRegistry(functionAndTypeManager));
203+
procedureRegistry);
180204
}
181205

182206
@Inject
@@ -240,6 +264,21 @@ public static MetadataManager createTestMetadataManager(TransactionManager trans
240264
transactionManager);
241265
}
242266

267+
public static MetadataManager createTestMetadataManager(TransactionManager transactionManager, FeaturesConfig featuresConfig, IProcedureRegistry procedureRegistry)
268+
{
269+
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
270+
return new MetadataManager(
271+
new FunctionAndTypeManager(transactionManager, blockEncodingManager, featuresConfig, new HandleResolver(), ImmutableSet.of()),
272+
blockEncodingManager,
273+
new SessionPropertyManager(),
274+
new SchemaPropertyManager(),
275+
new TablePropertyManager(),
276+
new ColumnPropertyManager(),
277+
new AnalyzePropertyManager(),
278+
transactionManager,
279+
procedureRegistry);
280+
}
281+
243282
@Override
244283
public final void verifyComparableOrderableContract()
245284
{

presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.sql.analyzer;
1515

1616
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.QualifiedObjectName;
1718
import com.facebook.presto.metadata.Metadata;
1819
import com.facebook.presto.spi.WarningCollector;
1920
import com.facebook.presto.spi.function.FunctionHandle;
@@ -108,11 +109,20 @@ public Analysis analyze(Statement statement, boolean isDescribe)
108109
}
109110

110111
public Analysis analyzeSemantic(Statement statement, boolean isDescribe)
112+
{
113+
return analyzeSemantic(statement, Optional.empty(), isDescribe);
114+
}
115+
116+
public Analysis analyzeSemantic(
117+
Statement statement,
118+
Optional<QualifiedObjectName> procedureName,
119+
boolean isDescribe)
111120
{
112121
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, accessControl, warningCollector);
113122
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
114123

115124
metadataExtractor.populateMetadataHandle(session, rewrittenStatement, analysis.getMetadataHandle());
125+
analysis.setProcedureName(procedureName);
116126
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
117127
analyzer.analyze(rewrittenStatement, Optional.empty());
118128
analyzeForUtilizedColumns(analysis, analysis.getStatement());

presto-main/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryAnalyzer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public QueryAnalysis analyze(AnalyzerContext analyzerContext, PreparedQuery prep
9090
session.getWarningCollector(),
9191
Optional.of(metadataExtractorExecutor));
9292

93-
Analysis analysis = analyzer.analyzeSemantic(((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(), false);
93+
Analysis analysis = analyzer.analyzeSemantic(
94+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(),
95+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getDistributedProcedureName(),
96+
false);
9497
return new BuiltInQueryAnalysis(analysis);
9598
}
9699

0 commit comments

Comments
 (0)