Skip to content

Commit 8f49cbe

Browse files
committed
Execute optimization, segmentation and local planning for call distributed procedure
1 parent 54edf15 commit 8f49cbe

36 files changed

+815
-7
lines changed

presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.drift.annotations.ThriftField;
1818
import com.facebook.drift.annotations.ThriftStruct;
1919
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
2021
import com.fasterxml.jackson.annotation.JsonValue;
2122

2223
import javax.annotation.concurrent.Immutable;
@@ -58,8 +59,12 @@ public static QualifiedObjectName valueOf(String catalogName, String schemaName,
5859
return new QualifiedObjectName(catalogName, schemaName, objectName.toLowerCase(ENGLISH));
5960
}
6061

62+
@JsonCreator
6163
@ThriftConstructor
62-
public QualifiedObjectName(String catalogName, String schemaName, String objectName)
64+
public QualifiedObjectName(
65+
@JsonProperty("catalogName") String catalogName,
66+
@JsonProperty("schemaName") String schemaName,
67+
@JsonProperty("objectName") String objectName)
6368
{
6469
checkLowerCase(catalogName, "catalogName");
6570
checkLowerCase(schemaName, "schemaName");
@@ -75,18 +80,21 @@ public CatalogSchemaName getCatalogSchemaName()
7580
}
7681

7782
@ThriftField(1)
83+
@JsonProperty("catalogName")
7884
public String getCatalogName()
7985
{
8086
return catalogName;
8187
}
8288

8389
@ThriftField(2)
90+
@JsonProperty("schemaName")
8491
public String getSchemaName()
8592
{
8693
return schemaName;
8794
}
8895

8996
@ThriftField(3)
97+
@JsonProperty("objectName")
9098
public String getObjectName()
9199
{
92100
return objectName;

presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.facebook.presto.execution.scheduler;
1616

17+
import com.facebook.presto.common.QualifiedObjectName;
18+
import com.facebook.presto.metadata.DistributedProcedureHandle;
1719
import com.facebook.presto.metadata.InsertTableHandle;
1820
import com.facebook.presto.metadata.OutputTableHandle;
1921
import com.facebook.presto.spi.SchemaTableName;
@@ -31,7 +33,9 @@
3133
@JsonSubTypes.Type(value = ExecutionWriterTarget.InsertHandle.class, name = "InsertHandle"),
3234
@JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle"),
3335
@JsonSubTypes.Type(value = ExecutionWriterTarget.RefreshMaterializedViewHandle.class, name = "RefreshMaterializedViewHandle"),
34-
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle")})
36+
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle"),
37+
@JsonSubTypes.Type(value = ExecutionWriterTarget.ExecuteProcedureHandle.class, name = "TableExecuteHandle")
38+
})
3539
@SuppressWarnings({"EmptyClass", "ClassMayBeInterface"})
3640
public abstract class ExecutionWriterTarget
3741
{
@@ -204,4 +208,47 @@ public String toString()
204208
return handle.toString();
205209
}
206210
}
211+
212+
public static class ExecuteProcedureHandle
213+
extends ExecutionWriterTarget
214+
{
215+
private final DistributedProcedureHandle handle;
216+
private final SchemaTableName schemaTableName;
217+
private final QualifiedObjectName procedureName;
218+
219+
@JsonCreator
220+
public ExecuteProcedureHandle(
221+
@JsonProperty("handle") DistributedProcedureHandle handle,
222+
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
223+
@JsonProperty("procedureName") QualifiedObjectName procedureName)
224+
{
225+
this.handle = requireNonNull(handle, "handle is null");
226+
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
227+
this.procedureName = requireNonNull(procedureName, "procedureName is null");
228+
}
229+
230+
@JsonProperty
231+
public DistributedProcedureHandle getHandle()
232+
{
233+
return handle;
234+
}
235+
236+
@JsonProperty
237+
public SchemaTableName getSchemaTableName()
238+
{
239+
return schemaTableName;
240+
}
241+
242+
@JsonProperty
243+
public QualifiedObjectName getProcedureName()
244+
{
245+
return procedureName;
246+
}
247+
248+
@Override
249+
public String toString()
250+
{
251+
return handle.toString();
252+
}
253+
}
207254
}

presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.facebook.presto.Session;
1818
import com.facebook.presto.common.predicate.TupleDomain;
19+
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle;
1920
import com.facebook.presto.metadata.AnalyzeTableHandle;
2021
import com.facebook.presto.metadata.Metadata;
2122
import com.facebook.presto.metadata.TableLayoutResult;
@@ -34,6 +35,7 @@
3435
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
3536
import com.facebook.presto.sql.planner.plan.TableFinishNode;
3637
import com.facebook.presto.sql.planner.plan.TableWriterNode;
38+
import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget;
3739
import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
3840
import com.fasterxml.jackson.annotation.JsonCreator;
3941
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -107,6 +109,17 @@ private static Optional<ExecutionWriterTarget> createWriterTarget(Optional<Table
107109
TableWriterNode.RefreshMaterializedViewReference refresh = (TableWriterNode.RefreshMaterializedViewReference) target;
108110
return Optional.of(new ExecutionWriterTarget.RefreshMaterializedViewHandle(metadata.beginRefreshMaterializedView(session, refresh.getHandle()), refresh.getSchemaTableName()));
109111
}
112+
if (target instanceof CallDistributedProcedureTarget) {
113+
CallDistributedProcedureTarget callDistributedProcedureTarget = (CallDistributedProcedureTarget) target;
114+
return Optional.of(new ExecuteProcedureHandle(
115+
metadata.beginCallDistributedProcedure(
116+
session,
117+
callDistributedProcedureTarget.getProcedureName(),
118+
callDistributedProcedureTarget.getSourceHandle().orElse(null),
119+
callDistributedProcedureTarget.getProcedureArguments()),
120+
callDistributedProcedureTarget.getSchemaTableName(),
121+
callDistributedProcedureTarget.getProcedureName()));
122+
}
110123
throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName());
111124
}
112125

presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,18 @@ public void finishDelete(Session session, TableHandle tableHandle, Collection<Sl
403403
delegate.finishDelete(session, tableHandle, fragments);
404404
}
405405

