Skip to content

Commit 68c8751

Browse files
committed
Refactor ProcedureRegistry to support distributed procedure and be available in analyzer
1 parent d9114d3 commit 68c8751

File tree

14 files changed

+324
-11
lines changed

14 files changed

+324
-11
lines changed

presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.facebook.presto.spi.connector.ConnectorTableVersion;
3838
import com.facebook.presto.spi.constraints.TableConstraint;
3939
import com.facebook.presto.spi.function.SqlFunction;
40+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4041
import com.facebook.presto.spi.security.GrantInfo;
4142
import com.facebook.presto.spi.security.PrestoPrincipal;
4243
import com.facebook.presto.spi.security.Privilege;
@@ -590,7 +591,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
590591
}
591592

592593
@Override
593-
public ProcedureRegistry getProcedureRegistry()
594+
public IProcedureRegistry getProcedureRegistry()
594595
{
595596
return delegate.getProcedureRegistry();
596597
}

presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.facebook.presto.spi.connector.ConnectorTableVersion;
4444
import com.facebook.presto.spi.constraints.TableConstraint;
4545
import com.facebook.presto.spi.function.SqlFunction;
46+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4647
import com.facebook.presto.spi.security.GrantInfo;
4748
import com.facebook.presto.spi.security.PrestoPrincipal;
4849
import com.facebook.presto.spi.security.Privilege;
@@ -486,7 +487,7 @@ public interface Metadata
486487
// TODO: metadata should not provide FunctionAndTypeManager
487488
FunctionAndTypeManager getFunctionAndTypeManager();
488489

489-
ProcedureRegistry getProcedureRegistry();
490+
IProcedureRegistry getProcedureRegistry();
490491

491492
BlockEncodingSerde getBlockEncodingSerde();
492493

presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
6363
import com.facebook.presto.spi.constraints.TableConstraint;
6464
import com.facebook.presto.spi.function.SqlFunction;
65+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
6566
import com.facebook.presto.spi.security.GrantInfo;
6667
import com.facebook.presto.spi.security.PrestoPrincipal;
6768
import com.facebook.presto.spi.security.Privilege;
@@ -141,7 +142,7 @@ public class MetadataManager
141142
private static final Logger log = Logger.get(MetadataManager.class);
142143

143144
private final FunctionAndTypeManager functionAndTypeManager;
144-
private final ProcedureRegistry procedures;
145+
private final IProcedureRegistry procedures;
145146
private final JsonCodec<ViewDefinition> viewCodec;
146147
private final BlockEncodingSerde blockEncodingSerde;
147148
private final SessionPropertyManager sessionPropertyManager;
@@ -174,7 +175,8 @@ public MetadataManager(
174175
columnPropertyManager,
175176
analyzePropertyManager,
176177
transactionManager,
177-
functionAndTypeManager);
178+
functionAndTypeManager,
179+
new ProcedureRegistry(functionAndTypeManager));
178180
}
179181

180182
@Inject
@@ -187,7 +189,8 @@ public MetadataManager(
187189
ColumnPropertyManager columnPropertyManager,
188190
AnalyzePropertyManager analyzePropertyManager,
189191
TransactionManager transactionManager,
190-
FunctionAndTypeManager functionAndTypeManager)
192+
FunctionAndTypeManager functionAndTypeManager,
193+
IProcedureRegistry procedureRegistry)
191194
{
192195
this.viewCodec = requireNonNull(viewCodec, "viewCodec is null");
193196
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
@@ -198,7 +201,7 @@ public MetadataManager(
198201
this.analyzePropertyManager = requireNonNull(analyzePropertyManager, "analyzePropertyManager is null");
199202
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
200203
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null");
201-
this.procedures = new ProcedureRegistry(functionAndTypeManager);
204+
this.procedures = requireNonNull(procedureRegistry, "procedureRegistry is null");
202205

203206
verifyComparableOrderableContract();
204207
}
@@ -1295,7 +1298,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
12951298
}
12961299

12971300
@Override
1298-
public ProcedureRegistry getProcedureRegistry()
1301+
public IProcedureRegistry getProcedureRegistry()
12991302
{
13001303
return procedures;
13011304
}

presto-main/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import com.facebook.presto.spi.ConnectorSession;
2222
import com.facebook.presto.spi.PrestoException;
2323
import com.facebook.presto.spi.SchemaTableName;
24+
import com.facebook.presto.spi.procedure.DistributedProcedure;
25+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2426
import com.facebook.presto.spi.procedure.Procedure;
2527
import com.google.common.collect.Maps;
2628
import com.google.common.primitives.Primitives;
2729

2830
import javax.annotation.concurrent.ThreadSafe;
31+
import javax.inject.Inject;
2932

3033
import java.util.Collection;
3134
import java.util.List;
@@ -48,16 +51,19 @@
4851

