Skip to content

Commit 1dbd189

Browse files
committed
Extend DistributedProcedure hierarchy for more extension types
Refactor `Procedure` and `DistributedProcedure` into abstract classes. Use a subclass `TableDataRewriteDistributedProcedure` for table rewrite tasks, for example, merge small data files, sort table data, repartition table data etc. And introduce a new class `LocalProcedure` to represent the former coordinator-only procedures. Rename `IProcedureRegistry` to `ProcedureRegistry`, and accordingly rename previous `ProcedureRegistry` to `BuiltInProcedureRegistry`.
1 parent 7623e5b commit 1dbd189

File tree

48 files changed

+577
-313
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+577
-313
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.facebook.presto.spi.function.FunctionKind;
3333
import com.facebook.presto.spi.function.table.Argument;
3434
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
35+
import com.facebook.presto.spi.procedure.DistributedProcedure;
3536
import com.facebook.presto.spi.security.AccessControl;
3637
import com.facebook.presto.spi.security.AccessControlContext;
3738
import com.facebook.presto.spi.security.AllowAllAccessControl;
@@ -175,8 +176,9 @@ public class Analysis
175176
private final Map<NodeRef<Table>, Map<String, Expression>> columnMasks = new LinkedHashMap<>();
176177

177178
// for call distributed procedure
178-
private Optional<QualifiedObjectName> procedureName;
179-
private Optional<Object[]> procedureArguments;
179+
private Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType = Optional.empty();
180+
private Optional<QualifiedObjectName> procedureName = Optional.empty();
181+
private Optional<Object[]> procedureArguments = Optional.empty();
180182
private Optional<TableHandle> callTarget = Optional.empty();
181183
private Optional<QuerySpecification> targetQuery = Optional.empty();
182184

@@ -682,6 +684,16 @@ public void setProcedureName(Optional<QualifiedObjectName> procedureName)
682684
this.procedureName = procedureName;
683685
}
684686

