Skip to content

Commit bffc17e

Browse files
committed
Refactor connector spi to support call distributed procedure
1 parent 73f78e8 commit bffc17e

File tree

7 files changed

+52
-2
lines changed

7 files changed

+52
-2
lines changed

presto-main/src/main/java/com/facebook/presto/connector/ConnectorContextInstance.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.facebook.presto.spi.function.FunctionMetadataManager;
2424
import com.facebook.presto.spi.function.StandardFunctionResolution;
2525
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
26+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2627
import com.facebook.presto.spi.relation.RowExpressionService;
2728

2829
import static java.util.Objects.requireNonNull;
@@ -32,6 +33,7 @@ public class ConnectorContextInstance
3233
{
3334
private final NodeManager nodeManager;
3435
private final TypeManager typeManager;
36+
private final IProcedureRegistry procedureRegistry;
3537
private final FunctionMetadataManager functionMetadataManager;
3638
private final StandardFunctionResolution functionResolution;
3739
private final PageSorter pageSorter;
@@ -44,6 +46,7 @@ public class ConnectorContextInstance
4446
public ConnectorContextInstance(
4547
NodeManager nodeManager,
4648
TypeManager typeManager,
49+
IProcedureRegistry procedureRegistry,
4750
FunctionMetadataManager functionMetadataManager,
4851
StandardFunctionResolution functionResolution,
4952
PageSorter pageSorter,
@@ -55,6 +58,7 @@ public ConnectorContextInstance(
5558
{
5659
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
5760
this.typeManager = requireNonNull(typeManager, "typeManager is null");
61+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
5862
this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
5963
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
6064
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
@@ -77,6 +81,12 @@ public TypeManager getTypeManager()
7781
return typeManager;
7882
}
7983

84+
@Override
85+
public IProcedureRegistry getProcedureRegistry()
86+
{
87+
return procedureRegistry;
88+
}
89+
8090
@Override
8191
public FunctionMetadataManager getFunctionMetadataManager()
8292
{

presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
5353
import com.facebook.presto.spi.connector.ConnectorSplitManager;
5454
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
55+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
5556
import com.facebook.presto.spi.procedure.Procedure;
5657
import com.facebook.presto.spi.relation.DeterminismEvaluator;
5758
import com.facebook.presto.spi.relation.DomainTranslator;
@@ -112,6 +113,7 @@ public class ConnectorManager
112113
private final HandleResolver handleResolver;
113114
private final InternalNodeManager nodeManager;
114115
private final TypeManager typeManager;
116+
private final IProcedureRegistry procedureRegistry;
115117
private final PageSorter pageSorter;
116118
private final PageIndexerFactory pageIndexerFactory;
117119
private final NodeInfo nodeInfo;
@@ -148,6 +150,7 @@ public ConnectorManager(
148150
InternalNodeManager nodeManager,
149151
NodeInfo nodeInfo,
150152
TypeManager typeManager,
153+
IProcedureRegistry procedureRegistry,
151154
PageSorter pageSorter,
152155
PageIndexerFactory pageIndexerFactory,
153156
TransactionManager transactionManager,
@@ -172,6 +175,7 @@ public ConnectorManager(
172175
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
173176
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
174177
this.typeManager = requireNonNull(typeManager, "typeManager is null");
178+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
175179
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
176180
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
177181
this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null");
@@ -376,6 +380,7 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact
376380
ConnectorContext context = new ConnectorContextInstance(
377381
new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), connectorId),
378382
typeManager,
383+
procedureRegistry,
379384
metadataManager.getFunctionAndTypeManager(),
380385
new FunctionResolution(metadataManager.getFunctionAndTypeManager().getFunctionAndTypeResolver()),
381386
pageSorter,

presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.execution.TaskMetadataContext;
2828
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget;
2929
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle;
30+
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle;
3031
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle;
3132
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle;
3233
import com.facebook.presto.memory.context.LocalMemoryContext;
@@ -119,8 +120,11 @@ public TableWriterOperatorFactory(
119120
this.metadataUpdaterManager = requireNonNull(metadataUpdaterManager, "metadataUpdaterManager is null");
120121
this.taskMetadataContext = requireNonNull(taskMetadataContext, "taskMetadataContext is null");
121122
checkArgument(
122-
writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle || writerTarget instanceof RefreshMaterializedViewHandle,
123-
"writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle");
123+
writerTarget instanceof CreateHandle ||
124+
writerTarget instanceof InsertHandle ||
125+
writerTarget instanceof RefreshMaterializedViewHandle ||
126+
writerTarget instanceof ExecuteProcedureHandle,
127+
"writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle or TableExecuteHandle");
124128
this.target = requireNonNull(writerTarget, "writerTarget is null");
125129
this.session = session;
126130
this.statisticsAggregationOperatorFactory = requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null");
@@ -170,6 +174,9 @@ private ConnectorPageSink createPageSink()
170174
if (target instanceof RefreshMaterializedViewHandle) {
171175
return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build());
172176
}
177+
if (target instanceof ExecuteProcedureHandle) {
178+
return pageSinkManager.createPageSink(session, ((ExecuteProcedureHandle) target).getHandle(), pageSinkContextBuilder.build());
179+
}
173180
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
174181
}
175182

@@ -187,6 +194,9 @@ private static ConnectorId getConnectorId(ExecutionWriterTarget handle)
187194
return ((RefreshMaterializedViewHandle) handle).getHandle().getConnectorId();
188195
}
189196

197+
if (handle instanceof ExecuteProcedureHandle) {
198+
return ((ExecuteProcedureHandle) handle).getHandle().getConnectorId();
199+
}
190200
throw new UnsupportedOperationException("Unhandled target type: " + handle.getClass().getName());
191201
}
192202

presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
473473
nodeManager,
474474
nodeInfo,
475475
metadata.getFunctionAndTypeManager(),
476+
procedureRegistry,
476477
pageSorter,
477478
pageIndexerFactory,
478479
transactionManager,

presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.facebook.presto.spi.function.FunctionMetadataManager;
2323
import com.facebook.presto.spi.function.StandardFunctionResolution;
2424
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
25+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2526
import com.facebook.presto.spi.relation.RowExpressionService;
2627

2728
public interface ConnectorContext
@@ -36,6 +37,11 @@ default TypeManager getTypeManager()
3637
throw new UnsupportedOperationException();
3738
}
3839

