Skip to content

Commit 9ec75df

Browse files
authored
feat(native): Populate partitionSpec in IcebergInsertTableHandle (#26560)
## Description Advance Velox and logic to convert iceberg partition spec. ``` == NO RELEASE NOTE == ```
1 parent 1bd47cb commit 9ec75df

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,48 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toIcebergTableHandle(
108108
columnHandles);
109109
}
110110

111+
velox::connector::hive::iceberg::IcebergPartitionSpec::Field
112+
toVeloxIcebergPartitionField(
113+
const protocol::iceberg::IcebergPartitionField& field,
114+
const TypeParser& typeParser,
115+
const protocol::iceberg::PrestoIcebergSchema& schema) {
116+
std::string type;
117+
for (const auto& column : schema.columns) {
118+
if (column.name == field.name) {
119+
type = column.prestoType;
120+
break;
121+
}
122+
}
123+
124+
VELOX_USER_CHECK(
125+
!type.empty(),
126+
"Partition column not found in table schema: {}",
127+
field.name);
128+
129+
return velox::connector::hive::iceberg::IcebergPartitionSpec::Field{
130+
field.name,
131+
stringToType(type, typeParser),
132+
static_cast<velox::connector::hive::iceberg::TransformType>(
133+
field.transform),
134+
field.parameter ? *field.parameter : std::optional<int32_t>()};
135+
}
136+
137+
std::unique_ptr<velox::connector::hive::iceberg::IcebergPartitionSpec>
138+
toVeloxIcebergPartitionSpec(
139+
const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
140+
const TypeParser& typeParser) {
141+
std::vector<velox::connector::hive::iceberg::IcebergPartitionSpec::Field>
142+
fields;
143+
fields.reserve(spec.fields.size());
144+
for (const auto& field : spec.fields) {
145+
fields.emplace_back(
146+
toVeloxIcebergPartitionField(field, typeParser, spec.schema));
147+
}
148+
return std::make_unique<
149+
velox::connector::hive::iceberg::IcebergPartitionSpec>(
150+
spec.specId, fields);
151+
}
152+
111153
} // namespace
112154

113155
std::unique_ptr<velox::connector::ConnectorSplit>
@@ -293,6 +335,8 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
293335
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
294336
velox::connector::hive::LocationHandle::TableType::kNew),
295337
toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
338+
toVeloxIcebergPartitionSpec(
339+
icebergOutputTableHandle->partitionSpec, typeParser),
296340
std::optional(
297341
toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
298342
}
@@ -321,6 +365,8 @@ IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
321365
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
322366
velox::connector::hive::LocationHandle::TableType::kExisting),
323367
toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
368+
toVeloxIcebergPartitionSpec(
369+
icebergInsertTableHandle->partitionSpec, typeParser),
324370
std::optional(
325371
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
326372
}

presto-native-execution/velox

Submodule velox updated 64 files

0 commit comments

Comments
 (0)