Skip to content

Commit 48a2c5f

Browse files
committed
Support and handle call distributed procedure statement in preparer
1 parent 842795b commit 48a2c5f

File tree

11 files changed

+319
-37
lines changed

11 files changed

+319
-37
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryPreparer.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,25 @@
1313
*/
1414
package com.facebook.presto.sql.analyzer;
1515

16+
import com.facebook.presto.common.QualifiedObjectName;
1617
import com.facebook.presto.common.analyzer.PreparedQuery;
1718
import com.facebook.presto.common.resourceGroups.QueryType;
19+
import com.facebook.presto.spi.ConnectorId;
1820
import com.facebook.presto.spi.PrestoException;
1921
import com.facebook.presto.spi.PrestoWarning;
22+
import com.facebook.presto.spi.SchemaTableName;
2023
import com.facebook.presto.spi.WarningCollector;
2124
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
2225
import com.facebook.presto.spi.analyzer.QueryPreparer;
26+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2327
import com.facebook.presto.sql.analyzer.utils.StatementUtils;
2428
import com.facebook.presto.sql.parser.SqlParser;
29+
import com.facebook.presto.sql.tree.Call;
2530
import com.facebook.presto.sql.tree.Execute;
2631
import com.facebook.presto.sql.tree.Explain;
2732
import com.facebook.presto.sql.tree.ExplainType;
2833
import com.facebook.presto.sql.tree.Expression;
34+
import com.facebook.presto.sql.tree.QualifiedName;
2935
import com.facebook.presto.sql.tree.Statement;
3036
import com.google.common.collect.ImmutableList;
3137
import com.google.common.collect.ImmutableSet;
@@ -37,13 +43,15 @@
3743
import java.util.Optional;
3844

3945
import static com.facebook.presto.common.WarningHandlingLevel.AS_ERROR;
46+
import static com.facebook.presto.common.resourceGroups.QueryType.CALL_DISTRIBUTED_PROCEDURE;
4047
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
4148
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
4249
import static com.facebook.presto.spi.StandardErrorCode.WARNING_AS_ERROR;
4350
import static com.facebook.presto.sql.SqlFormatter.formatSql;
4451
import static com.facebook.presto.sql.analyzer.ConstantExpressionVerifier.verifyExpressionIsConstant;
4552
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
4653
import static com.facebook.presto.sql.analyzer.utils.AnalyzerUtil.createParsingOptions;
54+
import static com.facebook.presto.sql.analyzer.utils.MetadataUtils.createQualifiedObjectName;
4755
import static com.facebook.presto.sql.analyzer.utils.ParameterExtractor.getParameterCount;
4856
import static com.facebook.presto.sql.tree.ExplainType.Type.VALIDATE;
4957
import static java.lang.String.format;
@@ -57,11 +65,15 @@ public class BuiltInQueryPreparer
5765
implements QueryPreparer
5866
{
5967
private final SqlParser sqlParser;
68+
private final IProcedureRegistry procedureRegistry;
6069

6170
@Inject
62-
public BuiltInQueryPreparer(SqlParser sqlParser)
71+
public BuiltInQueryPreparer(
72+
SqlParser sqlParser,
73+
IProcedureRegistry procedureRegistry)
6374
{
6475
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
76+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
6577
}
6678

6779
@Override
@@ -88,6 +100,17 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
88100
statement = sqlParser.createStatement(query, createParsingOptions(analyzerOptions));
89101
}
90102

103+
Optional<QualifiedObjectName> distributedProcedureName = Optional.empty();
104+
if (statement instanceof Call) {
105+
QualifiedName qualifiedName = ((Call) statement).getName();
106+
QualifiedObjectName qualifiedObjectName = createQualifiedObjectName(analyzerOptions.getSessionCatalogName(), analyzerOptions.getSessionSchemaName(), statement, qualifiedName);
107+
if (procedureRegistry.isDistributedProcedure(
108+
new ConnectorId(qualifiedObjectName.getCatalogName()),
109+
new SchemaTableName(qualifiedObjectName.getSchemaName(), qualifiedObjectName.getObjectName()))) {
110+
distributedProcedureName = Optional.of(qualifiedObjectName);
111+
}
112+
}
113+
91114
if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
92115
Statement innerStatement = ((Explain) statement).getStatement();
93116
Optional<QueryType> innerQueryType = StatementUtils.getQueryType(innerStatement.getClass());
@@ -104,7 +127,7 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
104127
if (analyzerOptions.isLogFormattedQueryEnabled()) {
105128
formattedQuery = Optional.of(getFormattedQuery(statement, parameters));
106129
}
107-
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql);
130+
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql, distributedProcedureName);
108131
}
109132

