Skip to content

Commit 00d76f3

Browse files
committed
Refactor ProcedureRegistry to support distributed procedure and be available in analyzer
1 parent e60a95f commit 00d76f3

File tree

14 files changed

+324
-11
lines changed

14 files changed

+324
-11
lines changed

presto-main-base/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.facebook.presto.spi.constraints.TableConstraint;
3939
import com.facebook.presto.spi.function.SqlFunction;
4040
import com.facebook.presto.spi.plan.PartitioningHandle;
41+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4142
import com.facebook.presto.spi.security.GrantInfo;
4243
import com.facebook.presto.spi.security.PrestoPrincipal;
4344
import com.facebook.presto.spi.security.Privilege;
@@ -594,7 +595,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
594595
}
595596

596597
@Override
597-
public ProcedureRegistry getProcedureRegistry()
598+
public IProcedureRegistry getProcedureRegistry()
598599
{
599600
return delegate.getProcedureRegistry();
600601
}

presto-main-base/src/main/java/com/facebook/presto/metadata/Metadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.facebook.presto.spi.constraints.TableConstraint;
4646
import com.facebook.presto.spi.function.SqlFunction;
4747
import com.facebook.presto.spi.plan.PartitioningHandle;
48+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4849
import com.facebook.presto.spi.relation.RowExpression;
4950
import com.facebook.presto.spi.relation.VariableReferenceExpression;
5051
import com.facebook.presto.spi.security.GrantInfo;
@@ -505,7 +506,7 @@ default Map<String, CatalogContext> getCatalogNamesWithConnectorContext(Session
505506
// TODO: metadata should not provide FunctionAndTypeManager
506507
FunctionAndTypeManager getFunctionAndTypeManager();
507508

508-
ProcedureRegistry getProcedureRegistry();
509+
IProcedureRegistry getProcedureRegistry();
509510

510511
BlockEncodingSerde getBlockEncodingSerde();
511512

presto-main-base/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import com.facebook.presto.spi.constraints.TableConstraint;
6666
import com.facebook.presto.spi.function.SqlFunction;
6767
import com.facebook.presto.spi.plan.PartitioningHandle;
68+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
6869
import com.facebook.presto.spi.relation.RowExpression;
6970
import com.facebook.presto.spi.relation.VariableReferenceExpression;
7071
import com.facebook.presto.spi.security.GrantInfo;
@@ -149,7 +150,7 @@ public class MetadataManager
149150
private static final Logger log = Logger.get(MetadataManager.class);
150151

151152
private final FunctionAndTypeManager functionAndTypeManager;
152-
private final ProcedureRegistry procedures;
153+
private final IProcedureRegistry procedures;
153154
private final JsonCodec<ViewDefinition> viewCodec;
154155
private final BlockEncodingSerde blockEncodingSerde;
155156
private final SessionPropertyManager sessionPropertyManager;
@@ -182,7 +183,8 @@ public MetadataManager(
182183
columnPropertyManager,
183184
analyzePropertyManager,
184185
transactionManager,
185-
functionAndTypeManager);
186+
functionAndTypeManager,
187+
new ProcedureRegistry(functionAndTypeManager));
186188
}
187189

188190
@Inject
@@ -195,7 +197,8 @@ public MetadataManager(
195197
ColumnPropertyManager columnPropertyManager,
196198
AnalyzePropertyManager analyzePropertyManager,
197199
TransactionManager transactionManager,
198-
FunctionAndTypeManager functionAndTypeManager)
200+
FunctionAndTypeManager functionAndTypeManager,
201+
IProcedureRegistry procedureRegistry)
199202
{
200203
this.viewCodec = requireNonNull(viewCodec, "viewCodec is null");
201204
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
@@ -206,7 +209,7 @@ public MetadataManager(
206209
this.analyzePropertyManager = requireNonNull(analyzePropertyManager, "analyzePropertyManager is null");
207210
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
208211
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null");
209-
this.procedures = new ProcedureRegistry(functionAndTypeManager);
212+
this.procedures = requireNonNull(procedureRegistry, "procedureRegistry is null");
210213

211214
verifyComparableOrderableContract();
212215
}
@@ -1339,7 +1342,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
13391342
}
13401343

13411344
@Override
1342-
public ProcedureRegistry getProcedureRegistry()
1345+
public IProcedureRegistry getProcedureRegistry()
13431346
{
13441347
return procedures;
13451348
}

presto-main-base/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import com.facebook.presto.spi.ConnectorSession;
2222
import com.facebook.presto.spi.PrestoException;
2323
import com.facebook.presto.spi.SchemaTableName;
24+
import com.facebook.presto.spi.procedure.DistributedProcedure;
25+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2426
import com.facebook.presto.spi.procedure.Procedure;
2527
import com.google.common.collect.Maps;
2628
import com.google.common.primitives.Primitives;
2729
import com.google.errorprone.annotations.ThreadSafe;
30+
import jakarta.inject.Inject;
2831

2932
import java.util.Collection;
3033
import java.util.List;
@@ -47,16 +50,19 @@
4750

