Skip to content

Commit b4ceba0

Browse files
committed
Support and handle call distributed procedure statement in preparer
1 parent 00d76f3 commit b4ceba0

File tree

11 files changed

+328
-42
lines changed

11 files changed

+328
-42
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryPreparer.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,25 @@
1313
*/
1414
package com.facebook.presto.sql.analyzer;
1515

16+
import com.facebook.presto.common.QualifiedObjectName;
1617
import com.facebook.presto.common.analyzer.PreparedQuery;
1718
import com.facebook.presto.common.resourceGroups.QueryType;
19+
import com.facebook.presto.spi.ConnectorId;
1820
import com.facebook.presto.spi.PrestoException;
1921
import com.facebook.presto.spi.PrestoWarning;
22+
import com.facebook.presto.spi.SchemaTableName;
2023
import com.facebook.presto.spi.WarningCollector;
2124
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
2225
import com.facebook.presto.spi.analyzer.QueryPreparer;
26+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2327
import com.facebook.presto.sql.analyzer.utils.StatementUtils;
2428
import com.facebook.presto.sql.parser.SqlParser;
29+
import com.facebook.presto.sql.tree.Call;
2530
import com.facebook.presto.sql.tree.Execute;
2631
import com.facebook.presto.sql.tree.Explain;
2732
import com.facebook.presto.sql.tree.ExplainType;
2833
import com.facebook.presto.sql.tree.Expression;
34+
import com.facebook.presto.sql.tree.QualifiedName;
2935
import com.facebook.presto.sql.tree.Statement;
3036
import com.google.common.collect.ImmutableList;
3137
import com.google.common.collect.ImmutableSet;
@@ -36,13 +42,15 @@
3642
import java.util.Optional;
3743

3844
import static com.facebook.presto.common.WarningHandlingLevel.AS_ERROR;
45+
import static com.facebook.presto.common.resourceGroups.QueryType.CALL_DISTRIBUTED_PROCEDURE;
3946
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
4047
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
4148
import static com.facebook.presto.spi.StandardErrorCode.WARNING_AS_ERROR;
4249
import static com.facebook.presto.sql.SqlFormatter.formatSql;
4350
import static com.facebook.presto.sql.analyzer.ConstantExpressionVerifier.verifyExpressionIsConstant;
4451
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
4552
import static com.facebook.presto.sql.analyzer.utils.AnalyzerUtil.createParsingOptions;
53+
import static com.facebook.presto.sql.analyzer.utils.MetadataUtils.createQualifiedObjectName;
4654
import static com.facebook.presto.sql.analyzer.utils.ParameterExtractor.getParameterCount;
4755
import static com.facebook.presto.sql.tree.ExplainType.Type.VALIDATE;
4856
import static java.lang.String.format;
@@ -56,11 +64,15 @@ public class BuiltInQueryPreparer
5664
implements QueryPreparer
5765
{
5866
private final SqlParser sqlParser;
67+
private final IProcedureRegistry procedureRegistry;
5968

6069
@Inject
61-
public BuiltInQueryPreparer(SqlParser sqlParser)
70+
public BuiltInQueryPreparer(
71+
SqlParser sqlParser,
72+
IProcedureRegistry procedureRegistry)
6273
{
6374
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
75+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
6476
}
6577

6678
@Override
@@ -87,6 +99,18 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
8799
statement = sqlParser.createStatement(query, createParsingOptions(analyzerOptions));
88100
}
89101

102+
Optional<QualifiedObjectName> distributedProcedureName = Optional.empty();
103+
if (statement instanceof Call) {
104+
QualifiedName qualifiedName = ((Call) statement).getName();
105+
QualifiedObjectName qualifiedObjectName = createQualifiedObjectName(analyzerOptions.getSessionCatalogName(), analyzerOptions.getSessionSchemaName(),
106+
statement, qualifiedName, (catalogName, objectName) -> objectName);
107+
if (procedureRegistry.isDistributedProcedure(
108+
new ConnectorId(qualifiedObjectName.getCatalogName()),
109+
new SchemaTableName(qualifiedObjectName.getSchemaName(), qualifiedObjectName.getObjectName()))) {
110+
distributedProcedureName = Optional.of(qualifiedObjectName);
111+
}
112+
}
113+
90114
if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
91115
Statement innerStatement = ((Explain) statement).getStatement();
92116
Optional<QueryType> innerQueryType = StatementUtils.getQueryType(innerStatement.getClass());
@@ -103,7 +127,7 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
103127
if (analyzerOptions.isLogFormattedQueryEnabled()) {
104128
formattedQuery = Optional.of(getFormattedQuery(statement, parameters));
105129
}
106-
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql);
130+
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql, distributedProcedureName);
107131
}
108132

