Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
278e5f0
Refactor ProcedureRegistry to support distributed procedure and be av…
hantangwangd Oct 7, 2025
987118e
Support and handle call distributed procedure statement in preparer
hantangwangd Oct 6, 2025
bc67dbd
Analyze and plan for call distributed procedure statement
hantangwangd Oct 6, 2025
d7574eb
Execute optimization, segmentation and local planning for call distri…
hantangwangd Oct 7, 2025
2a243dd
Refactor Iceberg connector to support call distributed procedure
hantangwangd Oct 7, 2025
2cff9a9
Support Iceberg procedure `rewrite_data_files`
hantangwangd Oct 8, 2025
cbc0edf
Fix mixed case test failures
hantangwangd Oct 8, 2025
bef5763
Fix test failure caused by concurrent operations on the same table
hantangwangd Oct 8, 2025
f0729ec
Extend DistributedProcedure hierarchy for more extension types
hantangwangd Oct 9, 2025
cf12a2f
[native] Update presto protocol for distributed procedure
hantangwangd Oct 14, 2025
eaa82da
[native] Prepare for the actual support of distributed procedures
hantangwangd Oct 14, 2025
73f4367
[Address Comment] Rename IProcedureRegistry to ProcedureRegistry
hantangwangd Oct 19, 2025
e24d0d8
[Address Comment] Move TestProcedureRegistry out of the presto-spi mo…
hantangwangd Oct 19, 2025
6f1c34b
[Address Comment] Fix little things and typos
hantangwangd Oct 19, 2025
9f4fae8
[Address Comment]Refactor `Procedure` and `DistributedProcedure` into…
hantangwangd Oct 19, 2025
a192c74
[Address Comment] Add logical planner test for call distributed proce…
hantangwangd Oct 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.function.FunctionKind;
import com.facebook.presto.spi.function.table.Argument;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.AccessControlContext;
import com.facebook.presto.spi.security.AllowAllAccessControl;
Expand Down Expand Up @@ -174,6 +175,13 @@ public class Analysis
private final Multiset<ColumnMaskScopeEntry> columnMaskScopes = HashMultiset.create();
private final Map<NodeRef<Table>, Map<String, Expression>> columnMasks = new LinkedHashMap<>();

// for call distributed procedure
private Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType = Optional.empty();
private Optional<QualifiedObjectName> procedureName = Optional.empty();
private Optional<Object[]> procedureArguments = Optional.empty();
private Optional<TableHandle> callTarget = Optional.empty();
private Optional<QuerySpecification> targetQuery = Optional.empty();

// for create table
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
private Map<String, Expression> createTableProperties = ImmutableMap.of();
Expand Down Expand Up @@ -666,6 +674,46 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
return createTableDestination;
}

public Optional<QualifiedObjectName> getProcedureName()
{
return procedureName;
}

public void setProcedureName(Optional<QualifiedObjectName> procedureName)
{
this.procedureName = procedureName;
}

public Optional<DistributedProcedure.DistributedProcedureType> getDistributedProcedureType()
{
return distributedProcedureType;
}

public void setDistributedProcedureType(Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType)
{
this.distributedProcedureType = distributedProcedureType;
}

public Optional<Object[]> getProcedureArguments()
{
return procedureArguments;
}

public void setProcedureArguments(Optional<Object[]> procedureArguments)
{
this.procedureArguments = procedureArguments;
}

public Optional<TableHandle> getCallTarget()
{
return callTarget;
}

public void setCallTarget(TableHandle callTarget)
{
this.callTarget = Optional.of(callTarget);
}

public Optional<TableHandle> getAnalyzeTarget()
{
return analyzeTarget;
Expand Down Expand Up @@ -1020,6 +1068,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
return currentQuerySpecification;
}

public void setTargetQuery(QuerySpecification targetQuery)
{
this.targetQuery = Optional.of(targetQuery);
}

public Optional<QuerySpecification> getTargetQuery()
{
return this.targetQuery;
}

