Skip to content

Commit 1eabce1

Browse files
committed
Analyze and plan for call distributed procedure statement
1 parent b4ceba0 commit 1eabce1

File tree

13 files changed

+609
-49
lines changed

13 files changed

+609
-49
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ public class Analysis
174174
private final Multiset<ColumnMaskScopeEntry> columnMaskScopes = HashMultiset.create();
175175
private final Map<NodeRef<Table>, Map<String, Expression>> columnMasks = new LinkedHashMap<>();
176176

177+
// for call distributed procedure
178+
private Optional<QualifiedObjectName> procedureName;
179+
private Optional<Object[]> procedureArguments;
180+
private Optional<TableHandle> callTarget = Optional.empty();
181+
private Optional<QuerySpecification> targetQuery = Optional.empty();
182+
177183
// for create table
178184
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
179185
private Map<String, Expression> createTableProperties = ImmutableMap.of();
@@ -666,6 +672,36 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
666672
return createTableDestination;
667673
}
668674

675+
public Optional<QualifiedObjectName> getProcedureName()
676+
{
677+
return procedureName;
678+
}
679+
680+
public void setProcedureName(Optional<QualifiedObjectName> procedureName)
681+
{
682+
this.procedureName = procedureName;
683+
}
684+
685+
public Optional<Object[]> getProcedureArguments()
686+
{
687+
return procedureArguments;
688+
}
689+
690+
public void setProcedureArguments(Optional<Object[]> procedureArguments)
691+
{
692+
this.procedureArguments = procedureArguments;
693+
}
694+
695+
public Optional<TableHandle> getCallTarget()
696+
{
697+
return callTarget;
698+
}
699+
700+
public void setCallTarget(TableHandle callTarget)
701+
{
702+
this.callTarget = Optional.of(callTarget);
703+
}
704+
669705
public Optional<TableHandle> getAnalyzeTarget()
670706
{
671707
return analyzeTarget;
@@ -1020,6 +1056,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
10201056
return currentQuerySpecification;
10211057
}
10221058

