Skip to content

Commit 8dd2b4c

Browse files
kevinwilfongkgpai
authored andcommitted
[native] Add Builder classes for custom PlanNodes
1 parent 3145f65 commit 8dd2b4c

File tree

6 files changed

+419
-4
lines changed

6 files changed

+419
-4
lines changed

presto-native-execution/presto_cpp/main/operators/BroadcastWrite.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,61 @@ class BroadcastWriteNode : public velox::core::PlanNode {
3535
serdeRowType_{serdeRowType},
3636
sources_{std::move(source)} {}
3737

38+
class Builder {
39+
public:
40+
Builder() = default;
41+
42+
explicit Builder(const BroadcastWriteNode& other) {
43+
id_ = other.id();
44+
basePath_ = other.basePath();
45+
serdeRowType_ = other.serdeRowType();
46+
source_ = other.sources()[0];
47+
}
48+
49+
Builder& id(velox::core::PlanNodeId id) {
50+
id_ = std::move(id);
51+
return *this;
52+
}
53+
54+
Builder& basePath(std::string basePath) {
55+
basePath_ = std::move(basePath);
56+
return *this;
57+
}
58+
59+
Builder& serdeRowType(velox::RowTypePtr serdeRowType) {
60+
serdeRowType_ = std::move(serdeRowType);
61+
return *this;
62+
}
63+
64+
Builder& source(velox::core::PlanNodePtr source) {
65+
source_ = std::move(source);
66+
return *this;
67+
}
68+
69+
std::shared_ptr<BroadcastWriteNode> build() const {
70+
VELOX_USER_CHECK(id_.has_value(), "BroadcastWriteNode id is not set");
71+
VELOX_USER_CHECK(
72+
basePath_.has_value(), "BroadcastWriteNode basePath is not set");
73+
VELOX_USER_CHECK(
74+
serdeRowType_.has_value(),
75+
"BroadcastWriteNode serdeRowType is not set");
76+
VELOX_USER_CHECK(
77+
source_.has_value(), "BroadcastWriteNode source is not set");
78+
79+
return std::make_shared<BroadcastWriteNode>(
80+
id_.value(),
81+
basePath_.value(),
82+
serdeRowType_.value(),
83+
source_.value());
84+
}
85+
86+
private:
87+
std::optional<velox::core::PlanNodeId> id_;
88+
std::optional<std::string> basePath_;
89+
std::optional<velox::RowTypePtr> serdeRowType_;
90+
std::optional<velox::core::PlanNodePtr> source_;
91+
};
92+
3893
folly::dynamic serialize() const override;
3994

4095
static velox::core::PlanNodePtr create(

presto-native-execution/presto_cpp/main/operators/PartitionAndSerialize.h

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,117 @@ class PartitionAndSerializeNode : public velox::core::PlanNode {
5151
partitionFunctionSpec_, "Partition function factory cannot be null.");
5252
}
5353

54+
class Builder {
55+
public:
56+
Builder() = default;
57+
58+
explicit Builder(const PartitionAndSerializeNode& other) {
59+
id_ = other.id();
60+
keys_ = other.keys();
61+
numPartitions_ = other.numPartitions();
62+
serializedRowType_ = other.serializedRowType();
63+
source_ = other.sources()[0];
64+
replicateNullsAndAny_ = other.isReplicateNullsAndAny();
65+
partitionFunctionFactory_ = other.partitionFunctionFactory();
66+
sortingOrders_ = other.sortingOrders();
67+
sortingKeys_ = other.sortingKeys();
68+
}
69+
70+
Builder& id(velox::core::PlanNodeId id) {
71+
id_ = std::move(id);
72+
return *this;
73+
}
74+
75+
Builder& keys(std::vector<velox::core::TypedExprPtr> keys) {
76+
keys_ = std::move(keys);
77+
return *this;
78+
}
79+
80+
Builder& numPartitions(uint32_t numPartitions) {
81+
numPartitions_ = numPartitions;
82+
return *this;
83+
}
84+
85+
Builder& serializedRowType(velox::RowTypePtr serializedRowType) {
86+
serializedRowType_ = std::move(serializedRowType);
87+
return *this;
88+
}
89+
90+
Builder& source(velox::core::PlanNodePtr source) {
91+
source_ = std::move(source);
92+
return *this;
93+
}
94+
95+
Builder& replicateNullsAndAny(bool replicateNullsAndAny) {
96+
replicateNullsAndAny_ = replicateNullsAndAny;
97+
return *this;
98+
}
99+
100+
Builder& partitionFunctionFactory(
101+
velox::core::PartitionFunctionSpecPtr partitionFunctionFactory) {
102+
partitionFunctionFactory_ = std::move(partitionFunctionFactory);
103+
return *this;
104+
}
105+
106+
Builder& sortingOrders(
107+
std::optional<std::vector<velox::core::SortOrder>> sortingOrders) {
108+
sortingOrders_ = std::move(sortingOrders);
109+
return *this;
110+
}
111+
112+
Builder& sortingKeys(
113+
std::optional<std::vector<velox::core::FieldAccessTypedExprPtr>>
114+
sortingKeys) {
115+
sortingKeys_ = sortingKeys;
116+
return *this;
117+
}
118+
119+
std::shared_ptr<PartitionAndSerializeNode> build() const {
120+
VELOX_USER_CHECK(
121+
id_.has_value(), "PartitionAndSerializeNode id is not set");
122+
VELOX_USER_CHECK(
123+
keys_.has_value(), "PartitionAndSerializeNode keys is not set");
124+
VELOX_USER_CHECK(
125+
numPartitions_.has_value(),
126+
"PartitionAndSerializeNode numPartitions is not set");
127+
VELOX_USER_CHECK(
128+
serializedRowType_.has_value(),
129+
"PartitionAndSerializeNode serializedRowType is not set");
130+
VELOX_USER_CHECK(
131+
source_.has_value(), "PartitionAndSerializeNode source is not set");
132+
VELOX_USER_CHECK(
133+
replicateNullsAndAny_.has_value(),
134+
"PartitionAndSerializeNode replicateNullsAndAny is not set");
135+
VELOX_USER_CHECK(
136+
partitionFunctionFactory_.has_value(),
137+
"PartitionAndSerializeNode partitionFunctionFactory is not set");
138+
139+
return std::make_shared<PartitionAndSerializeNode>(
140+
id_.value(),
141+
keys_.value(),
142+
numPartitions_.value(),
143+
serializedRowType_.value(),
144+
source_.value(),
145+
replicateNullsAndAny_.value(),
146+
partitionFunctionFactory_.value(),
147+
sortingOrders_,
148+
sortingKeys_);
149+
}
150+
151+
private:
152+
std::optional<velox::core::PlanNodeId> id_;
153+
std::optional<std::vector<velox::core::TypedExprPtr>> keys_;
154+
std::optional<uint32_t> numPartitions_;
155+
std::optional<velox::RowTypePtr> serializedRowType_;
156+
std::optional<velox::core::PlanNodePtr> source_;
157+
std::optional<bool> replicateNullsAndAny_;
158+
std::optional<velox::core::PartitionFunctionSpecPtr>
159+
partitionFunctionFactory_;
160+
std::optional<std::vector<velox::core::SortOrder>> sortingOrders_;
161+
std::optional<std::vector<velox::core::FieldAccessTypedExprPtr>>
162+
sortingKeys_;
163+
};
164+
54165
folly::dynamic serialize() const override;
55166

56167
static velox::core::PlanNodePtr create(

presto-native-execution/presto_cpp/main/operators/ShuffleRead.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,39 @@ class ShuffleReadNode : public velox::core::PlanNode {
2222
ShuffleReadNode(const velox::core::PlanNodeId& id, velox::RowTypePtr type)
2323
: PlanNode(id), outputType_(type) {}
2424

25+
class Builder {
26+
public:
27+
Builder() = default;
28+
29+
explicit Builder(const ShuffleReadNode& other) {
30+
id_ = other.id();
31+
outputType_ = other.outputType();
32+
}
33+
34+
Builder& id(velox::core::PlanNodeId id) {
35+
id_ = std::move(id);
36+
return *this;
37+
}
38+
39+
Builder& outputType(velox::RowTypePtr outputType) {
40+
outputType_ = std::move(outputType);
41+
return *this;
42+
}
43+
44+
std::shared_ptr<ShuffleReadNode> build() const {
45+
VELOX_USER_CHECK(id_.has_value(), "ShuffleReadNode id is not set");
46+
VELOX_USER_CHECK(
47+
outputType_.has_value(), "ShuffleReadNode outputType is not set");
48+
49+
return std::make_shared<ShuffleReadNode>(
50+
id_.value(), outputType_.value());
51+
}
52+
53+
private:
54+
std::optional<velox::core::PlanNodeId> id_;
55+
std::optional<velox::RowTypePtr> outputType_;
56+
};
57+
2558
folly::dynamic serialize() const override;
2659

2760
static velox::core::PlanNodePtr create(

presto-native-execution/presto_cpp/main/operators/ShuffleWrite.h

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,73 @@ class ShuffleWriteNode : public velox::core::PlanNode {
3333
serializedShuffleWriteInfo_(serializedShuffleWriteInfo),
3434
sources_{std::move(source)} {}
3535

36+
class Builder {
37+
public:
38+
Builder() = default;
39+
40+
explicit Builder(const ShuffleWriteNode& other) {
41+
id_ = other.id();
42+
numPartitions_ = other.numPartitions();
43+
shuffleName_ = other.shuffleName();
44+
serializedShuffleWriteInfo_ = other.serializedShuffleWriteInfo();
45+
source_ = other.sources()[0];
46+
}
47+
48+
Builder& id(velox::core::PlanNodeId id) {
49+
id_ = std::move(id);
50+
return *this;
51+
}
52+
53+
Builder& numPartitions(uint32_t numPartitions) {
54+
numPartitions_ = numPartitions;
55+
return *this;
56+
}
57+
58+
Builder& shuffleName(std::string shuffleName) {
59+
shuffleName_ = std::move(shuffleName);
60+
return *this;
61+
}
62+
63+
Builder& serializedShuffleWriteInfo(
64+
std::string serializedShuffleWriteInfo) {
65+
serializedShuffleWriteInfo_ = std::move(serializedShuffleWriteInfo);
66+
return *this;
67+
}
68+
69+
Builder& source(velox::core::PlanNodePtr source) {
70+
source_ = std::move(source);
71+
return *this;
72+
}
73+
74+
std::shared_ptr<ShuffleWriteNode> build() const {
75+
VELOX_USER_CHECK(id_.has_value(), "ShuffleWriteNode id is not set");
76+
VELOX_USER_CHECK(
77+
numPartitions_.has_value(),
78+
"ShuffleWriteNode numPartitions_ is not set");
79+
VELOX_USER_CHECK(
80+
shuffleName_.has_value(), "ShuffleWriteNode shuffleName is not set");
81+
VELOX_USER_CHECK(
82+
serializedShuffleWriteInfo_.has_value(),
83+
"ShuffleWriteNode serializedShuffleWriteInfo is not set");
84+
VELOX_USER_CHECK(
85+
source_.has_value(), "ShuffleWriteNode source is not set");
86+
87+
return std::make_shared<ShuffleWriteNode>(
88+
id_.value(),
89+
numPartitions_.value(),
90+
shuffleName_.value(),
91+
serializedShuffleWriteInfo_.value(),
92+
source_.value());
93+
}
94+
95+
private:
96+
std::optional<velox::core::PlanNodeId> id_;
97+
std::optional<uint32_t> numPartitions_;
98+
std::optional<std::string> shuffleName_;
99+
std::optional<std::string> serializedShuffleWriteInfo_;
100+
std::optional<velox::core::PlanNodePtr> source_;
101+
};
102+
36103
folly::dynamic serialize() const override;
37104

38105
static velox::core::PlanNodePtr create(

presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ add_library(presto_operators_plan_builder PlanBuilder.cpp)
1313

1414
target_link_libraries(presto_operators_plan_builder velox_core)
1515

16-
add_executable(presto_operators_test PlanNodeSerdeTest.cpp
17-
UnsafeRowShuffleTest.cpp
18-
BroadcastTest.cpp
19-
BinarySortableSerializerTest.cpp)
16+
add_executable(
17+
presto_operators_test
18+
PlanNodeSerdeTest.cpp UnsafeRowShuffleTest.cpp BroadcastTest.cpp
19+
BinarySortableSerializerTest.cpp PlanNodeBuilderTest.cpp)
2020

2121
add_test(presto_operators_test presto_operators_test)
2222

0 commit comments

Comments
 (0)