4952
@ThreadSafe
5053
public class ProcedureRegistry
54+
implements IProcedureRegistry
5155
{
5256
private final Map<ConnectorId, Map<SchemaTableName, Procedure>> connectorProcedures = new ConcurrentHashMap<>();
5357

5458
private final TypeManager typeManager;
5559

60+
@Inject
5661
public ProcedureRegistry(TypeManager typeManager)
5762
{
5863
this.typeManager = requireNonNull(typeManager, "typeManager is null");
5964
}
6065

66+
@Override
6167
public void addProcedures(ConnectorId connectorId, Collection<Procedure> procedures)
6268
{
6369
requireNonNull(connectorId, "connectorId is null");
@@ -72,11 +78,13 @@ public void addProcedures(ConnectorId connectorId, Collection<Procedure> procedu
7278
checkState(connectorProcedures.putIfAbsent(connectorId, proceduresByName) == null, "Procedures already registered for connector: %s", connectorId);
7379
}
7480

81+
@Override
7582
public void removeProcedures(ConnectorId connectorId)
7683
{
7784
connectorProcedures.remove(connectorId);
7885
}
7986

87+
@Override
8088
public Procedure resolve(ConnectorId connectorId, SchemaTableName name)
8189
{
8290
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
@@ -89,8 +97,34 @@ public Procedure resolve(ConnectorId connectorId, SchemaTableName name)
8997
throw new PrestoException(PROCEDURE_NOT_FOUND, "Procedure not registered: " + name);
9098
}
9199

100+
@Override
101+
public DistributedProcedure resolveDistributed(ConnectorId connectorId, SchemaTableName name)
102+
{
103+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
104+
if (procedures != null) {
105+
Procedure procedure = procedures.get(name);
106+
if (procedure != null && procedure instanceof DistributedProcedure) {
107+
return (DistributedProcedure) procedure;
108+
}
109+
}
110+
throw new PrestoException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + name);
111+
}
112+
113+
@Override
114+
public boolean isDistributedProcedure(ConnectorId connectorId, SchemaTableName name)
115+
{
116+
Map<SchemaTableName, Procedure> procedures = connectorProcedures.get(connectorId);
117+
return procedures != null &&
118+
procedures.containsKey(name) &&
119+
procedures.get(name) instanceof DistributedProcedure;
120+
}
121+
92122
private void validateProcedure(Procedure procedure)
93123
{
124+
if (procedure instanceof DistributedProcedure) {
125+
return;
126+
}
127+
94128
List<Class<?>> parameters = procedure.getMethodHandle().type().parameterList().stream()
95129
.filter(type -> !ConnectorSession.class.isAssignableFrom(type))
96130
.collect(toList());

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import com.facebook.presto.metadata.Metadata;
9999
import com.facebook.presto.metadata.MetadataManager;
100100
import com.facebook.presto.metadata.MetadataUpdates;
101+
import com.facebook.presto.metadata.ProcedureRegistry;
101102
import com.facebook.presto.metadata.SchemaPropertyManager;
102103
import com.facebook.presto.metadata.SessionPropertyManager;
103104
import com.facebook.presto.metadata.StaticCatalogStore;
@@ -146,6 +147,7 @@
146147
import com.facebook.presto.spi.PageSorter;
147148
import com.facebook.presto.spi.analyzer.ViewDefinition;
148149
import com.facebook.presto.spi.function.SqlInvokedFunction;
150+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
149151
import com.facebook.presto.spi.relation.DeterminismEvaluator;
150152
import com.facebook.presto.spi.relation.DomainTranslator;
151153
import com.facebook.presto.spi.relation.PredicateCompiler;
@@ -597,6 +599,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
597599
configBinder(binder).bindConfig(StaticFunctionNamespaceStoreConfig.class);
598600
binder.bind(FunctionAndTypeManager.class).in(Scopes.SINGLETON);
599601
binder.bind(MetadataManager.class).in(Scopes.SINGLETON);
602+
binder.bind(ProcedureRegistry.class).in(Scopes.SINGLETON);
603+
binder.bind(IProcedureRegistry.class).to(ProcedureRegistry.class).in(Scopes.SINGLETON);
600604

601605
if (serverConfig.isCatalogServerEnabled() && serverConfig.isCoordinator()) {
602606
binder.bind(RemoteMetadataManager.class).in(Scopes.SINGLETON);

presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.facebook.presto.spi.connector.ConnectorTableVersion;
3939
import com.facebook.presto.spi.constraints.TableConstraint;
4040
import com.facebook.presto.spi.function.SqlFunction;
41+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
4142
import com.facebook.presto.spi.security.GrantInfo;
4243
import com.facebook.presto.spi.security.PrestoPrincipal;
4344
import com.facebook.presto.spi.security.Privilege;
@@ -608,7 +609,7 @@ public FunctionAndTypeManager getFunctionAndTypeManager()
608609
}
609610

610611
@Override
611-
public ProcedureRegistry getProcedureRegistry()
612+
public IProcedureRegistry getProcedureRegistry()
612613
{
613614
throw new UnsupportedOperationException();
614615
}

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import com.facebook.presto.metadata.InternalNodeManager;
7474
import com.facebook.presto.metadata.Metadata;
7575
import com.facebook.presto.metadata.MetadataManager;
76+
import com.facebook.presto.metadata.ProcedureRegistry;
7677
import com.facebook.presto.metadata.SchemaPropertyManager;
7778
import com.facebook.presto.metadata.SessionPropertyManager;
7879
import com.facebook.presto.metadata.StaticCatalogStore;
@@ -136,6 +137,7 @@
136137
import com.facebook.presto.spi.PageSorter;
137138
import com.facebook.presto.spi.analyzer.ViewDefinition;
138139
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
140+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
139141
import com.facebook.presto.spi.relation.DeterminismEvaluator;
140142
import com.facebook.presto.spi.relation.DomainTranslator;
141143
import com.facebook.presto.spi.relation.PredicateCompiler;
@@ -347,6 +349,8 @@ protected void setup(Binder binder)
347349
binder.bind(MetadataManager.class).in(Scopes.SINGLETON);
348350
binder.bind(Metadata.class).to(MetadataManager.class).in(Scopes.SINGLETON);
349351
binder.bind(StaticFunctionNamespaceStore.class).in(Scopes.SINGLETON);
352+
binder.bind(ProcedureRegistry.class).in(Scopes.SINGLETON);
353+
binder.bind(IProcedureRegistry.class).to(ProcedureRegistry.class).in(Scopes.SINGLETON);
350354

351355
// type
352356
newSetBinder(binder, Type.class);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi;
15+
16+
@SuppressWarnings("MarkerInterface")
17+
public interface ConnectorDistributedProcedureHandle
18+
{
19+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi.connector;
15+
16+
public interface ConnectorTransactionContext
17+
{
18+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spi.procedure;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
19+
import com.facebook.presto.spi.connector.ConnectorTransactionContext;
20+
import io.airlift.slice.Slice;
21+
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.OptionalInt;
25+
26+
import static java.lang.String.format;
27+
import static java.util.Objects.requireNonNull;
28+
29+
public class DistributedProcedure
30+
extends Procedure
31+
{
32+
public static final String SCHEMA = "schema";
33+
public static final String TABLE_NAME = "table_name";
34+
public static final String FILTER = "filter";
35+
private final BeginCallDistributedProcedure beginCallDistributedProcedure;
36+
private final FinishCallDistributedProcedure finishCallDistributedProcedure;
37+
38+
private int schemaIndex = -1;
39+
private int tableNameIndex = -1;
40+
private OptionalInt filterIndex = OptionalInt.empty();
41+
42+
public DistributedProcedure(String schema, String name, List<Argument> arguments, BeginCallDistributedProcedure beginCallDistributedProcedure, FinishCallDistributedProcedure finishCallDistributedProcedure)
43+
{
44+
super(schema, name, arguments);
45+
for (int i = 0; i < getArguments().size(); i++) {
46+
if (getArguments().get(i).getName().equals(SCHEMA)) {
47+
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
48+
format("Argument `%s` must be string type", SCHEMA));
49+
schemaIndex = i;
50+
}
51+
else if (getArguments().get(i).getName().equals(TABLE_NAME)) {
52+
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
53+
format("Argument `%s` must be string type", TABLE_NAME));
54+
tableNameIndex = i;
55+
}
56+
else if (getArguments().get(i).getName().equals(FILTER)) {
57+
filterIndex = OptionalInt.of(i);
58+
}
59+
}
60+
checkArgument(schemaIndex >= 0 && tableNameIndex >= 0,
61+
format("A distributed procedure need at least 2 arguments: `%s` and `%s` for the target table", SCHEMA, TABLE_NAME));
62+
this.beginCallDistributedProcedure = requireNonNull(beginCallDistributedProcedure, "beginTableExecute is null");
63+
this.finishCallDistributedProcedure = requireNonNull(finishCallDistributedProcedure, "finishTableExecute is null");
64+
}
65+
66+
public BeginCallDistributedProcedure getBeginCallDistributedProcedure()
67+
{
68+
return beginCallDistributedProcedure;
69+
}
70+
71+
public FinishCallDistributedProcedure getFinishCallDistributedProcedure()
72+
{
73+
return finishCallDistributedProcedure;
74+
}
75+
76+
public String getSchema(Object[] parameters)
77+
{
78+
return (String) parameters[schemaIndex];
79+
}
80+
81+
public String getTableName(Object[] parameters)
82+
{
83+
return (String) parameters[tableNameIndex];
84+
}
85+
86+
public String getFilter(Object[] parameters)
87+
{
88+
if (filterIndex.isPresent()) {
89+
return (String) parameters[filterIndex.getAsInt()];
90+
}
91+
else {
92+
return "TRUE";
93+
}
94+
}
95+
96+
@FunctionalInterface
97+
public interface BeginCallDistributedProcedure
98+
{
99+
ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorTransactionContext transactionContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments);
100+
}
101+
102+
@FunctionalInterface
103+
public interface FinishCallDistributedProcedure
104+
{
105+
void finish(ConnectorTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
106+
}
107+
}

0 commit comments

Comments
 (0)