Skip to content

Commit 1360ea6

Browse files
PingLiuPing authored and czentgr committed
Use IcebergColumnHandle
1 parent c6752c2 commit 1360ea6

File tree

3 files changed: +139 additions, -14 deletions

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ toVeloxIcebergPartitionSpec(
150150
spec.specId, fields);
151151
}
152152

153+
/// Converts a Presto Iceberg ColumnIdentity into a Velox ParquetFieldId.
///
/// The returned field is built from `column.id` plus one child field per
/// entry of `column.children`, produced by applying this function recursively
/// and kept in the same order as the input.
///
/// @param column Column identity to convert.
/// @return ParquetFieldId constructed from the column id and its converted
/// children.
velox::parquet::ParquetFieldId toParquetField(
    const protocol::iceberg::ColumnIdentity& column) {
  const auto& nestedColumns = column.children;
  std::vector<velox::parquet::ParquetFieldId> childFields;

  // A column without children yields a field with an empty child list.
  if (nestedColumns.empty()) {
    return velox::parquet::ParquetFieldId(column.id, childFields);
  }

  childFields.reserve(nestedColumns.size());
  for (const auto& nestedColumn : nestedColumns) {
    childFields.push_back(toParquetField(nestedColumn));
  }
  return velox::parquet::ParquetFieldId(column.id, childFields);
}
164+
153165
} // namespace
154166

155167
std::unique_ptr<velox::connector::ConnectorSplit>
@@ -234,13 +246,13 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle(
234246
columnParseParameters.partitionDateValueFormat = velox::connector::hive::
235247
HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
236248
}
237-
return std::make_unique<velox::connector::hive::HiveColumnHandle>(
249+
250+
return std::make_unique<velox::connector::hive::iceberg::IcebergColumnHandle>(
238251
icebergColumn->columnIdentity.name,
239252
toHiveColumnType(icebergColumn->columnType),
240253
type,
241-
type,
242-
toRequiredSubfields(icebergColumn->requiredSubfields),
243-
columnParseParameters);
254+
toParquetField(icebergColumn->columnIdentity),
255+
toRequiredSubfields(icebergColumn->requiredSubfields));
244256
}
245257

246258
std::unique_ptr<velox::connector::ConnectorTableHandle>
@@ -324,7 +336,7 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
324336
createHandle->handle.connectorHandle->_type);
325337

326338
const auto inputColumns =
327-
toHiveColumns(icebergOutputTableHandle->inputColumns, typeParser);
339+
toIcebergColumns(icebergOutputTableHandle->inputColumns, typeParser);
328340

329341
return std::make_unique<
330342
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
@@ -354,7 +366,7 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
354366
insertHandle->handle.connectorHandle->_type);
355367

356368
const auto inputColumns =
357-
toHiveColumns(icebergInsertTableHandle->inputColumns, typeParser);
369+
toIcebergColumns(icebergInsertTableHandle->inputColumns, typeParser);
358370

359371
return std::make_unique<
360372
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
@@ -370,18 +382,20 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
370382
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
371383
}
372384