109133
private static String getFormattedQuery(Statement statement, List<Expression> parameters)
@@ -131,13 +155,19 @@ public static class BuiltInPreparedQuery
131155
private final Statement statement;
132156
private final Statement wrappedStatement;
133157
private final List<Expression> parameters;
158+
private final Optional<QualifiedObjectName> distributedProcedureName;
134159

135-
public BuiltInPreparedQuery(Statement wrappedStatement, Statement statement, List<Expression> parameters, Optional<String> formattedQuery, Optional<String> prepareSql)
160+
public BuiltInPreparedQuery(
161+
Statement wrappedStatement,
162+
Statement statement, List<Expression> parameters,
163+
Optional<String> formattedQuery, Optional<String> prepareSql,
164+
Optional<QualifiedObjectName> distributedProcedureName)
136165
{
137166
super(formattedQuery, prepareSql);
138167
this.wrappedStatement = requireNonNull(wrappedStatement, "wrappedStatement is null");
139168
this.statement = requireNonNull(statement, "statement is null");
140169
this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
170+
this.distributedProcedureName = requireNonNull(distributedProcedureName, "distributedProcedureName is null");
141171
}
142172

143173
public Statement getStatement()
@@ -157,9 +187,17 @@ public List<Expression> getParameters()
157187

158188
public Optional<QueryType> getQueryType()
159189
{
190+
if (getDistributedProcedureName().isPresent()) {
191+
return Optional.of(CALL_DISTRIBUTED_PROCEDURE);
192+
}
160193
return StatementUtils.getQueryType(statement.getClass());
161194
}
162195

