Skip to content

Commit c5fe11f

Browse files
wgtmacshuxu.li
authored and committed
feat: add support for avro to arrow data conversion (#124)
Preliminary Avro datum to Arrow array data conversion support:
- Support projecting all primitive and nested types.
- Support missing fields (reading into null values).
- Support int->long and float->double promotion.
- Add test cases for all primitive and nested types.
1 parent 04d56d0 commit c5fe11f

File tree

8 files changed

+1245
-30
lines changed

8 files changed

+1245
-30
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ function(resolve_arrow_dependency)
7171
set(ARROW_FILESYSTEM
7272
ON
7373
CACHE BOOL "" FORCE)
74+
set(ARROW_JSON
75+
ON
76+
CACHE BOOL "" FORCE)
7477
set(ARROW_PARQUET
7578
ON
7679
CACHE BOOL "" FORCE)
@@ -95,8 +98,8 @@ function(resolve_arrow_dependency)
9598

9699
fetchcontent_declare(VendoredArrow
97100
${FC_DECLARE_COMMON_OPTIONS}
98-
GIT_REPOSITORY https://github.com/wgtmac/arrow.git
99-
GIT_TAG 7d50c4ac803ad983734de5f418b7cd18f25b0dc9
101+
GIT_REPOSITORY https://github.com/apache/arrow.git
102+
GIT_TAG 5f0aeb5de53fb25b59a52661a80071faef99a4a4
100103
#URL ${ARROW_SOURCE_URL}
101104
#URL_HASH "SHA256=${ICEBERG_ARROW_BUILD_SHA256_CHECKSUM}"
102105
SOURCE_SUBDIR

src/iceberg/avro/avro_data_util.cc

Lines changed: 426 additions & 2 deletions
Large diffs are not rendered by default.

src/iceberg/avro/avro_data_util_internal.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,21 @@
2626

2727
namespace iceberg::avro {
2828

29+
/// \brief Append an Avro datum to an Arrow array builder.
30+
///
31+
/// This function handles schema evolution by using the provided projection to map
32+
/// fields from the Avro data to the expected Arrow schema.
33+
///
34+
/// \param avro_node The Avro schema node (must be a record at root level)
35+
/// \param avro_datum The Avro data to append
36+
/// \param projection Schema projection from `projected_schema` to `avro_node`
37+
/// \param projected_schema The projected schema
38+
/// \param array_builder The Arrow array builder to append to (must be a struct builder)
39+
/// \return Status indicating success or failure
2940
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
3041
const ::avro::GenericDatum& avro_datum,
3142
const SchemaProjection& projection,
32-
const Schema& arrow_schema,
43+
const Schema& projected_schema,
3344
::arrow::ArrayBuilder* array_builder);
3445

3546
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7373

7474
} // namespace
7575

76+
/// \brief Render an Avro schema node as a string.
///
/// Streams `*node` into a `std::stringstream` via `operator<<` and returns the
/// accumulated text.
///
/// \param node The Avro node to print. It is dereferenced without a null check,
///        so callers must pass a non-null pointer.
/// \return The text produced by streaming the node.
std::string ToString(const ::avro::NodePtr& node) {
77+
std::stringstream ss;
78+
ss << *node;
79+
return ss.str();
80+
}
81+
82+
/// \brief Render an Avro logical type as a string.
///
/// Calls `logical_type.printJson()` on a `std::stringstream` and returns
/// whatever it wrote.
///
/// \param logical_type The logical type to print.
/// \return The text written by `printJson`.
std::string ToString(const ::avro::LogicalType& logical_type) {
83+
std::stringstream ss;
84+
logical_type.printJson(ss);
85+
return ss.str();
86+
}
87+
88+
/// \brief Render an Avro logical type enumerator as a string.
///
/// Wraps the enumerator in a `::avro::LogicalType` and delegates to the
/// `ToString(const ::avro::LogicalType&)` overload.
///
/// \param logical_type The logical type enumerator to print.
/// \return The string produced by the `::avro::LogicalType` overload.
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
89+
return ToString(::avro::LogicalType(logical_type));
90+
}
91+
7692
Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
7793
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
7894
return {};
@@ -383,33 +399,11 @@ Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.r
383399

