Skip to content

Commit 08928fb

Browse files
committed
[Address Comment]Refactor Procedure and DistributedProcedure into abstract classes
And introduce a new class `LocalProcedure` to represent the former coordinator-only procedures
1 parent 026b572 commit 08928fb

File tree

30 files changed

+245
-181
lines changed

30 files changed

+245
-181
lines changed

presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import com.facebook.presto.spi.PrestoException;
1919
import com.facebook.presto.spi.WarningCollector;
2020
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
21+
import com.facebook.presto.spi.procedure.LocalProcedure;
2122
import com.facebook.presto.spi.procedure.Procedure;
23+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2224
import com.facebook.presto.spi.procedure.ProcedureRegistry;
2325
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
2426
import com.facebook.presto.sql.analyzer.BuiltInQueryPreparer.BuiltInPreparedQuery;
@@ -61,12 +63,12 @@ public class TestBuiltInQueryPreparer
6163
public void setup()
6264
{
6365
procedureRegistry = new TestProcedureRegistry();
64-
List<Procedure.Argument> arguments = new ArrayList<>();
65-
arguments.add(new Procedure.Argument(SCHEMA, VARCHAR));
66-
arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR));
66+
List<Argument> arguments = new ArrayList<>();
67+
arguments.add(new Argument(SCHEMA, VARCHAR));
68+
arguments.add(new Argument(TABLE_NAME, VARCHAR));
6769

6870
List<Procedure> procedures = new ArrayList<>();
69-
procedures.add(new Procedure("system", "fun", arguments));
71+
procedures.add(new LocalProcedure("system", "fun", arguments));
7072
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun",
7173
arguments,
7274
(session, transactionContext, procedureHandle, fragments) -> null,

presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/InvalidateMetastoreCacheProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.spi.ConnectorSession;
1818
import com.facebook.presto.spi.PrestoException;
1919
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
20+
import com.facebook.presto.spi.procedure.LocalProcedure;
2021
import com.facebook.presto.spi.procedure.Procedure;
2122
import com.facebook.presto.spi.procedure.Procedure.Argument;
2223
import com.google.common.collect.ImmutableList;
@@ -65,7 +66,7 @@ public InvalidateMetastoreCacheProcedure(ExtendedHiveMetastore extendedHiveMetas
6566
@Override
6667
public Procedure get()
6768
{
68-
return new Procedure(
69+
return new LocalProcedure(
6970
"system",
7071
"invalidate_metastore_cache",
7172
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.facebook.presto.spi.ConnectorSession;
2323
import com.facebook.presto.spi.PrestoException;
2424
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
25+
import com.facebook.presto.spi.procedure.LocalProcedure;
2526
import com.facebook.presto.spi.procedure.Procedure;
2627
import com.facebook.presto.spi.procedure.Procedure.Argument;
2728
import com.google.common.collect.ImmutableList;
@@ -83,7 +84,7 @@ public CreateEmptyPartitionProcedure(
8384
@Override
8485
public Procedure get()
8586
{
86-
return new Procedure(
87+
return new LocalProcedure(
8788
"system",
8889
"create_empty_partition",
8990
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/DirectoryListCacheInvalidationProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.spi.ConnectorSession;
1717
import com.facebook.presto.spi.PrestoException;
1818
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
19+
import com.facebook.presto.spi.procedure.LocalProcedure;
1920
import com.facebook.presto.spi.procedure.Procedure;
2021
import com.facebook.presto.spi.procedure.Procedure.Argument;
2122
import com.google.common.collect.ImmutableList;
@@ -52,7 +53,7 @@ public DirectoryListCacheInvalidationProcedure(DirectoryLister directoryLister)
5253
@Override
5354
public Procedure get()
5455
{
55-
return new Procedure(
56+
return new LocalProcedure(
5657
"system",
5758
"invalidate_directory_list_cache",
5859
ImmutableList.of(

presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.spi.SchemaTableName;
2525
import com.facebook.presto.spi.TableNotFoundException;
2626
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
27+
import com.facebook.presto.spi.procedure.LocalProcedure;
2728
import com.facebook.presto.spi.procedure.Procedure;
2829
import com.facebook.presto.spi.procedure.Procedure.Argument;
2930
import com.google.common.collect.ImmutableList;
@@ -95,7 +96,7 @@ public SyncPartitionMetadataProcedure(
9596
@Override
9697
public Procedure get()
9798
{
98-
return new Procedure(
99+
return new LocalProcedure(
99100
"system",
100101
"sync_partition_metadata",
101102
ImmutableList.of(

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,7 @@ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
10801080
procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
10811081
procedureContext.get().setTable(icebergTable);
10821082
procedureContext.get().setTransaction(transaction);
1083-
return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, procedureContext.get(), tableLayoutHandle, arguments);
1083+
return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments);
10841084
}
10851085

10861086
@Override
@@ -1093,7 +1093,7 @@ public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDi
10931093
procedureName.getObjectName()));
10941094
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
10951095
verify(procedureContext.isPresent(), "procedure context must be present");
1096-
((DistributedProcedure) procedure).getFinishCallDistributedProcedure().finish(procedureContext.get(), procedureHandle, fragments);
1096+
((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments);
10971097
transaction.commitTransaction();
10981098
procedureContext.get().destroy();
10991099
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import com.facebook.presto.spi.ConnectorSplitSource;
2222
import com.facebook.presto.spi.FixedSplitSource;
2323
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
24-
import com.facebook.presto.spi.procedure.Procedure;
24+
import com.facebook.presto.spi.procedure.DistributedProcedure;
25+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2526
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
2627
import com.google.common.base.VerifyException;
2728
import com.google.common.collect.ImmutableList;
@@ -63,7 +64,7 @@
6364
import static java.util.Objects.requireNonNull;
6465

6566
public class RewriteDataFilesProcedure
66-
implements Provider<Procedure>
67+
implements Provider<DistributedProcedure>
6768
{
6869
TypeManager typeManager;
6970
JsonCodec<CommitTaskData> commitTaskCodec;
@@ -78,16 +79,16 @@ public RewriteDataFilesProcedure(
7879
}
7980

8081
@Override
81-
public Procedure get()
82+
public DistributedProcedure get()
8283
{
8384
return new TableDataRewriteDistributedProcedure(
8485
"system",
8586
"rewrite_data_files",
8687
ImmutableList.of(
87-
new Procedure.Argument(SCHEMA, VARCHAR),
88-
new Procedure.Argument(TABLE_NAME, VARCHAR),
89-
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
90-
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
88+
new Argument(SCHEMA, VARCHAR),
89+
new Argument(TABLE_NAME, VARCHAR),
90+
new Argument("filter", VARCHAR, false, "TRUE"),
91+
new Argument("options", "map(varchar, varchar)", false, null)),
9192
(session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
9293
((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)),
9394
IcebergProcedureContext::new);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ExpireSnapshotsProcedure.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.facebook.presto.spi.ConnectorSession;
2121
import com.facebook.presto.spi.SchemaTableName;
2222
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
23+
import com.facebook.presto.spi.procedure.LocalProcedure;
2324
import com.facebook.presto.spi.procedure.Procedure;
25+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2426
import com.google.common.collect.ImmutableList;
2527
import jakarta.inject.Inject;
2628
import org.apache.iceberg.ExpireSnapshots;
@@ -61,15 +63,15 @@ public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
6163
@Override
6264
public Procedure get()
6365
{
64-
return new Procedure(
66+
return new LocalProcedure(
6567
"system",
6668
"expire_snapshots",
6769
ImmutableList.of(
68-
new Procedure.Argument("schema", VARCHAR),
69-
new Procedure.Argument("table_name", VARCHAR),
70-
new Procedure.Argument("older_than", TIMESTAMP, false, null),
71-
new Procedure.Argument("retain_last", INTEGER, false, null),
72-
new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
70+
new Argument("schema", VARCHAR),
71+
new Argument("table_name", VARCHAR),
72+
new Argument("older_than", TIMESTAMP, false, null),
73+
new Argument("retain_last", INTEGER, false, null),
74+
new Argument("snapshot_ids", "array(bigint)", false, null)),
7375
EXPIRE_SNAPSHOTS.bindTo(this));
7476
}
7577

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/FastForwardBranchProcedure.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import com.facebook.presto.spi.ConnectorSession;
1818
import com.facebook.presto.spi.SchemaTableName;
1919
import com.facebook.presto.spi.connector.ConnectorMetadata;
20+
import com.facebook.presto.spi.procedure.LocalProcedure;
2021
import com.facebook.presto.spi.procedure.Procedure;
22+
import com.facebook.presto.spi.procedure.Procedure.Argument;
2123
import com.google.common.collect.ImmutableList;
2224
import jakarta.inject.Inject;
2325
import org.apache.iceberg.Table;
@@ -54,14 +56,14 @@ public FastForwardBranchProcedure(IcebergMetadataFactory metadataFactory)
5456
@Override
5557
public Procedure get()
5658
{
57-
return new Procedure(
59+
return new LocalProcedure(
5860
"system",
5961
"fast_forward",
6062
ImmutableList.of(
61-
new Procedure.Argument("schema", VARCHAR),
62-
new Procedure.Argument("table_name", VARCHAR),
63-
new Procedure.Argument("branch", VARCHAR),
64-
new Procedure.Argument("to", VARCHAR)),
63+
new Argument("schema", VARCHAR),
64+
new Argument("table_name", VARCHAR),
65+
new Argument("branch", VARCHAR),
66+
new Argument("to", VARCHAR)),
6567
FAST_FORWARD.bindTo(this));
6668
}
6769

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ManifestFileCacheInvalidationProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.presto.iceberg.ManifestFileCache;
1717
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
18+
import com.facebook.presto.spi.procedure.LocalProcedure;
1819
import com.facebook.presto.spi.procedure.Procedure;
1920
import com.google.common.collect.ImmutableList;
2021
import jakarta.inject.Inject;
@@ -44,7 +45,7 @@ public ManifestFileCacheInvalidationProcedure(ManifestFileCache manifestFileCach
4445
@Override
4546
public Procedure get()
4647
{
47-
return new Procedure(
48+
return new LocalProcedure(
4849
"system",
4950
"invalidate_manifest_file_cache",
5051
ImmutableList.of(),

0 commit comments

Comments
 (0)