4851
@ThreadSafe
4952
public class ProcedureRegistry
53+
implements IProcedureRegistry
5054
{
5155
private final Map<ConnectorId, Map<SchemaTableName, Procedure>> connectorProcedures = new ConcurrentHashMap<>();
5256

5357
private final TypeManager typeManager;
5458

59+
@Inject
5560
public ProcedureRegistry(TypeManager typeManager)
5661
{
5762
this.typeManager = requireNonNull(typeManager, "typeManager is null");
5863
}
5964

65+
@Override
6066
public void addProcedures(ConnectorId connectorId, Collection<Procedure> procedures)
6167
{
6268
requireNonNull(connectorId, "connectorId is null");
@@ -71,11 +77,13 @@ public void addProcedures(ConnectorId connectorId, Collection<Procedure> procedu
7177
checkState(connectorProcedures.putIfAbsent(connectorId, proceduresByName) == null, "Procedures already registered for connector: %s", connectorId);
7278
}
7379

80+
@Override
7481
public void removeProcedures(ConnectorId connectorId)
7582
{
7683
connectorProcedures.remove(connectorId);
7784
}
7885

86+
@Override
7987
public Procedure resolve(ConnectorId connectorId, SchemaTableName name)
8088
{
8189
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
@@ -88,8 +96,34 @@ public Procedure resolve(ConnectorId connectorId, SchemaTableName name)
8896
throw new PrestoException(PROCEDURE_NOT_FOUND, "Procedure not registered: " + name);
8997
}
9098

99+
@Override
100+
public DistributedProcedure resolveDistributed(ConnectorId connectorId, SchemaTableName name)
101+
{
102+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
103+
if (procedures != null) {
104+
Procedure procedure = procedures.get(name);
105+
if (procedure != null && procedure instanceof DistributedProcedure) {
106+
return (DistributedProcedure) procedure;
107+
}
108+
}
109+
throw new PrestoException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + name);
110+
}
111+
112+
@Override
113+
public boolean isDistributedProcedure(ConnectorId connectorId, SchemaTableName name)
114+
{
115+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
116+
return procedures != null &&
117+
procedures.containsKey(name) &&
118+
procedures.get(name) instanceof DistributedProcedure;
119+
}
120+
91121
private void validateProcedure(Procedure procedure)
92122
{
123+
if (procedure instanceof DistributedProcedure) {
124+
return;
125+
}
126+
93127
List<Class<?>> parameters = procedure.getMethodHandle().type().parameterList().stream()
94128
.filter(type -> !ConnectorSession.class.isAssignableFrom(type))
95129
.collect(toList());

presto-main-base/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.facebook.presto.spi.constraints.TableConstraint;
4040
import com.facebook.presto.spi.function.SqlFunction;
4141
import com.facebook.presto.spi.plan.PartitioningHandle;
42+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4243
import com.facebook.presto.spi.security.GrantInfo;
4344
import com.facebook.presto.spi.security.PrestoPrincipal;
4445
import com.facebook.presto.spi.security.Privilege;
@@ -616,7 +617,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
616617
}
617618

618619
@Override
619-
public ProcedureRegistry getProcedureRegistry()
620+
public IProcedureRegistry getProcedureRegistry()
620621
{
621622
throw new UnsupportedOperationException();
622623
}

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import com.facebook.presto.metadata.InternalNodeManager;
109109
import com.facebook.presto.metadata.Metadata;
110110
import com.facebook.presto.metadata.MetadataManager;
111+
import com.facebook.presto.metadata.ProcedureRegistry;
111112
import com.facebook.presto.metadata.SchemaPropertyManager;
112113
import com.facebook.presto.metadata.SessionPropertyManager;
113114
import com.facebook.presto.metadata.SessionPropertyProviderConfig;
@@ -171,6 +172,7 @@
171172
import com.facebook.presto.spi.function.SqlInvokedFunction;
172173
import com.facebook.presto.spi.plan.SimplePlanFragment;
173174
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
175+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
174176
import com.facebook.presto.spi.relation.DeterminismEvaluator;
175177
import com.facebook.presto.spi.relation.DomainTranslator;
176178
import com.facebook.presto.spi.relation.PredicateCompiler;
@@ -654,6 +656,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
654656
binder.bind(FunctionAndTypeManager.class).in(Scopes.SINGLETON);
655657
binder.bind(TableFunctionRegistry.class).in(Scopes.SINGLETON);
656658
binder.bind(MetadataManager.class).in(Scopes.SINGLETON);
659+
binder.bind(ProcedureRegistry.class).in(Scopes.SINGLETON);
660+
binder.bind(IProcedureRegistry.class).to(ProcedureRegistry.class).in(Scopes.SINGLETON);
657661

658662
if (serverConfig.isCatalogServerEnabled() && serverConfig.isCoordinator()) {
659663
binder.bind(RemoteMetadataManager.class).in(Scopes.SINGLETON);

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import com.facebook.presto.metadata.InternalNodeManager;
7676
import com.facebook.presto.metadata.Metadata;
7777
import com.facebook.presto.metadata.MetadataManager;
78+
import com.facebook.presto.metadata.ProcedureRegistry;
7879
import com.facebook.presto.metadata.SchemaPropertyManager;
7980
import com.facebook.presto.metadata.SessionPropertyManager;
8081
import com.facebook.presto.metadata.SessionPropertyProviderConfig;
@@ -141,6 +142,7 @@
141142
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
142143
import com.facebook.presto.spi.plan.SimplePlanFragment;
143144
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
145+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
144146
import com.facebook.presto.spi.relation.DeterminismEvaluator;
145147
import com.facebook.presto.spi.relation.DomainTranslator;
146148
import com.facebook.presto.spi.relation.PredicateCompiler;
@@ -383,6 +385,8 @@ protected void setup(Binder binder)
383385
binder.bind(Metadata.class).to(MetadataManager.class).in(Scopes.SINGLETON);
384386
binder.bind(StaticFunctionNamespaceStore.class).in(Scopes.SINGLETON);
385387
binder.bind(StaticTypeManagerStore.class).in(Scopes.SINGLETON);
388+
binder.bind(ProcedureRegistry.class).in(Scopes.SINGLETON);
389+
binder.bind(IProcedureRegistry.class).to(ProcedureRegistry.class).in(Scopes.SINGLETON);
386390

387391
// type
388392
newSetBinder(binder, Type.class);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi;
15+
16+
@SuppressWarnings("MarkerInterface")
17+
public interface ConnectorDistributedProcedureHandle
18+
{
19+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi.connector;
15+
16+
public interface ConnectorProcedureContext
17+
{
18+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi.procedure;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
19+
import com.facebook.presto.spi.connector.ConnectorProcedureContext;
20+
import io.airlift.slice.Slice;
21+
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.OptionalInt;
25+
26+
import static java.lang.String.format;
27+
import static java.util.Objects.requireNonNull;
28+
29+
public class DistributedProcedure
30+
extends Procedure
31+
{
32+
public static final String SCHEMA = "schema";
33+
public static final String TABLE_NAME = "table_name";
34+
public static final String FILTER = "filter";
35+
private final BeginCallDistributedProcedure beginCallDistributedProcedure;
36+
private final FinishCallDistributedProcedure finishCallDistributedProcedure;
37+
38+
private int schemaIndex = -1;
39+
private int tableNameIndex = -1;
40+
private OptionalInt filterIndex = OptionalInt.empty();
41+
42+
public DistributedProcedure(String schema, String name, List<Argument> arguments, BeginCallDistributedProcedure beginCallDistributedProcedure, FinishCallDistributedProcedure finishCallDistributedProcedure)
43+
{
44+
super(schema, name, arguments);
45+
for (int i = 0; i < getArguments().size(); i++) {
46+
if (getArguments().get(i).getName().equals(SCHEMA)) {
47+
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
48+
format("Argument `%s` must be string type", SCHEMA));
49+
schemaIndex = i;
50+
}
51+
else if (getArguments().get(i).getName().equals(TABLE_NAME)) {
52+
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
53+
format("Argument `%s` must be string type", TABLE_NAME));
54+
tableNameIndex = i;
55+
}
56+
else if (getArguments().get(i).getName().equals(FILTER)) {
57+
filterIndex = OptionalInt.of(i);
58+
}
59+
}
60+
checkArgument(schemaIndex >= 0 && tableNameIndex >= 0,
61+
format("A distributed procedure need at least 2 arguments: `%s` and `%s` for the target table", SCHEMA, TABLE_NAME));
62+
this.beginCallDistributedProcedure = requireNonNull(beginCallDistributedProcedure, "beginTableExecute is null");
63+
this.finishCallDistributedProcedure = requireNonNull(finishCallDistributedProcedure, "finishTableExecute is null");
64+
}
65+
66+
public BeginCallDistributedProcedure getBeginCallDistributedProcedure()
67+
{
68+
return beginCallDistributedProcedure;
69+
}
70+
71+
public FinishCallDistributedProcedure getFinishCallDistributedProcedure()
72+
{
73+
return finishCallDistributedProcedure;
74+
}
75+
76+
public String getSchema(Object[] parameters)
77+
{
78+
return (String) parameters[schemaIndex];
79+
}
80+
81+
public String getTableName(Object[] parameters)
82+
{
83+
return (String) parameters[tableNameIndex];
84+
}
85+
86+
public String getFilter(Object[] parameters)
87+
{
88+
if (filterIndex.isPresent()) {
89+
return (String) parameters[filterIndex.getAsInt()];
90+
}
91+
else {
92+
return "TRUE";
93+
}
94+
}
95+
96+
@FunctionalInterface
97+
public interface BeginCallDistributedProcedure
98+
{
99+
ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments);
100+
}
101+
102+
@FunctionalInterface
103+
public interface FinishCallDistributedProcedure
104+
{
105+
void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
106+
}
107+
}

0 commit comments

Comments
 (0)