Skip to content

Commit 3f53a40

Browse files
committed
[native] Prepare for the actual support of distributed procedures
1 parent e80dbc5 commit 3f53a40

File tree

9 files changed

+102
-1
lines changed

9 files changed

+102
-1
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
198198
typeParser);
199199
}
200200

201+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
202+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
203+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
204+
const TypeParser& typeParser) const {
205+
// TODO: requires data insertion support
206+
VELOX_FAIL("Not yet supported, requires data insertion support first");
207+
}
208+
201209
std::unique_ptr<protocol::ConnectorProtocol>
202210
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
203211
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
3838
const TypeParser& typeParser,
3939
velox::connector::ColumnHandleMap& assignments) const final;
4040

41+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle> toVeloxInsertTableHandle(
42+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
43+
const TypeParser& typeParser) const final;
44+
4145
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
4246
const final;
4347
};

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ class PrestoToVeloxConnector {
107107
return {};
108108
}
109109

110+
[[nodiscard]] virtual std::unique_ptr<
111+
velox::connector::ConnectorInsertTableHandle>
112+
toVeloxInsertTableHandle(
113+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
114+
const TypeParser& typeParser) const {
115+
return {};
116+
}
117+
110118
[[nodiscard]] std::unique_ptr<velox::core::PartitionFunctionSpec>
111119
createVeloxPartitionFunctionSpec(
112120
const protocol::ConnectorPartitioningHandle* partitioningHandle,

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,6 +1540,57 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15401540
sourceVeloxPlan);
15411541
}
15421542

1543+
std::shared_ptr<const core::TableWriteNode>
1544+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1545+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
1546+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1547+
const protocol::TaskId& taskId) {
1548+
const auto executeProcedureHandle = std::dynamic_pointer_cast<protocol::ExecuteProcedureHandle>(tableWriteInfo->writerTarget);
1549+
1550+
if (!executeProcedureHandle) {
1551+
VELOX_UNSUPPORTED(
1552+
"Unsupported execute procedure handle: {}",
1553+
toJsonString(tableWriteInfo->writerTarget));
1554+
}
1555+
1556+
std::string connectorId = executeProcedureHandle->handle.connectorId;
1557+
auto& connector =
1558+
getPrestoToVeloxConnector(executeProcedureHandle->handle.connectorHandle->_type);
1559+
auto veloxHandle = connector.toVeloxInsertTableHandle(
1560+
executeProcedureHandle.get(), typeParser_);
1561+
auto connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
1562+
1563+
if (!connectorInsertHandle) {
1564+
VELOX_UNSUPPORTED(
1565+
"Unsupported execute procedure handle: {}",
1566+
toJsonString(tableWriteInfo->writerTarget));
1567+
}
1568+
1569+
auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
1570+
connectorId, connectorInsertHandle);
1571+
1572+
const auto outputType = toRowType(
1573+
generateOutputVariables(
1574+
{node->rowCountVariable,
1575+
node->fragmentVariable,
1576+
node->tableCommitContextVariable},
1577+
nullptr),
1578+
typeParser_);
1579+
const auto sourceVeloxPlan =
1580+
toVeloxQueryPlan(node->source, tableWriteInfo, taskId);
1581+
1582+
return std::make_shared<core::TableWriteNode>(
1583+
node->id,
1584+
toRowType(node->columns, typeParser_),
1585+
node->columnNames,
1586+
std::nullopt,
1587+
std::move(insertTableHandle),
1588+
node->partitioningScheme != nullptr,
1589+
outputType,
1590+
getCommitStrategy(),
1591+
sourceVeloxPlan);
1592+
}
1593+
15431594
std::shared_ptr<const core::TableWriteNode>
15441595
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15451596
const std::shared_ptr<const protocol::DeleteNode>& node,
@@ -1928,6 +1979,10 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
19281979
std::dynamic_pointer_cast<const protocol::TableWriterNode>(node)) {
19291980
return toVeloxQueryPlan(tableWriter, tableWriteInfo, taskId);
19301981
}
1982+
if (auto callDistributedProcedure =
1983+
std::dynamic_pointer_cast<const protocol::CallDistributedProcedureNode>(node)) {
1984+
return toVeloxQueryPlan(callDistributedProcedure, tableWriteInfo, taskId);
1985+
}
19311986
if (auto deleteNode =
19321987
std::dynamic_pointer_cast<const protocol::DeleteNode>(node)) {
19331988
return toVeloxQueryPlan(deleteNode, tableWriteInfo, taskId);

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ class VeloxQueryPlanConverterBase {
160160
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
161161
const protocol::TaskId& taskId);
162162

163+
std::shared_ptr<const velox::core::TableWriteNode> toVeloxQueryPlan(
164+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
165+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
166+
const protocol::TaskId& taskId);
167+
163168
std::shared_ptr<const velox::core::TableWriteMergeNode> toVeloxQueryPlan(
164169
const std::shared_ptr<const protocol::TableWriterMergeNode>& node,
165170
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,

presto-native-execution/presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ using HiveConnectorProtocol = ConnectorProtocolTemplate<
2626
HivePartitioningHandle,
2727
HiveTransactionHandle,
2828
NotImplemented,
29+
NotImplemented,
2930
NotImplemented>;
3031
} // namespace facebook::presto::protocol::hive

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
2727
IcebergSplit,
2828
NotImplemented,
2929
hive::HiveTransactionHandle,
30+
IcebergDistributedProcedureHandle,
3031
NotImplemented,
3132
NotImplemented>;
3233

presto-native-execution/presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ using TpchConnectorProtocol = ConnectorProtocolTemplate<
2929
TpchPartitioningHandle,
3030
TpchTransactionHandle,
3131
NotImplemented,
32+
NotImplemented,
3233
NotImplemented>;
3334

3435
} // namespace facebook::presto::protocol::tpch

presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ class ConnectorProtocol {
7575
const std::string& thrift,
7676
std::shared_ptr<ConnectorInsertTableHandle>& proto) const = 0;
7777

78+
virtual void to_json(
79+
json& j,
80+
const std::shared_ptr<ConnectorDistributedProcedureHandle>& p) const = 0;
81+
virtual void from_json(
82+
const json& j,
83+
std::shared_ptr<ConnectorDistributedProcedureHandle>& p) const = 0;
84+
7885
virtual void to_json(
7986
json& j,
8087
const std::shared_ptr<ConnectorOutputTableHandle>& p) const = 0;
@@ -117,7 +124,7 @@ class ConnectorProtocol {
117124
std::string& thrift) const = 0;
118125
virtual void deserialize(
119126
const std::string& thrift,
120-
std::shared_ptr<ConnectorTransactionHandle>& proto) const = 0;
127+
std::shared_ptr<ConnectorTransactionHandle>& proto) const = 0;
121128

122129
virtual void to_json(
123130
json& j,
@@ -153,6 +160,7 @@ template <
153160
typename ConnectorSplitType = NotImplemented,
154161
typename ConnectorPartitioningHandleType = NotImplemented,
155162
typename ConnectorTransactionHandleType = NotImplemented,
163+
typename ConnectorDistributedProcedureHandleType = NotImplemented,
156164
typename ConnectorDeleteTableHandleType = NotImplemented,
157165
typename ConnectorIndexHandleType = NotImplemented>
158166
class ConnectorProtocolTemplate final : public ConnectorProtocol {
@@ -221,6 +229,15 @@ class ConnectorProtocolTemplate final : public ConnectorProtocol {
221229
deserializeTemplate<ConnectorInsertTableHandleType>(thrift, proto);
222230
}
223231

232+
void to_json(json& j, const std::shared_ptr<ConnectorDistributedProcedureHandle>& p)
233+
const final {
234+
to_json_template<ConnectorDistributedProcedureHandleType>(j, p);
235+
}
236+
void from_json(const json& j, std::shared_ptr<ConnectorDistributedProcedureHandle>& p)
237+
const final {
238+
from_json_template<ConnectorDistributedProcedureHandleType>(j, p);
239+
}
240+
224241
void to_json(json& j, const std::shared_ptr<ConnectorOutputTableHandle>& p)
225242
const final {
226243
to_json_template<ConnectorOutputTableHandleType>(j, p);
@@ -406,6 +423,7 @@ using SystemConnectorProtocol = ConnectorProtocolTemplate<
406423
SystemPartitioningHandle,
407424
SystemTransactionHandle,
408425
NotImplemented,
426+
NotImplemented,
409427
NotImplemented>;
410428

411429
} // namespace facebook::presto::protocol

0 commit comments

Comments
 (0)