Skip to content

Commit 94de465

Browse files
committed
Execute optimization, segmentation and local planning for call distributed procedure
1 parent 1eabce1 commit 94de465

File tree

45 files changed

+1099
-13
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1099
-13
lines changed

presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.drift.annotations.ThriftField;
1818
import com.facebook.drift.annotations.ThriftStruct;
1919
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
2021
import com.fasterxml.jackson.annotation.JsonValue;
2122
import com.google.errorprone.annotations.Immutable;
2223

@@ -57,8 +58,12 @@ public static QualifiedObjectName valueOf(String catalogName, String schemaName,
5758
return new QualifiedObjectName(catalogName, schemaName, objectName.toLowerCase(ENGLISH));
5859
}
5960

61+
@JsonCreator
6062
@ThriftConstructor
61-
public QualifiedObjectName(String catalogName, String schemaName, String objectName)
63+
public QualifiedObjectName(
64+
@JsonProperty("catalogName") String catalogName,
65+
@JsonProperty("schemaName") String schemaName,
66+
@JsonProperty("objectName") String objectName)
6267
{
6368
checkLowerCase(catalogName, "catalogName");
6469
this.catalogName = catalogName;
@@ -72,18 +77,21 @@ public CatalogSchemaName getCatalogSchemaName()
7277
}
7378

7479
@ThriftField(1)
80+
@JsonProperty("catalogName")
7581
public String getCatalogName()
7682
{
7783
return catalogName;
7884
}
7985

8086
@ThriftField(2)
87+
@JsonProperty("schemaName")
8188
public String getSchemaName()
8289
{
8390
return schemaName;
8491
}
8592

8693
@ThriftField(3)
94+
@JsonProperty("objectName")
8795
public String getObjectName()
8896
{
8997
return objectName;

presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import com.facebook.drift.annotations.ThriftConstructor;
1818
import com.facebook.drift.annotations.ThriftField;
1919
import com.facebook.drift.annotations.ThriftStruct;
20+
import com.facebook.presto.common.QualifiedObjectName;
2021
import com.facebook.presto.metadata.DeleteTableHandle;
22+
import com.facebook.presto.metadata.DistributedProcedureHandle;
2123
import com.facebook.presto.metadata.InsertTableHandle;
2224
import com.facebook.presto.metadata.OutputTableHandle;
2325
import com.facebook.presto.spi.SchemaTableName;
@@ -35,7 +37,9 @@
3537
@JsonSubTypes.Type(value = ExecutionWriterTarget.InsertHandle.class, name = "InsertHandle"),
3638
@JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle"),
3739
@JsonSubTypes.Type(value = ExecutionWriterTarget.RefreshMaterializedViewHandle.class, name = "RefreshMaterializedViewHandle"),
38-
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle")})
40+
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle"),
41+
@JsonSubTypes.Type(value = ExecutionWriterTarget.ExecuteProcedureHandle.class, name = "TableExecuteHandle")
42+
})
3943
@SuppressWarnings({"EmptyClass", "ClassMayBeInterface"})
4044
public abstract class ExecutionWriterTarget
4145
{
@@ -228,4 +232,47 @@ public String toString()
228232
return handle.toString();
229233
}
230234
}
235+
236+
public static class ExecuteProcedureHandle
237+
extends ExecutionWriterTarget
238+
{
239+
private final DistributedProcedureHandle handle;
240+
private final SchemaTableName schemaTableName;
241+
private final QualifiedObjectName procedureName;
242+
243+
@JsonCreator
244+
public ExecuteProcedureHandle(
245+
@JsonProperty("handle") DistributedProcedureHandle handle,
246+
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
247+
@JsonProperty("procedureName") QualifiedObjectName procedureName)
248+
{
249+
this.handle = requireNonNull(handle, "handle is null");
250+
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
251+
this.procedureName = requireNonNull(procedureName, "procedureName is null");
252+
}
253+
254+
@JsonProperty
255+
public DistributedProcedureHandle getHandle()
256+
{
257+
return handle;
258+
}
259+
260+
@JsonProperty
261+
public SchemaTableName getSchemaTableName()
262+
{
263+
return schemaTableName;
264+
}
265+
266+
@JsonProperty
267+
public QualifiedObjectName getProcedureName()
268+
{
269+
return procedureName;
270+
}
271+
272+
@Override
273+
public String toString()
274+
{
275+
return handle.toString();
276+
}
277+
}
231278
}

presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import com.facebook.drift.annotations.ThriftField;
1919
import com.facebook.drift.annotations.ThriftStruct;
2020
import com.facebook.presto.Session;
21+
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle;
2122
import com.facebook.presto.metadata.AnalyzeTableHandle;
2223
import com.facebook.presto.metadata.Metadata;
2324
import com.facebook.presto.spi.plan.PlanNode;
2425
import com.facebook.presto.spi.plan.TableFinishNode;
2526
import com.facebook.presto.spi.plan.TableWriterNode;
27+
import com.facebook.presto.spi.plan.TableWriterNode.CallDistributedProcedureTarget;
2628
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
2729
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
2830
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -101,6 +103,17 @@ private static Optional<ExecutionWriterTarget> createWriterTarget(Optional<Table
101103
TableWriterNode.UpdateTarget update = (TableWriterNode.UpdateTarget) target;
102104
return Optional.of(new ExecutionWriterTarget.UpdateHandle(update.getHandle(), update.getSchemaTableName()));
103105
}
106+
if (target instanceof CallDistributedProcedureTarget) {
107+
CallDistributedProcedureTarget callDistributedProcedureTarget = (CallDistributedProcedureTarget) target;
108+
return Optional.of(new ExecuteProcedureHandle(
109+
metadata.beginCallDistributedProcedure(
110+
session,
111+
callDistributedProcedureTarget.getProcedureName(),
112+
callDistributedProcedureTarget.getSourceHandle().orElse(null),
113+
callDistributedProcedureTarget.getProcedureArguments()),
114+
callDistributedProcedureTarget.getSchemaTableName(),
115+
callDistributedProcedureTarget.getProcedureName()));
116+
}
104117
throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName());
105118
}
106119

presto-main-base/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,18 @@ public Optional<ConnectorOutputMetadata> finishDeleteWithOutput(Session session,
401401
return delegate.finishDeleteWithOutput(session, tableHandle, fragments);
402402
}
403403