406+
@Override
407+
public DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments)
408+
{
409+
return delegate.beginCallDistributedProcedure(session, procedureName, tableHandle, arguments);
410+
}
411+
412+
@Override
413+
public void finishCallDistributedProcedure(Session session, DistributedProcedureHandle tableHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
414+
{
415+
delegate.finishCallDistributedProcedure(session, tableHandle, procedureName, fragments);
416+
}
417+
406418
@Override
407419
public TableHandle beginUpdate(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
408420
{
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
import com.facebook.presto.spi.ConnectorId;
18+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
19+
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
import java.util.Objects;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public final class DistributedProcedureHandle
27+
{
28+
private final ConnectorId connectorId;
29+
private final ConnectorTransactionHandle transactionHandle;
30+
private final ConnectorDistributedProcedureHandle connectorHandle;
31+
32+
@JsonCreator
33+
public DistributedProcedureHandle(
34+
@JsonProperty("connectorId") ConnectorId connectorId,
35+
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
36+
@JsonProperty("connectorHandle") ConnectorDistributedProcedureHandle connectorHandle)
37+
{
38+
this.connectorId = requireNonNull(connectorId, "connectorId is null");
39+
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
40+
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
41+
}
42+
43+
@JsonProperty
44+
public ConnectorId getConnectorId()
45+
{
46+
return connectorId;
47+
}
48+
49+
@JsonProperty
50+
public ConnectorTransactionHandle getTransactionHandle()
51+
{
52+
return transactionHandle;
53+
}
54+
55+
@JsonProperty
56+
public ConnectorDistributedProcedureHandle getConnectorHandle()
57+
{
58+
return connectorHandle;
59+
}
60+
61+
@Override
62+
public int hashCode()
63+
{
64+
return Objects.hash(connectorId, transactionHandle, connectorHandle);
65+
}
66+
67+
@Override
68+
public boolean equals(Object obj)
69+
{
70+
if (this == obj) {
71+
return true;
72+
}
73+
if (obj == null || getClass() != obj.getClass()) {
74+
return false;
75+
}
76+
DistributedProcedureHandle o = (DistributedProcedureHandle) obj;
77+
return Objects.equals(this.connectorId, o.connectorId) &&
78+
Objects.equals(this.transactionHandle, o.transactionHandle) &&
79+
Objects.equals(this.connectorHandle, o.connectorHandle);
80+
}
81+
82+
@Override
83+
public String toString()
84+
{
85+
return connectorId + ":" + connectorHandle;
86+
}
87+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
17+
18+
import javax.inject.Inject;
19+
20+
public class DistributedProcedureHandleJacksonModule
21+
extends AbstractTypedJacksonModule<ConnectorDistributedProcedureHandle>
22+
{
23+
@Inject
24+
public DistributedProcedureHandleJacksonModule(HandleResolver handleResolver)
25+
{
26+
super(ConnectorDistributedProcedureHandle.class,
27+
handleResolver::getId,
28+
handleResolver::getDistributedProcedureHandleClass);
29+
}
30+
}

presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public void configure(Binder binder)
3232
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
3333
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
3434
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
35+
jsonBinder(binder).addModuleBinding().to(DistributedProcedureHandleJacksonModule.class);
3536
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
3637
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
3738
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);

presto-main/src/main/java/com/facebook/presto/metadata/HandleResolver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.connector.informationSchema.InformationSchemaHandleResolver;
1717
import com.facebook.presto.connector.system.SystemHandleResolver;
1818
import com.facebook.presto.spi.ColumnHandle;
19+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
1920
import com.facebook.presto.spi.ConnectorHandleResolver;
2021
import com.facebook.presto.spi.ConnectorIndexHandle;
2122
import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -114,6 +115,11 @@ public String getId(ConnectorInsertTableHandle insertHandle)
114115
return getId(insertHandle, MaterializedHandleResolver::getInsertTableHandleClass);
115116
}
116117

118+
public String getId(ConnectorDistributedProcedureHandle distributedProcedureHandle)
119+
{
120+
return getId(distributedProcedureHandle, MaterializedHandleResolver::getDistributedProcedureHandleClass);
121+
}
122+
117123
public String getId(ConnectorPartitioningHandle partitioningHandle)
118124
{
119125
return getId(partitioningHandle, MaterializedHandleResolver::getPartitioningHandleClass);
@@ -169,6 +175,11 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass(Str
169175
return resolverFor(id).getInsertTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
170176
}
171177

178+
public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass(String id)
179+
{
180+
return resolverFor(id).getDistributedProcedureHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
181+
}
182+
172183
public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass(String id)
173184
{
174185
return resolverFor(id).getPartitioningHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
@@ -240,6 +251,7 @@ private static class MaterializedHandleResolver
240251
private final Optional<Class<? extends ConnectorIndexHandle>> indexHandle;
241252
private final Optional<Class<? extends ConnectorOutputTableHandle>> outputTableHandle;
242253
private final Optional<Class<? extends ConnectorInsertTableHandle>> insertTableHandle;
254+
private final Optional<Class<? extends ConnectorDistributedProcedureHandle>> distributedProcedureHandle;
243255
private final Optional<Class<? extends ConnectorPartitioningHandle>> partitioningHandle;
244256
private final Optional<Class<? extends ConnectorTransactionHandle>> transactionHandle;
245257
private final Optional<Class<? extends ConnectorMetadataUpdateHandle>> metadataUpdateHandle;
@@ -256,6 +268,7 @@ public MaterializedHandleResolver(ConnectorHandleResolver resolver)
256268
partitioningHandle = getHandleClass(resolver::getPartitioningHandleClass);
257269
transactionHandle = getHandleClass(resolver::getTransactionHandleClass);
258270
metadataUpdateHandle = getHandleClass(resolver::getMetadataUpdateHandleClass);
271+
distributedProcedureHandle = getHandleClass(resolver::getDistributedProcedureHandleClass);
259272
}
260273

261274
private static <T> Optional<Class<? extends T>> getHandleClass(Supplier<Class<? extends T>> callable)
@@ -303,6 +316,11 @@ public Optional<Class<? extends ConnectorInsertTableHandle>> getInsertTableHandl
303316
return insertTableHandle;
304317
}
305318

319+
public Optional<Class<? extends ConnectorDistributedProcedureHandle>> getDistributedProcedureHandleClass()
320+
{
321+
return distributedProcedureHandle;
322+
}
323+
306324
public Optional<Class<? extends ConnectorPartitioningHandle>> getPartitioningHandleClass()
307325
{
308326
return partitioningHandle;

presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,16 @@ public interface Metadata
337337
*/
338338
void finishDelete(Session session, TableHandle tableHandle, Collection<Slice> fragments);
339339

340+
/**
341+
* Begin table execute
342+
*/
343+
DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments);
344+
345+
/**
346+
* Finish table execute
347+
*/
348+
void finishCallDistributedProcedure(Session session, DistributedProcedureHandle tableHandle, QualifiedObjectName procedureName, Collection<Slice> fragments);
349+
340350
/**
341351
* Begin update query
342352
*/

0 commit comments

Comments
 (0)