196+
public Optional<QualifiedObjectName> getDistributedProcedureName()
197+
{
198+
return this.distributedProcedureName;
199+
}
200+
163201
public boolean isTransactionControlStatement()
164202
{
165203
return StatementUtils.isTransactionControlStatement(getStatement());
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.analyzer.utils;
15+
16+
import com.facebook.presto.common.QualifiedObjectName;
17+
import com.facebook.presto.spi.PrestoException;
18+
import com.facebook.presto.sql.analyzer.SemanticException;
19+
import com.facebook.presto.sql.tree.Node;
20+
import com.facebook.presto.sql.tree.QualifiedName;
21+
import com.google.common.collect.Lists;
22+
23+
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.function.BiFunction;
26+
27+
import static com.facebook.presto.spi.StandardErrorCode.SYNTAX_ERROR;
28+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
29+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
30+
import static java.lang.String.format;
31+
import static java.util.Locale.ENGLISH;
32+
import static java.util.Objects.requireNonNull;
33+
34+
public class MetadataUtils
35+
{
36+
private MetadataUtils()
37+
{}
38+
39+
public static QualifiedObjectName createQualifiedObjectName(Optional<String> sessionCatalogName, Optional<String> sessionSchemaName, Node node, QualifiedName name,
40+
BiFunction<String, String, String> normalizer)
41+
{
42+
requireNonNull(sessionCatalogName, "sessionCatalogName is null");
43+
requireNonNull(sessionSchemaName, "sessionSchemaName is null");
44+
requireNonNull(name, "name is null");
45+
if (name.getParts().size() > 3) {
46+
throw new PrestoException(SYNTAX_ERROR, format("Too many dots in table name: %s", name));
47+
}
48+
49+
List<String> parts = Lists.reverse(name.getParts());
50+
String objectName = parts.get(0);
51+
String schemaName = (parts.size() > 1) ? parts.get(1) : sessionSchemaName.orElseThrow(() ->
52+
new SemanticException(SCHEMA_NOT_SPECIFIED, node, "Schema must be specified when session schema is not set"));
53+
String catalogName = (parts.size() > 2) ? parts.get(2) : sessionCatalogName.orElseThrow(() ->
54+
new SemanticException(CATALOG_NOT_SPECIFIED, node, "Catalog must be specified when session catalog is not set"));
55+
56+
catalogName = catalogName.toLowerCase(ENGLISH);
57+
schemaName = normalizer.apply(catalogName, schemaName);
58+
objectName = normalizer.apply(catalogName, objectName);
59+
return new QualifiedObjectName(catalogName, schemaName, objectName);
60+
}
61+
}

presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,108 @@
1313
*/
1414
package com.facebook.presto.sql.analyzer;
1515

16+
import com.facebook.presto.common.resourceGroups.QueryType;
17+
import com.facebook.presto.spi.ConnectorId;
1618
import com.facebook.presto.spi.PrestoException;
1719
import com.facebook.presto.spi.WarningCollector;
1820
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
21+
import com.facebook.presto.spi.procedure.DistributedProcedure;
22+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
23+
import com.facebook.presto.spi.procedure.Procedure;
24+
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
1925
import com.facebook.presto.sql.analyzer.BuiltInQueryPreparer.BuiltInPreparedQuery;
2026
import com.facebook.presto.sql.parser.SqlParser;
2127
import com.facebook.presto.sql.tree.AllColumns;
28+
import com.facebook.presto.sql.tree.Call;
29+
import com.facebook.presto.sql.tree.CallArgument;
2230
import com.facebook.presto.sql.tree.QualifiedName;
31+
import com.facebook.presto.sql.tree.StringLiteral;
2332
import com.google.common.collect.ImmutableMap;
33+
import org.testng.annotations.BeforeClass;
2434
import org.testng.annotations.Test;
2535

36+
import java.util.ArrayList;
37+
import java.util.List;
2638
import java.util.Map;
2739
import java.util.Optional;
2840

41+
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
2942
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
43+
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
44+
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
3045
import static com.facebook.presto.sql.QueryUtil.selectList;
3146
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
3247
import static com.facebook.presto.sql.QueryUtil.table;
3348
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
3449
import static org.testng.Assert.assertEquals;
50+
import static org.testng.Assert.assertTrue;
3551
import static org.testng.Assert.fail;
3652

3753
public class TestBuiltInQueryPreparer
3854
{
3955
private static final SqlParser SQL_PARSER = new SqlParser();
40-
private static final BuiltInQueryPreparer QUERY_PREPARER = new BuiltInQueryPreparer(SQL_PARSER);
4156
private static final Map<String, String> emptyPreparedStatements = ImmutableMap.of();
4257
private static final AnalyzerOptions testAnalyzerOptions = AnalyzerOptions.builder().build();
58+
private static IProcedureRegistry procedureRegistry;
59+
private static BuiltInQueryPreparer queryPreparer;
60+
61+
@BeforeClass
62+
public void setup()
63+
{
64+
procedureRegistry = new TestProcedureRegistry();
65+
List<Procedure.Argument> arguments = new ArrayList<>();
66+
arguments.add(new Procedure.Argument(SCHEMA, VARCHAR));
67+
arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR));
68+
69+
List<Procedure> procedures = new ArrayList<>();
70+
procedures.add(new Procedure("system", "fun", arguments));
71+
procedures.add(new DistributedProcedure("system", "distributed_fun",
72+
arguments,
73+
(session, transactionContext, procedureHandle, fragments) -> null,
74+
(transactionContext, procedureHandle, fragments) -> {}));
75+
procedureRegistry.addProcedures(new ConnectorId("test"), procedures);
76+
queryPreparer = new BuiltInQueryPreparer(SQL_PARSER, procedureRegistry);
77+
}
4378

4479
@Test
4580
public void testSelectStatement()
4681
{
47-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "SELECT * FROM foo", emptyPreparedStatements, WarningCollector.NOOP);
82+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "SELECT * FROM foo", emptyPreparedStatements, WarningCollector.NOOP);
4883
assertEquals(preparedQuery.getStatement(),
4984
simpleQuery(selectList(new AllColumns()), table(QualifiedName.of("foo"))));
5085
}
5186