384400
namespace {
385401

386-
std::string ToString(const ::avro::NodePtr& node) {
387-
std::stringstream ss;
388-
ss << *node;
389-
return ss.str();
390-
}
391-
392-
std::string ToString(const ::avro::LogicalType& logical_type) {
393-
std::stringstream ss;
394-
logical_type.printJson(ss);
395-
return ss.str();
396-
}
397-
398-
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
399-
return ToString(::avro::LogicalType(logical_type));
400-
}
401-
402402
/// \brief Check whether an Avro node carries a specific logical type.
///
/// \param node The Avro node to inspect; dereferenced without a null check.
/// \param expected_type The logical type enumerator to compare against.
/// \return True if `node->logicalType().type()` equals `expected_type`.
bool HasLogicalType(const ::avro::NodePtr& node,
403403
::avro::LogicalType::Type expected_type) {
404404
return node->logicalType().type() == expected_type;
405405
}
406406

407-
bool HasMapLogicalType(const ::avro::NodePtr& node) {
408-
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
409-
node->logicalType().customLogicalType() != nullptr &&
410-
node->logicalType().customLogicalType()->name() == "map";
411-
}
412-
413407
std::optional<std::string> GetAdjustToUtc(const ::avro::NodePtr& node) {
414408
if (node->customAttributes() == 0) {
415409
return std::nullopt;
@@ -501,7 +495,7 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
501495
case TypeId::kTimestamp:
502496
if (avro_node->type() == ::avro::AVRO_LONG &&
503497
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
504-
GetAdjustToUtc(avro_node).value_or("false") == "true") {
498+
GetAdjustToUtc(avro_node).value_or("false") == "false") {
505499
return {};
506500
}
507501
break;
@@ -676,6 +670,10 @@ Result<FieldProjection> ProjectList(const ListType& list_type,
676670
ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
677671
}
678672

673+
// Set the element projection metadata but preserve its children
674+
element_projection.kind = FieldProjection::Kind::kProjected;
675+
element_projection.from = FieldProjection::SourceFieldIndex{0};
676+
679677
FieldProjection result;
680678
result.children.emplace_back(std::move(element_projection));
681679
return result;
@@ -771,6 +769,12 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
771769

772770
} // namespace
773771

772+
/// \brief Check whether an Avro node is annotated with the custom "map" logical type.
///
/// All three conditions must hold: the logical type is `CUSTOM`, its custom
/// logical type pointer is non-null, and that custom type's name is exactly "map".
///
/// \param node The Avro node to inspect; dereferenced without a null check.
/// \return True only if every condition above holds, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node) {
773+
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
774+
node->logicalType().customLogicalType() != nullptr &&
775+
node->logicalType().customLogicalType()->name() == "map";
776+
}
777+
774778
Result<SchemaProjection> Project(const Schema& expected_schema,
775779
const ::avro::NodePtr& avro_node, bool prune_source) {
776780
ICEBERG_ASSIGN_OR_RAISE(

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,13 @@ class HasIdVisitor {
135135
Result<SchemaProjection> Project(const Schema& expected_schema,
136136
const ::avro::NodePtr& avro_node, bool prune_source);
137137

138+
/// \brief Render an Avro schema node as a string by streaming `*node`.
/// \param node The Avro node to print (must be non-null).
/// \return The streamed text of the node.
std::string ToString(const ::avro::NodePtr& node);
139+
/// \brief Render an Avro logical type as a string via its `printJson` output.
/// \param logical_type The logical type to print.
/// \return The text written by `printJson`.
std::string ToString(const ::avro::LogicalType& logical_type);
140+
/// \brief Render an Avro logical type enumerator as a string.
/// \param logical_type The enumerator, wrapped in `::avro::LogicalType` before printing.
/// \return The same string the `::avro::LogicalType` overload produces.
std::string ToString(const ::avro::LogicalType::Type& logical_type);
141+
142+
/// \brief Check if an Avro node has a map logical type.
143+
/// \param node The Avro node to check.
144+
/// \return True if the node has a map logical type, false otherwise.
145+
bool HasMapLogicalType(const ::avro::NodePtr& node);
146+
138147
} // namespace iceberg::avro

src/iceberg/schema_util_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
namespace iceberg {
2727

2828
// Fix `from` field of `FieldProjection` to use pruned field index.
29-
void PruneFieldProjection(FieldProjection& field_projection) {
29+
inline void PruneFieldProjection(FieldProjection& field_projection) {
3030
std::map<size_t, size_t> local_index_to_pruned_index;
3131
for (const auto& child_projection : field_projection.children) {
3232
if (child_projection.kind == FieldProjection::Kind::kProjected) {

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ add_test(NAME util_test COMMAND util_test)
7878

7979
if(ICEBERG_BUILD_BUNDLE)
8080
add_executable(avro_test)
81-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
81+
target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc
82+
avro_stream_test.cc)
8283
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
8384
GTest::gmock)
8485
add_test(NAME avro_test COMMAND avro_test)

0 commit comments

Comments
 (0)