Skip to content

Commit 10287ee

Browse files
committed
Extend DistributedProcedure hierarchy for more extension types
Use a subclass, `TableDataRewriteDistributedProcedure`, for table data rewrite tasks such as merging small data files, sorting table data, or repartitioning table data.
1 parent 947df95 commit 10287ee

File tree

13 files changed

+213
-102
lines changed

13 files changed

+213
-102
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import com.facebook.presto.spi.function.FunctionKind;
3333
import com.facebook.presto.spi.function.table.Argument;
3434
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
35+
import com.facebook.presto.spi.procedure.DistributedProcedure;
3536
import com.facebook.presto.spi.security.AccessControl;
3637
import com.facebook.presto.spi.security.AccessControlContext;
3738
import com.facebook.presto.spi.security.AllowAllAccessControl;
@@ -175,8 +176,9 @@ public class Analysis
175176
private final Map<NodeRef<Table>, Map<String, Expression>> columnMasks = new LinkedHashMap<>();
176177

177178
// for call distributed procedure
178-
private Optional<QualifiedObjectName> procedureName;
179-
private Optional<Object[]> procedureArguments;
179+
private Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType = Optional.empty();
180+
private Optional<QualifiedObjectName> procedureName = Optional.empty();
181+
private Optional<Object[]> procedureArguments = Optional.empty();
180182
private Optional<TableHandle> callTarget = Optional.empty();
181183
private Optional<QuerySpecification> targetQuery = Optional.empty();
182184

@@ -682,6 +684,16 @@ public void setProcedureName(Optional<QualifiedObjectName> procedureName)
682684
this.procedureName = procedureName;
683685
}
684686

687+
public Optional<DistributedProcedure.DistributedProcedureType> getDistributedProcedureType()
688+
{
689+
return distributedProcedureType;
690+
}
691+
692+
public void setDistributedProcedureType(Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType)
693+
{
694+
this.distributedProcedureType = distributedProcedureType;
695+
}
696+
685697
public Optional<Object[]> getProcedureArguments()
686698
{
687699
return procedureArguments;

presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import com.facebook.presto.spi.PrestoException;
1919
import com.facebook.presto.spi.WarningCollector;
2020
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
21-
import com.facebook.presto.spi.procedure.DistributedProcedure;
2221
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2322
import com.facebook.presto.spi.procedure.Procedure;
23+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
2424
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
2525
import com.facebook.presto.sql.analyzer.BuiltInQueryPreparer.BuiltInPreparedQuery;
2626
import com.facebook.presto.sql.parser.SqlParser;
@@ -40,8 +40,8 @@
4040

4141
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
4242
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
43-
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
44-
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
43+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
44+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
4545
import static com.facebook.presto.sql.QueryUtil.selectList;
4646
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
4747
import static com.facebook.presto.sql.QueryUtil.table;
@@ -68,10 +68,11 @@ public void setup()
6868

6969
List<Procedure> procedures = new ArrayList<>();
7070
procedures.add(new Procedure("system", "fun", arguments));
71-
procedures.add(new DistributedProcedure("system", "distributed_fun",
71+
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun",
7272
arguments,
7373
(session, transactionContext, procedureHandle, fragments) -> null,
74-
(transactionContext, procedureHandle, fragments) -> {}));
74+
(transactionContext, procedureHandle, fragments) -> {},
75+
TestProcedureRegistry.TestProcedureContext::new));
7576
procedureRegistry.addProcedures(new ConnectorId("test"), procedures);
7677
queryPreparer = new BuiltInQueryPreparer(SQL_PARSER, procedureRegistry);
7778
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,13 +1071,15 @@ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
10711071
}
10721072

