Skip to content

Commit 4a75c9b

Browse files
committed
[native] Support the execution of calling distributed procedures
1 parent 7b97da0 commit 4a75c9b

File tree

6 files changed

+104
-1
lines changed

6 files changed

+104
-1
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,34 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
270270
typeParser);
271271
}
272272

273+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
274+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
275+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
276+
const TypeParser& typeParser) const {
277+
auto icebergDistributedProcedureHandle = std::dynamic_pointer_cast<
278+
protocol::iceberg::IcebergDistributedProcedureHandle>(
279+
executeProcedureHandle->handle.connectorHandle);
280+
281+
VELOX_CHECK_NOT_NULL(
282+
icebergDistributedProcedureHandle,
283+
"Unexpected call distributed procedure handle type {}",
284+
executeProcedureHandle->handle.connectorHandle->_type);
285+
286+
const auto inputColumns = toHiveColumns(
287+
icebergDistributedProcedureHandle->inputColumns, typeParser);
288+
289+
return std::make_unique<
290+
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
291+
inputColumns,
292+
std::make_shared<velox::connector::hive::LocationHandle>(
293+
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
294+
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
295+
velox::connector::hive::LocationHandle::TableType::kExisting),
296+
toVeloxFileFormat(icebergDistributedProcedureHandle->fileFormat),
297+
std::optional(toFileCompressionKind(
298+
icebergDistributedProcedureHandle->compressionCodec)));
299+
}
300+
273301
std::unique_ptr<protocol::ConnectorProtocol>
274302
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
275303
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
3939
const TypeParser& typeParser,
4040
velox::connector::ColumnHandleMap& assignments) const final;
4141

42+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
43+
toVeloxInsertTableHandle(
44+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
45+
const TypeParser& typeParser) const final;
46+
4247
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
4348
const final;
4449

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ class PrestoToVeloxConnector {
8585
return {};
8686
}
8787

88+
[[nodiscard]] virtual std::unique_ptr<
89+
velox::connector::ConnectorInsertTableHandle>
90+
toVeloxInsertTableHandle(
91+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
92+
const TypeParser& typeParser) const {
93+
return {};
94+
}
95+
8896
[[nodiscard]] std::unique_ptr<velox::core::PartitionFunctionSpec>
8997
createVeloxPartitionFunctionSpec(
9098
const protocol::ConnectorPartitioningHandle* partitioningHandle,

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,6 +1539,59 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15391539
sourceVeloxPlan);
15401540
}
15411541

1542+
std::shared_ptr<const core::TableWriteNode>
1543+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1544+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
1545+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1546+
const protocol::TaskId& taskId) {
1547+
const auto executeProcedureHandle =
1548+
std::dynamic_pointer_cast<protocol::ExecuteProcedureHandle>(
1549+
tableWriteInfo->writerTarget);
1550+
1551+
if (!executeProcedureHandle) {
1552+
VELOX_UNSUPPORTED(
1553+
"Unsupported execute procedure handle: {}",
1554+
toJsonString(tableWriteInfo->writerTarget));
1555+
}
1556+
1557+
std::string connectorId = executeProcedureHandle->handle.connectorId;
1558+
auto& connector = getPrestoToVeloxConnector(
1559+
executeProcedureHandle->handle.connectorHandle->_type);
1560+
auto veloxHandle = connector.toVeloxInsertTableHandle(
1561+
executeProcedureHandle.get(), typeParser_);
1562+
auto connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
1563+
1564+
if (!connectorInsertHandle) {
1565+
VELOX_UNSUPPORTED(
1566+
"Unsupported execute procedure handle: {}",
1567+
toJsonString(tableWriteInfo->writerTarget));
1568+
}
1569+
1570+
auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
1571+
connectorId, connectorInsertHandle);
1572+
1573+
const auto outputType = toRowType(
1574+
generateOutputVariables(
1575+
{node->rowCountVariable,
1576+
node->fragmentVariable,
1577+
node->tableCommitContextVariable},
1578+
nullptr),
1579+
typeParser_);
1580+
const auto sourceVeloxPlan =
1581+
toVeloxQueryPlan(node->source, tableWriteInfo, taskId);
1582+
1583+
return std::make_shared<core::TableWriteNode>(
1584+
node->id,
1585+
toRowType(node->columns, typeParser_),
1586+
node->columnNames,
1587+
std::nullopt,
1588+
std::move(insertTableHandle),
1589+
node->partitioningScheme != nullptr,
1590+
outputType,
1591+
getCommitStrategy(),
1592+
sourceVeloxPlan);
1593+
}
1594+
15421595
std::shared_ptr<const core::TableWriteNode>
15431596
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15441597
const std::shared_ptr<const protocol::DeleteNode>& node,
@@ -1927,6 +1980,10 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
19271980
std::dynamic_pointer_cast<const protocol::TableWriterNode>(node)) {
19281981
return toVeloxQueryPlan(tableWriter, tableWriteInfo, taskId);
19291982
}
1983+
if (auto callDistributedProcedure = std::dynamic_pointer_cast<
1984+
const protocol::CallDistributedProcedureNode>(node)) {
1985+
return toVeloxQueryPlan(callDistributedProcedure, tableWriteInfo, taskId);
1986+
}
19301987
if (auto deleteNode =
19311988
std::dynamic_pointer_cast<const protocol::DeleteNode>(node)) {
19321989
return toVeloxQueryPlan(deleteNode, tableWriteInfo, taskId);

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ class VeloxQueryPlanConverterBase {
160160
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
161161
const protocol::TaskId& taskId);
162162

163+
std::shared_ptr<const velox::core::TableWriteNode> toVeloxQueryPlan(
164+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
165+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
166+
const protocol::TaskId& taskId);
167+
163168
std::shared_ptr<const velox::core::TableWriteMergeNode> toVeloxQueryPlan(
164169
const std::shared_ptr<const protocol::TableWriterMergeNode>& node,
165170
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
2727
IcebergSplit,
2828
NotImplemented,
2929
hive::HiveTransactionHandle,
30-
NotImplemented,
30+
IcebergDistributedProcedureHandle,
3131
NotImplemented,
3232
NotImplemented>;
3333

0 commit comments

Comments
 (0)