373-
std::vector<velox::connector::hive::HiveColumnHandlePtr>
374-
IcebergPrestoToVeloxConnector::toHiveColumns(
385+
std::vector<velox::connector::hive::iceberg::IcebergColumnHandlePtr>
386+
IcebergPrestoToVeloxConnector::toIcebergColumns(
375387
const protocol::List<protocol::iceberg::IcebergColumnHandle>& inputColumns,
376388
const TypeParser& typeParser) const {
377-
std::vector<velox::connector::hive::HiveColumnHandlePtr> hiveColumns;
378-
hiveColumns.reserve(inputColumns.size());
389+
std::vector<velox::connector::hive::iceberg::IcebergColumnHandlePtr>
390+
icebergColumns;
391+
icebergColumns.reserve(inputColumns.size());
379392
for (const auto& columnHandle : inputColumns) {
380-
hiveColumns.emplace_back(
381-
std::dynamic_pointer_cast<velox::connector::hive::HiveColumnHandle>(
393+
icebergColumns.emplace_back(
394+
std::dynamic_pointer_cast<
395+
velox::connector::hive::iceberg::IcebergColumnHandle>(
382396
std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
383397
}
384-
return hiveColumns;
398+
return icebergColumns;
385399
}
386400

387401
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
1818
#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"
1919

20+
#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
21+
2022
namespace facebook::presto {
2123

2224
class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
@@ -52,7 +54,8 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
5254
const TypeParser& typeParser) const final;
5355

5456
private:
55-
std::vector<velox::connector::hive::HiveColumnHandlePtr> toHiveColumns(
57+
std::vector<velox::connector::hive::iceberg::IcebergColumnHandlePtr>
58+
toIcebergColumns(
5659
const protocol::List<protocol::iceberg::IcebergColumnHandle>&
5760
inputColumns,
5861
const TypeParser& typeParser) const;

presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "velox/common/base/tests/GTestUtils.h"
2222
#include "velox/connectors/hive/HiveConnector.h"
2323
#include "velox/connectors/hive/TableHandle.h"
24+
#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
2425

2526
using namespace facebook::presto;
2627
using namespace facebook::velox;
@@ -185,3 +186,110 @@ TEST_F(PrestoToVeloxConnectorTest, hiveLowercasesColumnNames) {
185186
EXPECT_EQ(dataColumnsType->nameOf(0), "mixedcasecol1");
186187
EXPECT_EQ(dataColumnsType->nameOf(1), "uppercasecol2");
187188
}
189+
190+
namespace {
191+
192+
/// Test helper that assembles a protocol IcebergColumnHandle.
///
/// @param name Stored as the column identity name.
/// @param fieldId Stored as the column identity id.
/// @param type Stored as the handle's type string.
/// @param typeCategory Stored as the column identity type category; defaults
/// to PRIMITIVE.
/// @param children Copied into the column identity's children; defaults to
/// an empty list.
/// @return The populated handle, whose columnType is always REGULAR.
protocol::iceberg::IcebergColumnHandle createIcebergColumnHandle(
    const std::string& name,
    int32_t fieldId,
    const std::string& type,
    protocol::iceberg::TypeCategory typeCategory =
        protocol::iceberg::TypeCategory::PRIMITIVE,
    const std::vector<protocol::iceberg::ColumnIdentity>& children = {}) {
  protocol::iceberg::IcebergColumnHandle handle;

  // Fill in the identity portion first.
  auto& identity = handle.columnIdentity;
  identity.name = name;
  identity.id = fieldId;
  identity.typeCategory = typeCategory;
  identity.children = children;

  // Then the handle-level attributes.
  handle.type = type;
  handle.columnType = protocol::hive::ColumnType::REGULAR;
  return handle;
}
208+
209+
} // namespace
210+
211+
// Converts a single "integer" column with field id 1 and checks that the
// result is an IcebergColumnHandle carrying the same name, an INTEGER data
// type, field id 1 and no child fields.
TEST_F(PrestoToVeloxConnectorTest, icebergColumnHandleSimple) {
  auto protocolColumn = createIcebergColumnHandle("col1", 1, "integer");

  IcebergPrestoToVeloxConnector converter("iceberg");
  auto veloxHandle =
      converter.toVeloxColumnHandle(&protocolColumn, *typeParser_);

  // The conversion must yield the Iceberg-specific handle type.
  auto* converted =
      dynamic_cast<connector::hive::iceberg::IcebergColumnHandle*>(
          veloxHandle.get());
  ASSERT_NE(converted, nullptr);

  EXPECT_EQ(converted->name(), "col1");
  EXPECT_EQ(converted->dataType()->kind(), TypeKind::INTEGER);

  const auto& rootField = converted->field();
  EXPECT_EQ(rootField.fieldId, 1);
  EXPECT_TRUE(rootField.children.empty());
}
227+
228+
// Converts a struct column (field id 1) with two primitive children (field
// ids 2 and 3) and checks that the resulting IcebergColumnHandle exposes a
// ROW data type and the same field ids in the same order.
TEST_F(PrestoToVeloxConnectorTest, icebergColumnHandleNested) {
  // Builds a childless PRIMITIVE column identity.
  const auto makePrimitive = [](const std::string& childName, int childId) {
    protocol::iceberg::ColumnIdentity identity;
    identity.name = childName;
    identity.id = childId;
    identity.typeCategory = protocol::iceberg::TypeCategory::PRIMITIVE;
    return identity;
  };

  auto protocolColumn = createIcebergColumnHandle(
      "struct_col",
      1,
      "row(child1 integer, child2 varchar)",
      protocol::iceberg::TypeCategory::STRUCT,
      {makePrimitive("child1", 2), makePrimitive("child2", 3)});

  IcebergPrestoToVeloxConnector converter("iceberg");
  auto veloxHandle =
      converter.toVeloxColumnHandle(&protocolColumn, *typeParser_);

  // The conversion must yield the Iceberg-specific handle type.
  auto* converted =
      dynamic_cast<connector::hive::iceberg::IcebergColumnHandle*>(
          veloxHandle.get());
  ASSERT_NE(converted, nullptr);

  EXPECT_EQ(converted->name(), "struct_col");
  EXPECT_EQ(converted->dataType()->kind(), TypeKind::ROW);

  const auto& rootField = converted->field();
  EXPECT_EQ(rootField.fieldId, 1);
  ASSERT_EQ(rootField.children.size(), 2);
  EXPECT_EQ(rootField.children[0].fieldId, 2);
  EXPECT_EQ(rootField.children[1].fieldId, 3);
}
261+
262+
// Converts a three-level column identity (outer id 1 -> middle id 2 ->
// inner id 3) and checks that the field ids survive conversion at every
// level of the resulting IcebergColumnHandle.
TEST_F(PrestoToVeloxConnectorTest, icebergColumnHandleDeeplyNested) {
  // Innermost level: a primitive leaf.
  protocol::iceberg::ColumnIdentity leaf;
  leaf.name = "inner";
  leaf.id = 3;
  leaf.typeCategory = protocol::iceberg::TypeCategory::PRIMITIVE;

  // Middle level: a struct wrapping the leaf.
  protocol::iceberg::ColumnIdentity wrapper;
  wrapper.name = "middle";
  wrapper.id = 2;
  wrapper.typeCategory = protocol::iceberg::TypeCategory::STRUCT;
  wrapper.children = {leaf};

  auto protocolColumn = createIcebergColumnHandle(
      "outer",
      1,
      "row(middle row(inner bigint))",
      protocol::iceberg::TypeCategory::STRUCT,
      {wrapper});

  IcebergPrestoToVeloxConnector converter("iceberg");
  auto veloxHandle =
      converter.toVeloxColumnHandle(&protocolColumn, *typeParser_);

  // The conversion must yield the Iceberg-specific handle type.
  auto* converted =
      dynamic_cast<connector::hive::iceberg::IcebergColumnHandle*>(
          veloxHandle.get());
  ASSERT_NE(converted, nullptr);

  EXPECT_EQ(converted->name(), "outer");

  const auto& rootField = converted->field();
  EXPECT_EQ(rootField.fieldId, 1);
  ASSERT_EQ(rootField.children.size(), 1);
  EXPECT_EQ(rootField.children[0].fieldId, 2);
  ASSERT_EQ(rootField.children[0].children.size(), 1);
  EXPECT_EQ(rootField.children[0].children[0].fieldId, 3);
}

0 commit comments

Comments
 (0)