404+
@Override
405+
public DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments)
406+
{
407+
return delegate.beginCallDistributedProcedure(session, procedureName, tableHandle, arguments);
408+
}
409+
410+
@Override
411+
public void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
412+
{
413+
delegate.finishCallDistributedProcedure(session, procedureHandle, procedureName, fragments);
414+
}
415+
404416
@Override
405417
public TableHandle beginUpdate(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
406418
{
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
import com.facebook.presto.spi.ConnectorId;
18+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
19+
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
import java.util.Objects;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public final class DistributedProcedureHandle
27+
{
28+
private final ConnectorId connectorId;
29+
private final ConnectorTransactionHandle transactionHandle;
30+
private final ConnectorDistributedProcedureHandle connectorHandle;
31+
32+
@JsonCreator
33+
public DistributedProcedureHandle(
34+
@JsonProperty("connectorId") ConnectorId connectorId,
35+
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
36+
@JsonProperty("connectorHandle") ConnectorDistributedProcedureHandle connectorHandle)
37+
{
38+
this.connectorId = requireNonNull(connectorId, "connectorId is null");
39+
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
40+
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
41+
}
42+
43+
@JsonProperty
44+
public ConnectorId getConnectorId()
45+
{
46+
return connectorId;
47+
}
48+
49+
@JsonProperty
50+
public ConnectorTransactionHandle getTransactionHandle()
51+
{
52+
return transactionHandle;
53+
}
54+
55+
@JsonProperty
56+
public ConnectorDistributedProcedureHandle getConnectorHandle()
57+
{
58+
return connectorHandle;
59+
}
60+
61+
@Override
62+
public int hashCode()
63+
{
64+
return Objects.hash(connectorId, transactionHandle, connectorHandle);
65+
}
66+
67+
@Override
68+
public boolean equals(Object obj)
69+
{
70+
if (this == obj) {
71+
return true;
72+
}
73+
if (obj == null || getClass() != obj.getClass()) {
74+
return false;
75+
}
76+
DistributedProcedureHandle o = (DistributedProcedureHandle) obj;
77+
return Objects.equals(this.connectorId, o.connectorId) &&
78+
Objects.equals(this.transactionHandle, o.transactionHandle) &&
79+
Objects.equals(this.connectorHandle, o.connectorHandle);
80+
}
81+
82+
@Override
83+
public String toString()
84+
{
85+
return connectorId + ":" + connectorHandle;
86+
}
87+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
18+
import javax.inject.Inject;
19+
20+
public class DistributedProcedureHandleJacksonModule
21+
extends AbstractTypedJacksonModule<ConnectorDistributedProcedureHandle>
22+
{
23+
@Inject
24+
public DistributedProcedureHandleJacksonModule(HandleResolver handleResolver)
25+
{
26+
super(ConnectorDistributedProcedureHandle.class,
27+
handleResolver::getId,
28+
handleResolver::getDistributedProcedureHandleClass);
29+
}
30+
}

presto-main-base/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public void configure(Binder binder)
3333
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
3434
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
3535
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
36+
jsonBinder(binder).addModuleBinding().to(DistributedProcedureHandleJacksonModule.class);
3637
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
3738
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
3839
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);

presto-main-base/src/main/java/com/facebook/presto/metadata/HandleResolver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.connector.system.SystemHandleResolver;
1818
import com.facebook.presto.spi.ColumnHandle;
1919
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
20+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
2021
import com.facebook.presto.spi.ConnectorHandleResolver;
2122
import com.facebook.presto.spi.ConnectorIndexHandle;
2223
import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -119,6 +120,11 @@ public String getId(ConnectorDeleteTableHandle deleteHandle)
119120
return getId(deleteHandle, MaterializedHandleResolver::getDeleteTableHandleClass);
120121
}
121122

123+
public String getId(ConnectorDistributedProcedureHandle distributedProcedureHandle)
124+
{
125+
return getId(distributedProcedureHandle, MaterializedHandleResolver::getDistributedProcedureHandleClass);
126+
}
127+
122128
public String getId(ConnectorPartitioningHandle partitioningHandle)
123129
{
124130
return getId(partitioningHandle, MaterializedHandleResolver::getPartitioningHandleClass);
@@ -174,6 +180,11 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass(Str
174180
return resolverFor(id).getDeleteTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
175181
}
176182

183+
public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass(String id)
184+
{
185+
return resolverFor(id).getDistributedProcedureHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
186+
}
187+
177188
public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass(String id)
178189
{
179190
return resolverFor(id).getPartitioningHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
@@ -241,6 +252,7 @@ private static class MaterializedHandleResolver
241252
private final Optional<Class<? extends ConnectorOutputTableHandle>> outputTableHandle;
242253
private final Optional<Class<? extends ConnectorInsertTableHandle>> insertTableHandle;
243254
private final Optional<Class<? extends ConnectorDeleteTableHandle>> deleteTableHandle;
255+
private final Optional<Class<? extends ConnectorDistributedProcedureHandle>> distributedProcedureHandle;
244256
private final Optional<Class<? extends ConnectorPartitioningHandle>> partitioningHandle;
245257
private final Optional<Class<? extends ConnectorTransactionHandle>> transactionHandle;
246258

@@ -256,6 +268,7 @@ public MaterializedHandleResolver(ConnectorHandleResolver resolver)
256268
deleteTableHandle = getHandleClass(resolver::getDeleteTableHandleClass);
257269
partitioningHandle = getHandleClass(resolver::getPartitioningHandleClass);
258270
transactionHandle = getHandleClass(resolver::getTransactionHandleClass);
271+
distributedProcedureHandle = getHandleClass(resolver::getDistributedProcedureHandleClass);
259272
}
260273

261274
private static <T> Optional<Class<? extends T>> getHandleClass(Supplier<Class<? extends T>> callable)
@@ -308,6 +321,11 @@ public Optional<Class<? extends ConnectorDeleteTableHandle>> getDeleteTableHandl
308321
return deleteTableHandle;
309322
}
310323

324+
public Optional<Class<? extends ConnectorDistributedProcedureHandle>> getDistributedProcedureHandleClass()
325+
{
326+
return distributedProcedureHandle;
327+
}
328+
311329
public Optional<Class<? extends ConnectorPartitioningHandle>> getPartitioningHandleClass()
312330
{
313331
return partitioningHandle;

presto-main-base/src/main/java/com/facebook/presto/metadata/Metadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,16 @@ public interface Metadata
342342
*/
343343
Optional<ConnectorOutputMetadata> finishDeleteWithOutput(Session session, DeleteTableHandle tableHandle, Collection<Slice> fragments);
344344

345+
/**
346+
* Begin call distributed procedure
347+
*/
348+
DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments);
349+
350+
/**
351+
* Finish call distributed procedure
352+
*/
353+
void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments);
354+
345355
/**
346356
* Begin update query
347357
*/

0 commit comments

Comments
 (0)