10731073
transaction = icebergTable.newTransaction();
1074-
procedureContext = Optional.of(new IcebergProcedureContext(Optional.of(icebergTable), transaction));
10751074
Procedure procedure = procedureRegistry.resolve(
10761075
new ConnectorId(procedureName.getCatalogName()),
10771076
new SchemaTableName(
10781077
procedureName.getSchemaName(),
10791078
procedureName.getObjectName()));
10801079
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
1080+
procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
1081+
procedureContext.get().setTable(icebergTable);
1082+
procedureContext.get().setTransaction(transaction);
10811083
return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, procedureContext.get(), tableLayoutHandle, arguments);
10821084
}
10831085

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@
3131
public class IcebergProcedureContext
3232
implements ConnectorProcedureContext
3333
{
34-
final Optional<Table> table;
35-
final Transaction transaction;
3634
final Set<DataFile> scannedDataFiles = new HashSet<>();
3735
final Set<DeleteFile> fullyAppliedDeleteFiles = new HashSet<>();
3836
final Map<String, Object> relevantData = new HashMap<>();
37+
Optional<Table> table = Optional.empty();
38+
Transaction transaction;
3939
Optional<ConnectorSplitSource> connectorSplitSource = Optional.empty();
4040

41-
public IcebergProcedureContext(Optional<Table> table, Transaction transaction)
41+
public void setTable(Table table)
42+
{
43+
this.table = Optional.of(table);
44+
}
45+
46+
public void setTransaction(Transaction transaction)
4247
{
43-
this.table = table;
4448
this.transaction = transaction;
4549
}
4650

presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import com.facebook.presto.spi.ConnectorSplitSource;
2222
import com.facebook.presto.spi.FixedSplitSource;
2323
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
24-
import com.facebook.presto.spi.procedure.DistributedProcedure;
2524
import com.facebook.presto.spi.procedure.Procedure;
25+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
2626
import com.google.common.base.VerifyException;
2727
import com.google.common.collect.ImmutableList;
2828
import com.google.common.collect.ImmutableSet;
@@ -57,8 +57,8 @@
5757
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
5858
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
5959
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
60-
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
61-
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
60+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
61+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
6262
import static com.google.common.collect.ImmutableList.toImmutableList;
6363
import static java.util.Objects.requireNonNull;
6464

@@ -80,7 +80,7 @@ public RewriteDataFilesProcedure(
8080
@Override
8181
public Procedure get()
8282
{
83-
return new DistributedProcedure(
83+
return new TableDataRewriteDistributedProcedure(
8484
"system",
8585
"rewrite_data_files",
8686
ImmutableList.of(
@@ -89,7 +89,8 @@ public Procedure get()
8989
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
9090
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
9191
(session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
92-
((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)));
92+
((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)),
93+
IcebergProcedureContext::new);
9394
}
9495

9596
private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import com.facebook.presto.spi.function.table.TableArgumentSpecification;
7272
import com.facebook.presto.spi.function.table.TableFunctionAnalysis;
7373
import com.facebook.presto.spi.procedure.DistributedProcedure;
74+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
7475
import com.facebook.presto.spi.relation.DomainTranslator;
7576
import com.facebook.presto.spi.relation.RowExpression;
7677
import com.facebook.presto.spi.security.AccessControl;
@@ -257,6 +258,7 @@
257258
import static com.facebook.presto.spi.function.FunctionKind.WINDOW;
258259
import static com.facebook.presto.spi.function.table.DescriptorArgument.NULL_DESCRIPTOR;
259260
import static com.facebook.presto.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
261+
import static com.facebook.presto.spi.procedure.DistributedProcedure.DistributedProcedureType.TABLE_DATA_REWRITE;
260262
import static com.facebook.presto.sql.MaterializedViewUtils.buildOwnerSession;
261263
import static com.facebook.presto.sql.MaterializedViewUtils.generateBaseTablePredicates;
262264
import static com.facebook.presto.sql.MaterializedViewUtils.generateFalsePredicates;
@@ -1206,31 +1208,38 @@ protected Scope visitCall(Call call, Optional<Scope> scope)
12061208
throw new SemanticException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + procedureName);
12071209
}
12081210
DistributedProcedure procedure = metadata.getProcedureRegistry().resolveDistributed(connectorId, toSchemaTableName(procedureName));
1209-
12101211
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, analysis.getParameters());
1211-
QualifiedName qualifiedName = QualifiedName.of(procedure.getSchema(values), procedure.getTableName(values));
1212-
QualifiedObjectName tableName = createQualifiedObjectName(session, call, qualifiedName, metadata);
12131212

12141213
analysis.setUpdateType("CALL");
1214+
analysis.setDistributedProcedureType(Optional.of(TABLE_DATA_REWRITE));
12151215
analysis.setProcedureArguments(Optional.of(values));
1216-
1217-
String filter = procedure.getFilter(values);
1218-
Expression filterExpression = sqlParser.createExpression(filter);
1219-
QuerySpecification querySpecification = new QuerySpecification(
1220-
selectList(new AllColumns()),
1221-
Optional.of(new Table(qualifiedName)),
1222-
Optional.of(filterExpression),
1223-
Optional.empty(),
1224-
Optional.empty(),
1225-
Optional.empty(),
1226-
Optional.empty(),
1227-
Optional.empty());
1228-
analyze(querySpecification, scope);
1229-
analysis.setTargetQuery(querySpecification);
1230-
1231-
TableHandle tableHandle = metadata.getHandleVersion(session, tableName, Optional.empty())
1232-
.orElseThrow(() -> (new SemanticException(MISSING_TABLE, call, "Table '%s' does not exist", tableName)));
1233-
analysis.setCallTarget(tableHandle);
1216+
switch (procedure.getType()) {
1217+
case TABLE_DATA_REWRITE:
1218+
TableDataRewriteDistributedProcedure tableDataRewriteDistributedProcedure = (TableDataRewriteDistributedProcedure) procedure;
1219+
QualifiedName qualifiedName = QualifiedName.of(tableDataRewriteDistributedProcedure.getSchema(values), tableDataRewriteDistributedProcedure.getTableName(values));
1220+
QualifiedObjectName tableName = createQualifiedObjectName(session, call, qualifiedName, metadata);
1221+
1222+
String filter = tableDataRewriteDistributedProcedure.getFilter(values);
1223+
Expression filterExpression = sqlParser.createExpression(filter);
1224+
QuerySpecification querySpecification = new QuerySpecification(
1225+
selectList(new AllColumns()),
1226+
Optional.of(new Table(qualifiedName)),
1227+
Optional.of(filterExpression),
1228+
Optional.empty(),
1229+
Optional.empty(),
1230+
Optional.empty(),
1231+
Optional.empty(),
1232+
Optional.empty());
1233+
analyze(querySpecification, scope);
1234+
analysis.setTargetQuery(querySpecification);
1235+
1236+
TableHandle tableHandle = metadata.getHandleVersion(session, tableName, Optional.empty())
1237+
.orElseThrow(() -> (new SemanticException(MISSING_TABLE, call, "Table '%s' does not exist", tableName)));
1238+
analysis.setCallTarget(tableHandle);
1239+
break;
1240+
default:
1241+
throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported distributed procedure type: " + procedure.getType());
1242+
}
12341243
return createAndAssignScope(call, scope, Field.newUnqualified(Optional.empty(), "rows", BIGINT));
12351244
}
12361245