40+
default IProcedureRegistry getProcedureRegistry()
41+
{
42+
throw new UnsupportedOperationException();
43+
}
44+
3945
default FunctionMetadataManager getFunctionMetadataManager()
4046
{
4147
throw new UnsupportedOperationException();

presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPageSinkProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,24 @@
1313
*/
1414
package com.facebook.presto.spi.connector;
1515

16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
1617
import com.facebook.presto.spi.ConnectorInsertTableHandle;
1718
import com.facebook.presto.spi.ConnectorOutputTableHandle;
1819
import com.facebook.presto.spi.ConnectorPageSink;
1920
import com.facebook.presto.spi.ConnectorSession;
2021
import com.facebook.presto.spi.PageSinkContext;
22+
import com.facebook.presto.spi.PrestoException;
23+
24+
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
2125

2226
public interface ConnectorPageSinkProvider
2327
{
2428
ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, PageSinkContext pageSinkContext);
2529

2630
ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, PageSinkContext pageSinkContext);
31+
32+
default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext)
33+
{
34+
throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure");
35+
}
2736
}

presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.spi.connector.classloader;
1515

16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
1617
import com.facebook.presto.spi.ConnectorInsertTableHandle;
1718
import com.facebook.presto.spi.ConnectorOutputTableHandle;
1819
import com.facebook.presto.spi.ConnectorPageSink;
@@ -51,4 +52,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
5152
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, insertTableHandle, pageSinkContext), classLoader);
5253
}
5354
}
55+
56+
@Override
57+
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext)
58+
{
59+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
60+
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, procedureHandle, pageSinkContext), classLoader);
61+
}
62+
}
5463
}

0 commit comments

Comments
 (0)