Skip to content

Commit e19fb9e

Browse files
zacw7j-sund
authored and committed
[native] Add protocol for index lookup join plan
1 parent 27fb20d commit e19fb9e

File tree

7 files changed

+692
-277
lines changed

7 files changed

+692
-277
lines changed

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "presto_cpp/main/common/Configs.h"
1717
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
1818
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
19+
#include <velox/type/TypeUtil.h>
1920
#include <velox/type/Filter.h>
2021
#include "velox/core/QueryCtx.h"
2122
#include "velox/exec/HashPartitionFunction.h"
@@ -1116,6 +1117,8 @@ core::JoinType toJoinType(protocol::JoinType type) {
11161117
return core::JoinType::kRight;
11171118
case protocol::JoinType::FULL:
11181119
return core::JoinType::kFull;
1120+
case protocol::JoinType::SOURCE_OUTER:
1121+
return core::JoinType::kInner; // TODO: Map to proper join type.
11191122
}
11201123

11211124
VELOX_UNSUPPORTED("Unknown join type");
@@ -1198,6 +1201,54 @@ velox::core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
11981201
ROW(std::move(outputNames), std::move(outputTypes)));
11991202
}
12001203

1204+
1205+
std::shared_ptr<const velox::core::IndexLookupJoinNode>
1206+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1207+
const std::shared_ptr<const protocol::IndexJoinNode>& node,
1208+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1209+
const protocol::TaskId& taskId) {
1210+
std::vector<core::FieldAccessTypedExprPtr> leftKeys;
1211+
std::vector<core::FieldAccessTypedExprPtr> rightKeys;
1212+
1213+
leftKeys.reserve(node->criteria.size());
1214+
rightKeys.reserve(node->criteria.size());
1215+
for (const auto& clause : node->criteria) {
1216+
leftKeys.emplace_back(exprConverter_.toVeloxExpr(clause.left));
1217+
rightKeys.emplace_back(exprConverter_.toVeloxExpr(clause.right));
1218+
}
1219+
1220+
auto left = toVeloxQueryPlan(node->probeSource, tableWriteInfo, taskId);
1221+
auto right = toVeloxQueryPlan(node->indexSource, tableWriteInfo, taskId);
1222+
1223+
return std::make_shared<core::IndexLookupJoinNode>(
1224+
node->id,
1225+
core::JoinType::kInner,
1226+
/*leftKeys=*/leftKeys,
1227+
/*rightKeys=*/rightKeys,
1228+
/*joinConditions=*/std::vector<velox::core::IndexLookupConditionPtr>{},
1229+
/*left=*/left,
1230+
/*right=*/std::dynamic_pointer_cast<const core::TableScanNode>(right),
1231+
/*outputType=*/type::concatRowTypes({left->outputType(), right->outputType()}));
1232+
}
1233+
1234+
std::shared_ptr<const velox::core::TableScanNode>
1235+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1236+
const std::shared_ptr<const protocol::IndexSourceNode>& node,
1237+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1238+
const protocol::TaskId& taskId) {
1239+
auto rowType = toRowType(node->outputVariables, typeParser_);
1240+
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1241+
assignments;
1242+
for (const auto& entry : node->assignments) {
1243+
assignments.emplace(
1244+
entry.first.name, toColumnHandle(entry.second.get(), typeParser_));
1245+
}
1246+
auto connectorTableHandle = toConnectorTableHandle(
1247+
node->tableHandle, exprConverter_, typeParser_, assignments);
1248+
return std::make_shared<core::TableScanNode>(
1249+
node->id, rowType, connectorTableHandle, assignments);
1250+
}
1251+
12011252
core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
12021253
const std::shared_ptr<const protocol::MarkDistinctNode>& node,
12031254
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
@@ -1724,6 +1775,14 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
17241775
std::dynamic_pointer_cast<const protocol::SemiJoinNode>(node)) {
17251776
return toVeloxQueryPlan(join, tableWriteInfo, taskId);
17261777
}
1778+
if (auto join =
1779+
std::dynamic_pointer_cast<const protocol::IndexJoinNode>(node)) {
1780+
return toVeloxQueryPlan(join, tableWriteInfo, taskId);
1781+
}
1782+
if (auto indexSource =
1783+
std::dynamic_pointer_cast<const protocol::IndexSourceNode>(node)) {
1784+
return toVeloxQueryPlan(indexSource, tableWriteInfo, taskId);
1785+
}
17271786
if (auto join =
17281787
std::dynamic_pointer_cast<const protocol::MergeJoinNode>(node)) {
17291788
return toVeloxQueryPlan(join, tableWriteInfo, taskId);

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ class VeloxQueryPlanConverterBase {
110110
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
111111
const protocol::TaskId& taskId);
112112

113+
/// Converts a Presto protocol IndexJoinNode into a Velox
/// IndexLookupJoinNode. 'tableWriteInfo' and 'taskId' are passed through to
/// the conversion of the node's child plans.
std::shared_ptr<const velox::core::IndexLookupJoinNode> toVeloxQueryPlan(
114+
const std::shared_ptr<const protocol::IndexJoinNode>& node,
115+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
116+
const protocol::TaskId& taskId);
117+
118+
/// Converts a Presto protocol IndexSourceNode into a Velox TableScanNode
/// built from the node's output variables, column assignments and table
/// handle.
std::shared_ptr<const velox::core::TableScanNode> toVeloxQueryPlan(
119+
const std::shared_ptr<const protocol::IndexSourceNode>& node,
120+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
121+
const protocol::TaskId& taskId);
122+
113123
velox::core::PlanNodePtr toVeloxQueryPlan(
114124
const std::shared_ptr<const protocol::MarkDistinctNode>& node,
115125
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,

presto-native-execution/presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,9 +370,10 @@ namespace facebook::presto::protocol::hive {
370370

371371
// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
372372
static const std::pair<BucketFunctionType, json>
373-
BucketFunctionType_enum_table[] = { // NOLINT: cert-err58-cpp
374-
{BucketFunctionType::HIVE_COMPATIBLE, "HIVE_COMPATIBLE"},
375-
{BucketFunctionType::PRESTO_NATIVE, "PRESTO_NATIVE"}};
373+
BucketFunctionType_enum_table[] =
374+
{ // NOLINT: cert-err58-cpp
375+
{BucketFunctionType::HIVE_COMPATIBLE, "HIVE_COMPATIBLE"},
376+
{BucketFunctionType::PRESTO_NATIVE, "PRESTO_NATIVE"}};
376377
void to_json(json& j, const BucketFunctionType& e) {
377378
static_assert(
378379
std::is_enum<BucketFunctionType>::value,
@@ -598,12 +599,13 @@ namespace facebook::presto::protocol::hive {
598599

599600
// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
600601
static const std::pair<HiveCompressionCodec, json>
601-
HiveCompressionCodec_enum_table[] = { // NOLINT: cert-err58-cpp
602-
{HiveCompressionCodec::NONE, "NONE"},
603-
{HiveCompressionCodec::SNAPPY, "SNAPPY"},
604-
{HiveCompressionCodec::GZIP, "GZIP"},
605-
{HiveCompressionCodec::LZ4, "LZ4"},
606-
{HiveCompressionCodec::ZSTD, "ZSTD"}};
602+
HiveCompressionCodec_enum_table[] =
603+
{ // NOLINT: cert-err58-cpp
604+
{HiveCompressionCodec::NONE, "NONE"},
605+
{HiveCompressionCodec::SNAPPY, "SNAPPY"},
606+
{HiveCompressionCodec::GZIP, "GZIP"},
607+
{HiveCompressionCodec::LZ4, "LZ4"},
608+
{HiveCompressionCodec::ZSTD, "ZSTD"}};
607609
void to_json(json& j, const HiveCompressionCodec& e) {
608610
static_assert(
609611
std::is_enum<HiveCompressionCodec>::value,

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ namespace facebook::presto::protocol::iceberg {
2525

2626
// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
2727
static const std::pair<ChangelogOperation, json>
28-
ChangelogOperation_enum_table[] = { // NOLINT: cert-err58-cpp
29-
{ChangelogOperation::INSERT, "INSERT"},
30-
{ChangelogOperation::DELETE, "DELETE"},
31-
{ChangelogOperation::UPDATE_BEFORE, "UPDATE_BEFORE"},
32-
{ChangelogOperation::UPDATE_AFTER, "UPDATE_AFTER"}};
28+
ChangelogOperation_enum_table[] =
29+
{ // NOLINT: cert-err58-cpp
30+
{ChangelogOperation::INSERT, "INSERT"},
31+
{ChangelogOperation::DELETE, "DELETE"},
32+
{ChangelogOperation::UPDATE_BEFORE, "UPDATE_BEFORE"},
33+
{ChangelogOperation::UPDATE_AFTER, "UPDATE_AFTER"}};
3334
void to_json(json& j, const ChangelogOperation& e) {
3435
static_assert(
3536
std::is_enum<ChangelogOperation>::value,

0 commit comments

Comments
 (0)