presto-main-base/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,13 @@ else if (statement instanceof Analyze) {
181181
return createAnalyzePlan(analysis, (Analyze) statement);
182182
}
183183
else if (statement instanceof Call) {
184-
return createCallDistributedProcedurePlan(analysis, (Call) statement);
184+
checkState(analysis.getDistributedProcedureType().isPresent(), "Call distributed procedure analysis is missing");
185+
switch (analysis.getDistributedProcedureType().get()) {
186+
case TABLE_DATA_REWRITE:
187+
return createCallDistributedProcedurePlanForTableDataRewrite(analysis, (Call) statement);
188+
default:
189+
throw new PrestoException(NOT_SUPPORTED, "Unsupported distributed procedure type: " + analysis.getDistributedProcedureType().get());
190+
}
185191
}
186192
else if (statement instanceof Insert) {
187193
checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
@@ -223,7 +229,7 @@ private RelationPlan createExplainAnalyzePlan(Analysis analysis, Explain stateme
223229
return new RelationPlan(root, scope, ImmutableList.of(outputVariable));
224230
}
225231

226-
private RelationPlan createCallDistributedProcedurePlan(Analysis analysis, Call statement)
232+
private RelationPlan createCallDistributedProcedurePlanForTableDataRewrite(Analysis analysis, Call statement)
227233
{
228234
TableHandle targetTable = analysis.getCallTarget()
229235
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Target table does not exist"));

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@
5757
import com.facebook.presto.spi.function.Parameter;
5858
import com.facebook.presto.spi.function.RoutineCharacteristics;
5959
import com.facebook.presto.spi.function.SqlInvokedFunction;
60-
import com.facebook.presto.spi.procedure.DistributedProcedure;
6160
import com.facebook.presto.spi.procedure.Procedure;
61+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
6262
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
6363
import com.facebook.presto.spi.security.AccessControl;
6464
import com.facebook.presto.spi.security.AllowAllAccessControl;
@@ -96,8 +96,8 @@
9696
import static com.facebook.presto.spi.function.RoutineCharacteristics.Determinism.DETERMINISTIC;
9797
import static com.facebook.presto.spi.function.RoutineCharacteristics.Language.SQL;
9898
import static com.facebook.presto.spi.function.RoutineCharacteristics.NullCallClause.RETURNS_NULL_ON_NULL_INPUT;
99-
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
100-
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
99+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
100+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
101101
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
102102
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
103103
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
@@ -186,10 +186,11 @@ public void setup()
186186

187187
List<Procedure> procedures = new ArrayList<>();
188188
procedures.add(new Procedure("system", "procedure", arguments));
189-
procedures.add(new DistributedProcedure("system", "distributed_procedure",
189+
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_procedure",
190190
arguments,
191191
(session, transactionContext, procedureHandle, fragments) -> null,
192-
(transactionContext, procedureHandle, fragments) -> {}));
192+
(transactionContext, procedureHandle, fragments) -> {},
193+
TestProcedureRegistry.TestProcedureContext::new));
193194
metadata.getProcedureRegistry().addProcedures(SECOND_CONNECTOR_ID, procedures);
194195