110133
private static String getFormattedQuery(Statement statement, List<Expression> parameters)
@@ -132,13 +155,19 @@ public static class BuiltInPreparedQuery
132155
private final Statement statement;
133156
private final Statement wrappedStatement;
134157
private final List<Expression> parameters;
158+
private final Optional<QualifiedObjectName> distributedProcedureName;
135159

136-
public BuiltInPreparedQuery(Statement wrappedStatement, Statement statement, List<Expression> parameters, Optional<String> formattedQuery, Optional<String> prepareSql)
160+
public BuiltInPreparedQuery(
161+
Statement wrappedStatement,
162+
Statement statement, List<Expression> parameters,
163+
Optional<String> formattedQuery, Optional<String> prepareSql,
164+
Optional<QualifiedObjectName> distributedProcedureName)
137165
{
138166
super(formattedQuery, prepareSql);
139167
this.wrappedStatement = requireNonNull(wrappedStatement, "wrappedStatement is null");
140168
this.statement = requireNonNull(statement, "statement is null");
141169
this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
170+
this.distributedProcedureName = requireNonNull(distributedProcedureName, "distributedProcedureName is null");
142171
}
143172

144173
public Statement getStatement()
@@ -158,9 +187,17 @@ public List<Expression> getParameters()
158187

159188
public Optional<QueryType> getQueryType()
160189
{
190+
if (getDistributedProcedureName().isPresent()) {
191+
return Optional.of(CALL_DISTRIBUTED_PROCEDURE);
192+
}
161193
return StatementUtils.getQueryType(statement.getClass());
162194
}
163195