public Map<FunctionKind, Set<String>> getInvokedFunctions()
{
Map<FunctionKind, Set<String>> functionMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,25 @@
*/
package com.facebook.presto.sql.analyzer;

import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.PrestoWarning;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
import com.facebook.presto.spi.analyzer.QueryPreparer;
import com.facebook.presto.spi.procedure.ProcedureRegistry;
import com.facebook.presto.sql.analyzer.utils.StatementUtils;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.Call;
import com.facebook.presto.sql.tree.Execute;
import com.facebook.presto.sql.tree.Explain;
import com.facebook.presto.sql.tree.ExplainType;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.sql.tree.Statement;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand All @@ -36,13 +42,15 @@
import java.util.Optional;

import static com.facebook.presto.common.WarningHandlingLevel.AS_ERROR;
import static com.facebook.presto.common.resourceGroups.QueryType.CALL_DISTRIBUTED_PROCEDURE;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.WARNING_AS_ERROR;
import static com.facebook.presto.sql.SqlFormatter.formatSql;
import static com.facebook.presto.sql.analyzer.ConstantExpressionVerifier.verifyExpressionIsConstant;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
import static com.facebook.presto.sql.analyzer.utils.AnalyzerUtil.createParsingOptions;
import static com.facebook.presto.sql.analyzer.utils.MetadataUtils.createQualifiedObjectName;
import static com.facebook.presto.sql.analyzer.utils.ParameterExtractor.getParameterCount;
import static com.facebook.presto.sql.tree.ExplainType.Type.VALIDATE;
import static java.lang.String.format;
Expand All @@ -56,11 +64,15 @@ public class BuiltInQueryPreparer
implements QueryPreparer
{
private final SqlParser sqlParser;
private final ProcedureRegistry procedureRegistry;

@Inject
public BuiltInQueryPreparer(SqlParser sqlParser)
public BuiltInQueryPreparer(
SqlParser sqlParser,
ProcedureRegistry procedureRegistry)
{
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
}

@Override
Expand All @@ -87,6 +99,18 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
statement = sqlParser.createStatement(query, createParsingOptions(analyzerOptions));
}

Optional<QualifiedObjectName> distributedProcedureName = Optional.empty();
if (statement instanceof Call) {
QualifiedName qualifiedName = ((Call) statement).getName();
QualifiedObjectName qualifiedObjectName = createQualifiedObjectName(analyzerOptions.getSessionCatalogName(), analyzerOptions.getSessionSchemaName(),
statement, qualifiedName, (catalogName, objectName) -> objectName);
if (procedureRegistry.isDistributedProcedure(
new ConnectorId(qualifiedObjectName.getCatalogName()),
new SchemaTableName(qualifiedObjectName.getSchemaName(), qualifiedObjectName.getObjectName()))) {
distributedProcedureName = Optional.of(qualifiedObjectName);
}
}

if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
Statement innerStatement = ((Explain) statement).getStatement();
Optional<QueryType> innerQueryType = StatementUtils.getQueryType(innerStatement.getClass());
Expand All @@ -103,7 +127,7 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
if (analyzerOptions.isLogFormattedQueryEnabled()) {
formattedQuery = Optional.of(getFormattedQuery(statement, parameters));
}
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql);
return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql, distributedProcedureName);
}

private static String getFormattedQuery(Statement statement, List<Expression> parameters)
Expand Down Expand Up @@ -131,13 +155,19 @@ public static class BuiltInPreparedQuery
private final Statement statement;
private final Statement wrappedStatement;
private final List<Expression> parameters;
private final Optional<QualifiedObjectName> distributedProcedureName;

public BuiltInPreparedQuery(Statement wrappedStatement, Statement statement, List<Expression> parameters, Optional<String> formattedQuery, Optional<String> prepareSql)
public BuiltInPreparedQuery(
Statement wrappedStatement,
Statement statement, List<Expression> parameters,
Optional<String> formattedQuery, Optional<String> prepareSql,
Optional<QualifiedObjectName> distributedProcedureName)
{
super(formattedQuery, prepareSql);
this.wrappedStatement = requireNonNull(wrappedStatement, "wrappedStatement is null");
this.statement = requireNonNull(statement, "statement is null");
this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
this.distributedProcedureName = requireNonNull(distributedProcedureName, "distributedProcedureName is null");
}

public Statement getStatement()
Expand All @@ -157,9 +187,17 @@ public List<Expression> getParameters()

public Optional<QueryType> getQueryType()
{
if (getDistributedProcedureName().isPresent()) {
return Optional.of(CALL_DISTRIBUTED_PROCEDURE);
}
return StatementUtils.getQueryType(statement.getClass());
}

public Optional<QualifiedObjectName> getDistributedProcedureName()
{
return this.distributedProcedureName;
}

public boolean isTransactionControlStatement()
{
return StatementUtils.isTransactionControlStatement(getStatement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public enum SemanticErrorCode

SAMPLE_PERCENTAGE_OUT_OF_RANGE,

PROCEDURE_NOT_FOUND,
INVALID_PROCEDURE_ARGUMENTS,

INVALID_SESSION_PROPERTY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.analyzer.utils;

import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.analyzer.SemanticException;
import com.facebook.presto.sql.tree.Identifier;
import com.facebook.presto.sql.tree.Node;
import com.facebook.presto.sql.tree.QualifiedName;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

import static com.facebook.presto.spi.StandardErrorCode.SYNTAX_ERROR;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class MetadataUtils
{
private MetadataUtils()
{}

public static QualifiedObjectName createQualifiedObjectName(Optional<String> sessionCatalogName, Optional<String> sessionSchemaName, Node node, QualifiedName name,
BiFunction<String, String, String> normalizer)
{
requireNonNull(sessionCatalogName, "sessionCatalogName is null");
requireNonNull(sessionSchemaName, "sessionSchemaName is null");
requireNonNull(name, "name is null");
if (name.getParts().size() > 3) {
throw new PrestoException(SYNTAX_ERROR, format("Too many dots in table name: %s", name));
}

List<Identifier> parts = Lists.reverse(name.getOriginalParts());
String objectName = parts.get(0).getValue();
String schemaName = (parts.size() > 1) ? parts.get(1).getValue() : sessionSchemaName.orElseThrow(() ->
new SemanticException(SCHEMA_NOT_SPECIFIED, node, "Schema must be specified when session schema is not set"));
String catalogName = (parts.size() > 2) ? parts.get(2).getValue() : sessionCatalogName.orElseThrow(() ->
new SemanticException(CATALOG_NOT_SPECIFIED, node, "Catalog must be specified when session catalog is not set"));

catalogName = catalogName.toLowerCase(ENGLISH);
schemaName = normalizer.apply(catalogName, schemaName);
objectName = normalizer.apply(catalogName, objectName);
return new QualifiedObjectName(catalogName, schemaName, objectName);
}
}
Loading
Loading