1059+
public void setTargetQuery(QuerySpecification targetQuery)
1060+
{
1061+
this.targetQuery = Optional.of(targetQuery);
1062+
}
1063+
1064+
public Optional<QuerySpecification> getTargetQuery()
1065+
{
1066+
return this.targetQuery;
1067+
}
1068+
10231069
public Map<FunctionKind, Set<String>> getInvokedFunctions()
10241070
{
10251071
Map<FunctionKind, Set<String>> functionMap = new HashMap<>();

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/SemanticErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public enum SemanticErrorCode
9090

9191
SAMPLE_PERCENTAGE_OUT_OF_RANGE,
9292

93+
PROCEDURE_NOT_FOUND,
9394
INVALID_PROCEDURE_ARGUMENTS,
9495

9596
INVALID_SESSION_PROPERTY,

presto-main-base/src/main/java/com/facebook/presto/execution/CallTask.java

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,53 @@ public ListenableFuture<?> execute(Call call, TransactionManager transactionMana
8585
ConnectorId connectorId = getConnectorIdOrThrow(session, metadata, procedureName.getCatalogName(), call, catalogError);
8686
Procedure procedure = metadata.getProcedureRegistry().resolve(connectorId, toSchemaTableName(procedureName));
8787

88+
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
89+
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, parameterLookup);
90+
91+
// validate arguments
92+
MethodType methodType = procedure.getMethodHandle().type();
93+
for (int i = 0; i < procedure.getArguments().size(); i++) {
94+
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
95+
String name = procedure.getArguments().get(i).getName();
96+
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
97+
}
98+
}
99+
100+
// insert session argument
101+
List<Object> arguments = new ArrayList<>();
102+
Iterator<Object> valuesIterator = asList(values).iterator();
103+
for (Class<?> type : methodType.parameterList()) {
104+
if (ConnectorSession.class.isAssignableFrom(type)) {
105+
arguments.add(session.toConnectorSession(connectorId));
106+
}
107+
else {
108+
arguments.add(valuesIterator.next());
109+
}
110+
}
111+
112+
try {
113+
procedure.getMethodHandle().invokeWithArguments(arguments);
114+
}
115+
catch (Throwable t) {
116+
if (t instanceof InterruptedException) {
117+
Thread.currentThread().interrupt();
118+
}
119+
throwIfInstanceOf(t, PrestoException.class);
120+
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
121+
}
122+
123+
return immediateFuture(null);
124+
}
125+
126+
public static Object toTypeObjectValue(Session session, Type type, Object value)
127+
{
128+
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
129+
writeNativeValue(type, blockBuilder, value);
130+
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
131+
}
132+
133+
public static Object[] extractParameterValuesInOrder(Call call, Procedure procedure, Metadata metadata, Session session, Map<NodeRef<Parameter>, Expression> parameterLookup)
134+
{
88135
// map declared argument names to positions
89136
Map<String, Integer> positions = new HashMap<>();
90137
for (int i = 0; i < procedure.getArguments().size(); i++) {
@@ -131,7 +178,6 @@ else if (i < procedure.getArguments().size()) {
131178

132179
// get argument values
133180
Object[] values = new Object[procedure.getArguments().size()];
134-
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
135181
for (Entry<String, CallArgument> entry : names.entrySet()) {
136182
CallArgument callArgument = entry.getValue();
137183
int index = positions.get(entry.getKey());
@@ -156,45 +202,6 @@ else if (i < procedure.getArguments().size()) {
156202
}
157203
}
158204

159-
// validate arguments
160-
MethodType methodType = procedure.getMethodHandle().type();
161-
for (int i = 0; i < procedure.getArguments().size(); i++) {
162-
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
163-
String name = procedure.getArguments().get(i).getName();
164-
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
165-
}
166-
}
167-
168-
// insert session argument
169-
List<Object> arguments = new ArrayList<>();
170-
Iterator<Object> valuesIterator = asList(values).iterator();
171-
for (Class<?> type : methodType.parameterList()) {
172-
if (ConnectorSession.class.isAssignableFrom(type)) {
173-
arguments.add(session.toConnectorSession(connectorId));
174-
}
175-
else {
176-
arguments.add(valuesIterator.next());
177-
}
178-
}
179-
180-
try {
181-
procedure.getMethodHandle().invokeWithArguments(arguments);
182-
}
183-
catch (Throwable t) {
184-
if (t instanceof InterruptedException) {
185-
Thread.currentThread().interrupt();
186-
}
187-
throwIfInstanceOf(t, PrestoException.class);
188-
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
189-
}
190-
191-
return immediateFuture(null);
192-
}
193-
194-
private static Object toTypeObjectValue(Session session, Type type, Object value)
195-
{
196-
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
197-
writeNativeValue(type, blockBuilder, value);
198-
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
205+
return values;
199206
}
200207
}

presto-main-base/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,30 @@ public MetadataManager(
173173
ColumnPropertyManager columnPropertyManager,
174174
AnalyzePropertyManager analyzePropertyManager,
175175
TransactionManager transactionManager)
176+
{
177+
this(
178+
functionAndTypeManager,
179+
blockEncodingSerde,
180+
sessionPropertyManager,
181+
schemaPropertyManager,
182+
tablePropertyManager,
183+
columnPropertyManager,
184+
analyzePropertyManager,
185+
transactionManager,
186+
new ProcedureRegistry(functionAndTypeManager));
187+
}
188+
189+
@VisibleForTesting
190+
public MetadataManager(
191+
FunctionAndTypeManager functionAndTypeManager,
192+
BlockEncodingSerde blockEncodingSerde,
193+
SessionPropertyManager sessionPropertyManager,
194+
SchemaPropertyManager schemaPropertyManager,
195+
TablePropertyManager tablePropertyManager,
196+
ColumnPropertyManager columnPropertyManager,
197+
AnalyzePropertyManager analyzePropertyManager,
198+
TransactionManager transactionManager,
199+
IProcedureRegistry procedureRegistry)
176200
{
177201
this(
178202
createTestingViewCodec(functionAndTypeManager),
@@ -184,7 +208,7 @@ public MetadataManager(
184208
analyzePropertyManager,
185209
transactionManager,
186210
functionAndTypeManager,
187-
new ProcedureRegistry(functionAndTypeManager));
211+
procedureRegistry);
188212
}
189213