196+
public Optional<QualifiedObjectName> getDistributedProcedureName()
197+
{
198+
return this.distributedProcedureName;
199+
}
200+
164201
public boolean isTransactionControlStatement()
165202
{
166203
return StatementUtils.isTransactionControlStatement(getStatement());
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.analyzer.utils;
15+
16+
import com.facebook.presto.common.QualifiedObjectName;
17+
import com.facebook.presto.spi.PrestoException;
18+
import com.facebook.presto.sql.analyzer.SemanticException;
19+
import com.facebook.presto.sql.tree.Node;
20+
import com.facebook.presto.sql.tree.QualifiedName;
21+
import com.google.common.collect.Lists;
22+
23+
import java.util.List;
24+
import java.util.Optional;
25+
26+
import static com.facebook.presto.spi.StandardErrorCode.SYNTAX_ERROR;
27+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
28+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
29+
import static java.lang.String.format;
30+
import static java.util.Objects.requireNonNull;
31+
32+
public class MetadataUtils
33+
{
34+
private MetadataUtils()
35+
{}
36+
37+
public static QualifiedObjectName createQualifiedObjectName(Optional<String> sessionCatalogName, Optional<String> sessionSchemaName, Node node, QualifiedName name)
38+
{
39+
requireNonNull(sessionCatalogName, "sessionCatalogName is null");
40+
requireNonNull(sessionSchemaName, "sessionSchemaName is null");
41+
requireNonNull(name, "name is null");
42+
if (name.getParts().size() > 3) {
43+
throw new PrestoException(SYNTAX_ERROR, format("Too many dots in table name: %s", name));
44+
}
45+
46+
List<String> parts = Lists.reverse(name.getParts());
47+
String objectName = parts.get(0);
48+
String schemaName = (parts.size() > 1) ? parts.get(1) : sessionSchemaName.orElseThrow(() ->
49+
new SemanticException(SCHEMA_NOT_SPECIFIED, node, "Schema must be specified when session schema is not set"));
50+
String catalogName = (parts.size() > 2) ? parts.get(2) : sessionCatalogName.orElseThrow(() ->
51+
new SemanticException(CATALOG_NOT_SPECIFIED, node, "Catalog must be specified when session catalog is not set"));
52+
53+
return new QualifiedObjectName(catalogName, schemaName, objectName);
54+
}
55+
}

presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,108 @@
1313
*/
1414
package com.facebook.presto.sql.analyzer;
1515

16+
import com.facebook.presto.common.resourceGroups.QueryType;
17+
import com.facebook.presto.spi.ConnectorId;
1618
import com.facebook.presto.spi.PrestoException;
1719
import com.facebook.presto.spi.WarningCollector;
1820
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
21+
import com.facebook.presto.spi.procedure.DistributedProcedure;
22+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
23+
import com.facebook.presto.spi.procedure.Procedure;
24+
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
1925
import com.facebook.presto.sql.analyzer.BuiltInQueryPreparer.BuiltInPreparedQuery;
2026
import com.facebook.presto.sql.parser.SqlParser;
2127
import com.facebook.presto.sql.tree.AllColumns;
28+
import com.facebook.presto.sql.tree.Call;
29+
import com.facebook.presto.sql.tree.CallArgument;
2230
import com.facebook.presto.sql.tree.QualifiedName;
31+
import com.facebook.presto.sql.tree.StringLiteral;
2332
import com.google.common.collect.ImmutableMap;
33+
import org.testng.annotations.BeforeClass;
2434
import org.testng.annotations.Test;
2535

36+
import java.util.ArrayList;
37+
import java.util.List;
2638
import java.util.Map;
2739
import java.util.Optional;
2840

41+
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
2942
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
43+
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
44+
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
3045
import static com.facebook.presto.sql.QueryUtil.selectList;
3146
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
3247
import static com.facebook.presto.sql.QueryUtil.table;
3348
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
3449
import static org.testng.Assert.assertEquals;
50+
import static org.testng.Assert.assertTrue;
3551
import static org.testng.Assert.fail;
3652

3753
public class TestBuiltInQueryPreparer
3854
{
3955
private static final SqlParser SQL_PARSER = new SqlParser();
40-
private static final BuiltInQueryPreparer QUERY_PREPARER = new BuiltInQueryPreparer(SQL_PARSER);
4156
private static final Map<String, String> emptyPreparedStatements = ImmutableMap.of();
4257
private static final AnalyzerOptions testAnalyzerOptions = AnalyzerOptions.builder().build();
58+
private static IProcedureRegistry procedureRegistry;
59+
private static BuiltInQueryPreparer queryPreparer;
60+
61+
@BeforeClass
62+
public void setup()
63+
{
64+
procedureRegistry = new TestProcedureRegistry();
65+
List<Procedure.Argument> arguments = new ArrayList<>();
66+
arguments.add(new Procedure.Argument(SCHEMA, VARCHAR));
67+
arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR));
68+
69+
List<Procedure> procedures = new ArrayList<>();
70+
procedures.add(new Procedure("system", "fun", arguments));
71+
procedures.add(new DistributedProcedure("system", "distributed_fun",
72+
arguments,
73+
(session, transactionContext, procedureHandle, fragments) -> null,
74+
(transactionContext, procedureHandle, fragments) -> {}));
75+
procedureRegistry.addProcedures(new ConnectorId("test"), procedures);
76+
queryPreparer = new BuiltInQueryPreparer(SQL_PARSER, procedureRegistry);
77+
}
4378

4479
@Test
4580
public void testSelectStatement()
4681
{
47-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "SELECT * FROM foo", emptyPreparedStatements, WarningCollector.NOOP);
82+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "SELECT * FROM foo", emptyPreparedStatements, WarningCollector.NOOP);
4883
assertEquals(preparedQuery.getStatement(),
4984
simpleQuery(selectList(new AllColumns()), table(QualifiedName.of("foo"))));
5085
}
5186