87+
@Test
88+
public void testCallProcedureStatement()
89+
{
90+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "call test.system.fun('a', 'b')", emptyPreparedStatements, WarningCollector.NOOP);
91+
List<CallArgument> arguments = new ArrayList<>();
92+
arguments.add(new CallArgument(new StringLiteral("a")));
93+
arguments.add(new CallArgument(new StringLiteral("b")));
94+
assertEquals(preparedQuery.getStatement(),
95+
new Call(QualifiedName.of("test", "system", "fun"), arguments));
96+
assertTrue(preparedQuery.getQueryType().isPresent());
97+
assertEquals(preparedQuery.getQueryType().get(), QueryType.DATA_DEFINITION);
98+
}
99+
100+
@Test
101+
public void testCallDistributedProcedureStatement()
102+
{
103+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "call test.system.distributed_fun('a', 'b')", emptyPreparedStatements, WarningCollector.NOOP);
104+
List<CallArgument> arguments = new ArrayList<>();
105+
arguments.add(new CallArgument(new StringLiteral("a")));
106+
arguments.add(new CallArgument(new StringLiteral("b")));
107+
assertEquals(preparedQuery.getStatement(),
108+
new Call(QualifiedName.of("test", "system", "distributed_fun"), arguments));
109+
assertTrue(preparedQuery.getQueryType().isPresent());
110+
assertEquals(preparedQuery.getQueryType().get(), QueryType.CALL_DISTRIBUTED_PROCEDURE);
111+
}
112+
52113
@Test
53114
public void testExecuteStatement()
54115
{
55116
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT * FROM foo");
56-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query", preparedStatements, WarningCollector.NOOP);
117+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query", preparedStatements, WarningCollector.NOOP);
57118
assertEquals(preparedQuery.getStatement(),
58119
simpleQuery(selectList(new AllColumns()), table(QualifiedName.of("foo"))));
59120
}
@@ -62,7 +123,7 @@ public void testExecuteStatement()
62123
public void testExecuteStatementDoesNotExist()
63124
{
64125
try {
65-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "execute my_query", emptyPreparedStatements, WarningCollector.NOOP);
126+
queryPreparer.prepareQuery(testAnalyzerOptions, "execute my_query", emptyPreparedStatements, WarningCollector.NOOP);
66127
fail("expected exception");
67128
}
68129
catch (PrestoException e) {
@@ -75,7 +136,7 @@ public void testTooManyParameters()
75136
{
76137
try {
77138
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT * FROM foo where col1 = ?");
78-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1,2", preparedStatements, WarningCollector.NOOP);
139+
queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1,2", preparedStatements, WarningCollector.NOOP);
79140
fail("expected exception");
80141
}
81142
catch (SemanticException e) {
@@ -88,7 +149,7 @@ public void testTooFewParameters()
88149
{
89150
try {
90151
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT ? FROM foo where col1 = ?");
91-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1", preparedStatements, WarningCollector.NOOP);
152+
queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1", preparedStatements, WarningCollector.NOOP);
92153
fail("expected exception");
93154
}
94155
catch (SemanticException e) {
@@ -100,7 +161,7 @@ public void testTooFewParameters()
100161
public void testFormattedQuery()
101162
{
102163
AnalyzerOptions analyzerOptions = AnalyzerOptions.builder().setLogFormattedQueryEnabled(true).build();
103-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(
164+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(
104165
analyzerOptions,
105166
"PREPARE test FROM SELECT * FROM foo where col1 = ?",
106167
emptyPreparedStatements,
@@ -112,7 +173,7 @@ public void testFormattedQuery()
112173
" foo\n" +
113174
" WHERE (col1 = ?)\n"));
114175

115-
preparedQuery = QUERY_PREPARER.prepareQuery(
176+
preparedQuery = queryPreparer.prepareQuery(
116177
analyzerOptions,
117178
"PREPARE test FROM SELECT * FROM foo",
118179
emptyPreparedStatements,

presto-common/src/main/java/com/facebook/presto/common/resourceGroups/QueryType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ public enum QueryType
2828
SELECT(7),
2929
CONTROL(8),
3030
UPDATE(9),
31-
MERGE(10)
31+
MERGE(10),
32+
CALL_DISTRIBUTED_PROCEDURE(11)
3233
/**/;
3334

3435
private final int value;

0 commit comments

Comments
 (0)