195196
Catalog tpchTestCatalog = createTestingCatalog(TPCH_CATALOG, TPCH_CONNECTOR_ID);

presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@
4141
import com.facebook.presto.spi.plan.TableScanNode;
4242
import com.facebook.presto.spi.plan.TopNNode;
4343
import com.facebook.presto.spi.plan.ValuesNode;
44-
import com.facebook.presto.spi.procedure.DistributedProcedure;
4544
import com.facebook.presto.spi.procedure.Procedure;
45+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
46+
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
4647
import com.facebook.presto.spi.relation.VariableReferenceExpression;
4748
import com.facebook.presto.spi.transaction.IsolationLevel;
4849
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
@@ -110,8 +111,8 @@
110111
import static com.facebook.presto.spi.plan.JoinType.INNER;
111112
import static com.facebook.presto.spi.plan.JoinType.LEFT;
112113
import static com.facebook.presto.spi.plan.JoinType.RIGHT;
113-
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
114-
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
114+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
115+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
115116
import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED;
116117
import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED;
117118
import static com.facebook.presto.sql.TestExpressionInterpreter.AVG_UDAF_CPP;
@@ -203,10 +204,11 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
203204
arguments.add(new Procedure.Argument(SCHEMA, VARCHAR));
204205
arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR));
205206
Set<Procedure> procedures = new HashSet<>();
206-
procedures.add(new DistributedProcedure("system", "distributed_fun",
207+
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun",
207208
arguments,
208209
(session, transactionContext, procedureHandle, fragments) -> null,
209-
(transactionContext, procedureHandle, fragments) -> {}));
210+
(transactionContext, procedureHandle, fragments) -> {},
211+
TestProcedureRegistry.TestProcedureContext::new));
210212

211213
return new Connector()
212214
{

0 commit comments

Comments
 (0)