87+
@Test
88+
public void testCallProcedureStatement()
89+
{
90+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "call test.system.fun('a', 'b')", emptyPreparedStatements, WarningCollector.NOOP);
91+
List<CallArgument> arguments = new ArrayList<>();
92+
arguments.add(new CallArgument(new StringLiteral("a")));
93+
arguments.add(new CallArgument(new StringLiteral("b")));
94+
assertEquals(preparedQuery.getStatement(),
95+
new Call(QualifiedName.of("test", "system", "fun"), arguments));
96+
assertTrue(preparedQuery.getQueryType().isPresent());
97+
assertEquals(preparedQuery.getQueryType().get(), QueryType.DATA_DEFINITION);
98+
}
99+
100+
@Test
101+
public void testCallDistributedProcedureStatement()
102+
{
103+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "call test.system.distributed_fun('a', 'b')", emptyPreparedStatements, WarningCollector.NOOP);
104+
List<CallArgument> arguments = new ArrayList<>();
105+
arguments.add(new CallArgument(new StringLiteral("a")));
106+
arguments.add(new CallArgument(new StringLiteral("b")));
107+
assertEquals(preparedQuery.getStatement(),
108+
new Call(QualifiedName.of("test", "system", "distributed_fun"), arguments));
109+
assertTrue(preparedQuery.getQueryType().isPresent());
110+
assertEquals(preparedQuery.getQueryType().get(), QueryType.CALL_DISTRIBUTED_PROCEDURE);
111+
}
112+
52113
@Test
53114
public void testExecuteStatement()
54115
{
55116
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT * FROM foo");
56-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query", preparedStatements, WarningCollector.NOOP);
117+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query", preparedStatements, WarningCollector.NOOP);
57118
assertEquals(preparedQuery.getStatement(),
58119
simpleQuery(selectList(new AllColumns()), table(QualifiedName.of("foo"))));
59120
}
@@ -62,7 +123,7 @@ public void testExecuteStatement()
62123
public void testExecuteStatementDoesNotExist()
63124
{
64125
try {
65-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "execute my_query", emptyPreparedStatements, WarningCollector.NOOP);
126+
queryPreparer.prepareQuery(testAnalyzerOptions, "execute my_query", emptyPreparedStatements, WarningCollector.NOOP);
66127
fail("expected exception");
67128
}
68129
catch (PrestoException e) {
@@ -75,7 +136,7 @@ public void testTooManyParameters()
75136
{
76137
try {
77138
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT * FROM foo where col1 = ?");
78-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1,2", preparedStatements, WarningCollector.NOOP);
139+
queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1,2", preparedStatements, WarningCollector.NOOP);
79140
fail("expected exception");
80141
}
81142
catch (SemanticException e) {
@@ -88,7 +149,7 @@ public void testTooFewParameters()
88149
{
89150
try {
90151
Map<String, String> preparedStatements = ImmutableMap.of("my_query", "SELECT ? FROM foo where col1 = ?");
91-
QUERY_PREPARER.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1", preparedStatements, WarningCollector.NOOP);
152+
queryPreparer.prepareQuery(testAnalyzerOptions, "EXECUTE my_query USING 1", preparedStatements, WarningCollector.NOOP);
92153
fail("expected exception");
93154
}
94155
catch (SemanticException e) {
@@ -100,7 +161,7 @@ public void testTooFewParameters()
100161
public void testFormattedQuery()
101162
{
102163
AnalyzerOptions analyzerOptions = AnalyzerOptions.builder().setLogFormattedQueryEnabled(true).build();
103-
BuiltInPreparedQuery preparedQuery = QUERY_PREPARER.prepareQuery(
164+
BuiltInPreparedQuery preparedQuery = queryPreparer.prepareQuery(
104165
analyzerOptions,
105166
"PREPARE test FROM SELECT * FROM foo where col1 = ?",
106167
emptyPreparedStatements,
@@ -112,7 +173,7 @@ public void testFormattedQuery()
112173
" foo\n" +
113174
" WHERE (col1 = ?)\n"));
114175

115-
preparedQuery = QUERY_PREPARER.prepareQuery(
176+
preparedQuery = queryPreparer.prepareQuery(
116177
analyzerOptions,
117178
"PREPARE test FROM SELECT * FROM foo",
118179
emptyPreparedStatements,

presto-common/src/main/java/com/facebook/presto/common/resourceGroups/QueryType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public enum QueryType
2727
INSERT(6),
2828
SELECT(7),
2929
CONTROL(8),
30-
UPDATE(9)
30+
UPDATE(9),
31+
CALL_DISTRIBUTED_PROCEDURE(10),
3132
/**/;
3233

3334
private final int value;

0 commit comments

Comments
 (0)