687+
public Optional<DistributedProcedure.DistributedProcedureType> getDistributedProcedureType()
688+
{
689+
return distributedProcedureType;
690+
}
691+
692+
public void setDistributedProcedureType(Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType)
693+
{
694+
this.distributedProcedureType = distributedProcedureType;
695+
}
696+
685697
public Optional<Object[]> getProcedureArguments()
686698
{
687699
return procedureArguments;

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryPreparer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.facebook.presto.spi.WarningCollector;
2424
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
2525
import com.facebook.presto.spi.analyzer.QueryPreparer;
26-
import com.facebook.presto.spi.procedure.IProcedureRegistry;
26+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
2727
import com.facebook.presto.sql.analyzer.utils.StatementUtils;
2828
import com.facebook.presto.sql.parser.SqlParser;
2929
import com.facebook.presto.sql.tree.Call;
@@ -64,12 +64,12 @@ public class BuiltInQueryPreparer
6464
implements QueryPreparer
6565
{
6666
private final SqlParser sqlParser;
67-
private final IProcedureRegistry procedureRegistry;
67+
private final ProcedureRegistry procedureRegistry;
6868

6969
@Inject
7070
public BuiltInQueryPreparer(
7171
SqlParser sqlParser,
72-
IProcedureRegistry procedureRegistry)
72+
ProcedureRegistry procedureRegistry)
7373
{
7474
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
7575
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");

presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
import com.facebook.presto.spi.PrestoException;
1919
import com.facebook.presto.spi.WarningCollector;
2020
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
21-
import com.facebook.presto.spi.procedure.DistributedProcedure;
22-
import com.facebook.presto.spi.procedure.IProcedureRegistry;
21+
import com.facebook.presto.spi.procedure.LocalProcedure;
2322
import com.facebook.presto.spi.procedure.Procedure;
24-
import com.facebook.presto.spi.procedure.TestProcedureRegistry;
23+
import com.facebook.presto.spi.procedure.Procedure.Argument;
24+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
25+
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
2526
import com.facebook.presto.sql.analyzer.BuiltInQueryPreparer.BuiltInPreparedQuery;
2627
import com.facebook.presto.sql.parser.SqlParser;
2728
import com.facebook.presto.sql.tree.AllColumns;
@@ -40,8 +41,8 @@
4041

4142
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
4243
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
43-
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
44-
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
44+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
45+
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
4546
import static com.facebook.presto.sql.QueryUtil.selectList;
4647
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
4748
import static com.facebook.presto.sql.QueryUtil.table;
@@ -55,23 +56,24 @@ public class TestBuiltInQueryPreparer
5556
private static final SqlParser SQL_PARSER = new SqlParser();
5657
private static final Map<String, String> emptyPreparedStatements = ImmutableMap.of();
5758
private static final AnalyzerOptions testAnalyzerOptions = AnalyzerOptions.builder().build();
58-
private static IProcedureRegistry procedureRegistry;
59+
private static ProcedureRegistry procedureRegistry;
5960
private static BuiltInQueryPreparer queryPreparer;
6061

6162
@BeforeClass
6263
public void setup()
6364
{
6465
procedureRegistry = new TestProcedureRegistry();
65-
List<Procedure.Argument> arguments = new ArrayList<>();
66-
arguments.add(new Procedure.Argument(SCHEMA, VARCHAR));
67-
arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR));
66+
List<Argument> arguments = new ArrayList<>();
67+
arguments.add(new Argument(SCHEMA, VARCHAR));
68+
arguments.add(new Argument(TABLE_NAME, VARCHAR));
6869

6970
List<Procedure> procedures = new ArrayList<>();
70-
procedures.add(new Procedure("system", "fun", arguments));
71-
procedures.add(new DistributedProcedure("system", "distributed_fun",
71+
procedures.add(new LocalProcedure("system", "fun", arguments));
72+
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun",
7273
arguments,
7374
(session, transactionContext, procedureHandle, fragments) -> null,
74-
(transactionContext, procedureHandle, fragments) -> {}));
75+
(transactionContext, procedureHandle, fragments) -> {},
76+
TestProcedureRegistry.TestProcedureContext::new));
7577
procedureRegistry.addProcedures(new ConnectorId("test"), procedures);
7678
queryPreparer = new BuiltInQueryPreparer(SQL_PARSER, procedureRegistry);
7779
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.analyzer;
15+
16+
import com.facebook.presto.spi.ConnectorId;
17+
import com.facebook.presto.spi.PrestoException;
18+
import com.facebook.presto.spi.SchemaTableName;
19+
import com.facebook.presto.spi.connector.ConnectorProcedureContext;
20+
import com.facebook.presto.spi.procedure.DistributedProcedure;
21+
import com.facebook.presto.spi.procedure.Procedure;
22+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
23+
24+
import java.util.Collection;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.function.Function;
28+
import java.util.stream.Collectors;
29+
30+
import static com.facebook.presto.spi.StandardErrorCode.PROCEDURE_NOT_FOUND;
31+
import static java.util.Objects.requireNonNull;
32+
33+
public class TestProcedureRegistry
34+
implements ProcedureRegistry
35+
{
36+
private final Map<ConnectorId, Map<SchemaTableName, Procedure>> connectorProcedures = new ConcurrentHashMap<>();
37+
38+
@Override
39+
public void addProcedures(ConnectorId connectorId, Collection<Procedure> procedures)
40+
{
41+
requireNonNull(connectorId, "connectorId is null");
42+
requireNonNull(procedures, "procedures is null");
43+
44+
Map<SchemaTableName, Procedure> proceduresByName = procedures.stream().collect(Collectors.toMap(
45+
procedure -> new SchemaTableName(procedure.getSchema(), procedure.getName()),
46+
Function.identity()));
47+
if (connectorProcedures.putIfAbsent(connectorId, proceduresByName) != null) {
48+
throw new IllegalStateException("Procedures already registered for connector: " + connectorId);
49+
}
50+
}
51+
52+
@Override
53+
public void removeProcedures(ConnectorId connectorId)
54+
{
55+
connectorProcedures.remove(connectorId);
56+
}
57+
58+
@Override
59+
public Procedure resolve(ConnectorId connectorId, SchemaTableName name)
60+
{
61+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
62+
if (procedures != null) {
63+
Procedure procedure = procedures.get(name);
64+
if (procedure != null) {
65+
return procedure;
66+
}
67+
}
68+
throw new PrestoException(PROCEDURE_NOT_FOUND, "Procedure not registered: " + name);
69+
}
70+
71+
@Override
72+
public DistributedProcedure resolveDistributed(ConnectorId connectorId, SchemaTableName name)
73+
{
74+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
75+
if (procedures != null) {
76+
Procedure procedure = procedures.get(name);
77+
if (procedure != null && procedure instanceof DistributedProcedure) {
78+
return (DistributedProcedure) procedure;
79+
}
80+
}
81+
throw new PrestoException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + name);
82+
}
83+
84+
@Override
85+
public boolean isDistributedProcedure(ConnectorId connectorId, SchemaTableName name)
86+
{
87+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
88+
return procedures != null &&
89+
procedures.containsKey(name) &&
90+
procedures.get(name) instanceof DistributedProcedure;
91+
}
92+
93+
public static class TestProcedureContext
94+
implements ConnectorProcedureContext
95+
{}
96+
}

presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/InvalidateMetastoreCacheProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.spi.ConnectorSession;
1818
import com.facebook.presto.spi.PrestoException;
1919
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
20+
import com.facebook.presto.spi.procedure.LocalProcedure;
2021
import com.facebook.presto.spi.procedure.Procedure;
2122
import com.facebook.presto.spi.procedure.Procedure.Argument;
2223
import com.google.common.collect.ImmutableList;
@@ -65,7 +66,7 @@ public InvalidateMetastoreCacheProcedure(ExtendedHiveMetastore extendedHiveMetas
6566
@Override
6667
public Procedure get()
6768
{
68-
return new Procedure(
69+
return new LocalProcedure(
6970
"system",
7071
"invalidate_metastore_cache",
7172
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.facebook.presto.spi.ConnectorSession;
2323
import com.facebook.presto.spi.PrestoException;
2424
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
25+
import com.facebook.presto.spi.procedure.LocalProcedure;
2526
import com.facebook.presto.spi.procedure.Procedure;
2627
import com.facebook.presto.spi.procedure.Procedure.Argument;
2728
import com.google.common.collect.ImmutableList;
@@ -83,7 +84,7 @@ public CreateEmptyPartitionProcedure(
8384
@Override
8485
public Procedure get()
8586
{
86-
return new Procedure(
87+
return new LocalProcedure(
8788
"system",
8889
"create_empty_partition",
8990
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/DirectoryListCacheInvalidationProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.spi.ConnectorSession;
1717
import com.facebook.presto.spi.PrestoException;
1818
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
19+
import com.facebook.presto.spi.procedure.LocalProcedure;
1920
import com.facebook.presto.spi.procedure.Procedure;
2021
import com.facebook.presto.spi.procedure.Procedure.Argument;
2122
import com.google.common.collect.ImmutableList;
@@ -52,7 +53,7 @@ public DirectoryListCacheInvalidationProcedure(DirectoryLister directoryLister)
5253
@Override
5354
public Procedure get()
5455
{
55-
return new Procedure(
56+
return new LocalProcedure(
5657
"system",
5758
"invalidate_directory_list_cache",
5859
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.spi.SchemaTableName;
2525
import com.facebook.presto.spi.TableNotFoundException;
2626
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
27+
import com.facebook.presto.spi.procedure.LocalProcedure;
2728
import com.facebook.presto.spi.procedure.Procedure;
2829
import com.facebook.presto.spi.procedure.Procedure.Argument;
2930
import com.google.common.collect.ImmutableList;
@@ -95,7 +96,7 @@ public SyncPartitionMetadataProcedure(
9596
@Override
9697
public Procedure get()
9798
{
98-
return new Procedure(
99+
return new LocalProcedure(
99100
"system",
100101
"sync_partition_metadata",
101102
ImmutableList.of(

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ExpireSnapshotsProcedure.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.facebook.presto.spi.ConnectorSession;
2121
import com.facebook.presto.spi.SchemaTableName;
2222
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
23+
import com.facebook.presto.spi.procedure.LocalProcedure;
2324
import com.facebook.presto.spi.procedure.Procedure;
25+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2426
import com.google.common.collect.ImmutableList;
2527
import jakarta.inject.Inject;
2628
import org.apache.iceberg.ExpireSnapshots;
@@ -61,15 +63,15 @@ public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
6163
@Override
6264
public Procedure get()
6365
{
64-
return new Procedure(
66+
return new LocalProcedure(
6567
"system",
6668
"expire_snapshots",
6769
ImmutableList.of(
68-
new Procedure.Argument("schema", VARCHAR),
69-
new Procedure.Argument("table_name", VARCHAR),
70-
new Procedure.Argument("older_than", TIMESTAMP, false, null),
71-
new Procedure.Argument("retain_last", INTEGER, false, null),
72-
new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
70+
new Argument("schema", VARCHAR),
71+
new Argument("table_name", VARCHAR),
72+
new Argument("older_than", TIMESTAMP, false, null),
73+
new Argument("retain_last", INTEGER, false, null),
74+
new Argument("snapshot_ids", "array(bigint)", false, null)),
7375
EXPIRE_SNAPSHOTS.bindTo(this));
7476
}
7577

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/FastForwardBranchProcedure.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import com.facebook.presto.spi.ConnectorSession;
1818
import com.facebook.presto.spi.SchemaTableName;
1919
import com.facebook.presto.spi.connector.ConnectorMetadata;
20+
import com.facebook.presto.spi.procedure.LocalProcedure;
2021
import com.facebook.presto.spi.procedure.Procedure;
22+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2123
import com.google.common.collect.ImmutableList;
2224
import jakarta.inject.Inject;
2325
import org.apache.iceberg.Table;
@@ -54,14 +56,14 @@ public FastForwardBranchProcedure(IcebergMetadataFactory metadataFactory)
5456
@Override
5557
public Procedure get()
5658
{
57-
return new Procedure(
59+
return new LocalProcedure(
5860
"system",
5961
"fast_forward",
6062
ImmutableList.of(
61-
new Procedure.Argument("schema", VARCHAR),
62-
new Procedure.Argument("table_name", VARCHAR),
63-
new Procedure.Argument("branch", VARCHAR),
64-
new Procedure.Argument("to", VARCHAR)),
63+
new Argument("schema", VARCHAR),
64+
new Argument("table_name", VARCHAR),
65+
new Argument("branch", VARCHAR),
66+
new Argument("to", VARCHAR)),
6567
FAST_FORWARD.bindTo(this));
6668
}
6769

0 commit comments

Comments
 (0)