190214
@Inject
@@ -258,6 +282,21 @@ public static MetadataManager createTestMetadataManager(TransactionManager trans
258282
transactionManager);
259283
}
260284

285+
public static MetadataManager createTestMetadataManager(TransactionManager transactionManager, FeaturesConfig featuresConfig, FunctionsConfig functionsConfig, IProcedureRegistry procedureRegistry)
286+
{
287+
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
288+
return new MetadataManager(
289+
new FunctionAndTypeManager(transactionManager, new TableFunctionRegistry(), blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
290+
blockEncodingManager,
291+
createTestingSessionPropertyManager(),
292+
new SchemaPropertyManager(),
293+
new TablePropertyManager(),
294+
new ColumnPropertyManager(),
295+
new AnalyzePropertyManager(),
296+
transactionManager,
297+
procedureRegistry);
298+
}
299+
261300
@Override
262301
public final void verifyComparableOrderableContract()
263302
{

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.sql.analyzer;
1515

1616
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.QualifiedObjectName;
1718
import com.facebook.presto.metadata.Metadata;
1819
import com.facebook.presto.spi.WarningCollector;
1920
import com.facebook.presto.spi.analyzer.AccessControlReferences;
@@ -116,11 +117,20 @@ public Analysis analyze(Statement statement, boolean isDescribe)
116117
}
117118

118119
public Analysis analyzeSemantic(Statement statement, boolean isDescribe)
120+
{
121+
return analyzeSemantic(statement, Optional.empty(), isDescribe);
122+
}
123+
124+
public Analysis analyzeSemantic(
125+
Statement statement,
126+
Optional<QualifiedObjectName> procedureName,
127+
boolean isDescribe)
119128
{
120129
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, accessControl, warningCollector, query);
121130
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
122131

123132
metadataExtractor.populateMetadataHandle(session, rewrittenStatement, analysis.getMetadataHandle());
133+
analysis.setProcedureName(procedureName);
124134
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
125135
analyzer.analyze(rewrittenStatement, Optional.empty());
126136
analyzeForUtilizedColumns(analysis, analysis.getStatement(), warningCollector);

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryAnalyzer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ public QueryAnalysis analyze(AnalyzerContext analyzerContext, PreparedQuery prep
9292
Optional.of(metadataExtractorExecutor),
9393
analyzerContext.getQuery());
9494

95-
Analysis analysis = analyzer.analyzeSemantic(((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(), false);
95+
Analysis analysis = analyzer.analyzeSemantic(
96+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(),
97+
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getDistributedProcedureName(),
98+
false);
9699
return new BuiltInQueryAnalysis(analysis);
97100
}
98101

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import com.facebook.presto.spi.function.table.TableArgument;
7171
import com.facebook.presto.spi.function.table.TableArgumentSpecification;
7272
import com.facebook.presto.spi.function.table.TableFunctionAnalysis;
73+
import com.facebook.presto.spi.procedure.DistributedProcedure;
7374
import com.facebook.presto.spi.relation.DomainTranslator;
7475
import com.facebook.presto.spi.relation.RowExpression;
7576
import com.facebook.presto.spi.security.AccessControl;
@@ -238,6 +239,7 @@
238239
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
239240
import static com.facebook.presto.common.type.UnknownType.UNKNOWN;
240241
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
242+
import static com.facebook.presto.execution.CallTask.extractParameterValuesInOrder;
241243
import static com.facebook.presto.metadata.MetadataUtil.createQualifiedObjectName;
242244
import static com.facebook.presto.metadata.MetadataUtil.getConnectorIdOrThrow;
243245
import static com.facebook.presto.metadata.MetadataUtil.toSchemaTableName;
@@ -294,6 +296,7 @@
294296
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISMATCHED_COLUMN_ALIASES;
295297
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISMATCHED_SET_COLUMN_TYPES;
296298
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_ATTRIBUTE;
299+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_CATALOG;
297300
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_COLUMN;
298301
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_MATERIALIZED_VIEW;
299302
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_SCHEMA;
@@ -304,6 +307,7 @@
304307
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NON_NUMERIC_SAMPLE_PERCENTAGE;
305308
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NOT_SUPPORTED;
306309
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.ORDER_BY_MUST_BE_IN_SELECT;
310+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.PROCEDURE_NOT_FOUND;
307311
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_ALREADY_EXISTS;
308312
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_AMBIGUOUS_RETURN_TYPE;
309313
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_COLUMN_NOT_FOUND;
@@ -403,7 +407,7 @@ public Scope analyze(Node node, Scope outerQueryScope)
403407

404408
public Scope analyze(Node node, Optional<Scope> outerQueryScope)
405409
{
406-
return new Visitor(outerQueryScope, warningCollector).process(node, Optional.empty());
410+
return new Visitor(metadata, session, outerQueryScope, warningCollector).process(node, Optional.empty());
407411
}
408412

409413
/**
@@ -414,11 +418,19 @@ public Scope analyze(Node node, Optional<Scope> outerQueryScope)
414418
private class Visitor
415419
extends DefaultTraversalVisitor<Scope, Optional<Scope>>
416420
{
421+
private final Metadata metadata;
422+
private final Session session;
417423
private final Optional<Scope> outerQueryScope;
418424
private final WarningCollector warningCollector;
419425

420-
private Visitor(Optional<Scope> outerQueryScope, WarningCollector warningCollector)
426+
private Visitor(
427+
Metadata metadata,
428+
Session session,
429+
Optional<Scope> outerQueryScope,
430+
WarningCollector warningCollector)
421431
{
432+
this.metadata = requireNonNull(metadata, "metadata is null");
433+
this.session = requireNonNull(session, "session is null");
422434
this.outerQueryScope = requireNonNull(outerQueryScope, "outerQueryScope is null");
423435
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
424436
}
@@ -1180,9 +1192,46 @@ protected Scope visitRevoke(Revoke node, Optional<Scope> scope)
11801192
}
11811193

11821194
@Override
1183-
protected Scope visitCall(Call node, Optional<Scope> scope)
1195+
protected Scope visitCall(Call call, Optional<Scope> scope)
11841196
{
1185-
return createAndAssignScope(node, scope);
1197+
if (analysis.isDescribe()) {
1198+
return createAndAssignScope(call, scope);
1199+
}
1200+
QualifiedObjectName procedureName = analysis.getProcedureName()
1201+
.orElse(createQualifiedObjectName(session, call, call.getName(), metadata));
1202+
ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName())
1203+
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
1204+
1205+
if (!metadata.getProcedureRegistry().isDistributedProcedure(connectorId, toSchemaTableName(procedureName))) {
1206+
throw new SemanticException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + procedureName);
1207+
}
1208+
DistributedProcedure procedure = metadata.getProcedureRegistry().resolveDistributed(connectorId, toSchemaTableName(procedureName));
1209+
1210+
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, analysis.getParameters());
1211+
QualifiedName qualifiedName = QualifiedName.of(procedure.getSchema(values), procedure.getTableName(values));
1212+
QualifiedObjectName tableName = createQualifiedObjectName(session, call, qualifiedName, metadata);
1213+
1214+
analysis.setUpdateType("CALL");
1215+
analysis.setProcedureArguments(Optional.of(values));
1216+
1217+
String filter = procedure.getFilter(values);
1218+
Expression filterExpression = sqlParser.createExpression(filter);
1219+
QuerySpecification querySpecification = new QuerySpecification(
1220+
selectList(new AllColumns()),
1221+
Optional.of(new Table(qualifiedName)),
1222+
Optional.of(filterExpression),
1223+
Optional.empty(),
1224+
Optional.empty(),
1225+
Optional.empty(),
1226+
Optional.empty(),
1227+
Optional.empty());
1228+
analyze(querySpecification, scope);
1229+
analysis.setTargetQuery(querySpecification);
1230+
1231+
TableHandle tableHandle = metadata.getHandleVersion(session, tableName, Optional.empty())
1232+
.orElseThrow(() -> (new SemanticException(MISSING_TABLE, call, "Table '%s' does not exist", tableName)));
1233+
analysis.setCallTarget(tableHandle);
1234+
return createAndAssignScope(call, scope, Field.newUnqualified(Optional.empty(), "rows", BIGINT));
11861235
}
11871236

11881237
private void validateProperties(List<Property> properties, Optional<Scope> scope)

0 commit comments

Comments
 (0)