Skip to content

Commit cbf7f6e

Browse files
committed
feat: add support for avro to arrow data conversion
1 parent 3cf5963 commit cbf7f6e

File tree

8 files changed

+517
-23
lines changed

8 files changed

+517
-23
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ function(resolve_arrow_dependency)
7171
set(ARROW_FILESYSTEM
7272
ON
7373
CACHE BOOL "" FORCE)
74+
set(ARROW_JSON
75+
ON
76+
CACHE BOOL "" FORCE)
7477
set(ARROW_PARQUET
7578
ON
7679
CACHE BOOL "" FORCE)
@@ -95,8 +98,8 @@ function(resolve_arrow_dependency)
9598

9699
fetchcontent_declare(VendoredArrow
97100
${FC_DECLARE_COMMON_OPTIONS}
98-
GIT_REPOSITORY https://github.com/wgtmac/arrow.git
99-
GIT_TAG 7d50c4ac803ad983734de5f418b7cd18f25b0dc9
101+
GIT_REPOSITORY https://github.com/apache/arrow.git
102+
GIT_TAG 5f0aeb5de53fb25b59a52661a80071faef99a4a4
100103
#URL ${ARROW_SOURCE_URL}
101104
#URL_HASH "SHA256=${ICEBERG_ARROW_BUILD_SHA256_CHECKSUM}"
102105
SOURCE_SUBDIR

src/iceberg/avro/avro_data_util.cc

Lines changed: 369 additions & 2 deletions
Large diffs are not rendered by default.

src/iceberg/avro/avro_data_util_internal.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,21 @@
2626

2727
namespace iceberg::avro {
2828

29+
/// \brief Append an Avro datum to an Arrow array builder.
30+
///
31+
/// This function handles schema evolution by using the provided projection to map
32+
/// fields from the Avro data to the expected Arrow schema.
33+
///
34+
/// \param avro_node The Avro schema node (must be a record at root level)
35+
/// \param avro_datum The Avro data to append
36+
/// \param projection Schema projection from `projected_schema` to `avro_node`
37+
/// \param projected_schema The projected schema
38+
/// \param array_builder The Arrow array builder to append to (must be a struct builder)
39+
/// \return Status indicating success or failure
2940
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
3041
const ::avro::GenericDatum& avro_datum,
3142
const SchemaProjection& projection,
32-
const Schema& arrow_schema,
43+
const Schema& projected_schema,
3344
::arrow::ArrayBuilder* array_builder);
3445

3546
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7373

7474
} // namespace
7575

76+
std::string ToString(const ::avro::NodePtr& node) {
77+
std::stringstream ss;
78+
ss << *node;
79+
return ss.str();
80+
}
81+
82+
std::string ToString(const ::avro::LogicalType& logical_type) {
83+
std::stringstream ss;
84+
logical_type.printJson(ss);
85+
return ss.str();
86+
}
87+
88+
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
89+
return ToString(::avro::LogicalType(logical_type));
90+
}
91+
7692
Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
7793
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
7894
return {};
@@ -383,22 +399,6 @@ Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.r
383399

384400
namespace {
385401

386-
std::string ToString(const ::avro::NodePtr& node) {
387-
std::stringstream ss;
388-
ss << *node;
389-
return ss.str();
390-
}
391-
392-
std::string ToString(const ::avro::LogicalType& logical_type) {
393-
std::stringstream ss;
394-
logical_type.printJson(ss);
395-
return ss.str();
396-
}
397-
398-
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
399-
return ToString(::avro::LogicalType(logical_type));
400-
}
401-
402402
bool HasLogicalType(const ::avro::NodePtr& node,
403403
::avro::LogicalType::Type expected_type) {
404404
return node->logicalType().type() == expected_type;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,8 @@ class HasIdVisitor {
135135
Result<SchemaProjection> Project(const Schema& expected_schema,
136136
const ::avro::NodePtr& avro_node, bool prune_source);
137137

138+
std::string ToString(const ::avro::NodePtr& node);
139+
std::string ToString(const ::avro::LogicalType& logical_type);
140+
std::string ToString(const ::avro::LogicalType::Type& logical_type);
141+
138142
} // namespace iceberg::avro

src/iceberg/schema_util_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
namespace iceberg {
2727

2828
// Fix `from` field of `FieldProjection` to use pruned field index.
29-
void PruneFieldProjection(FieldProjection& field_projection) {
29+
static void PruneFieldProjection(FieldProjection& field_projection) {
3030
std::map<size_t, size_t> local_index_to_pruned_index;
3131
for (const auto& child_projection : field_projection.children) {
3232
if (child_projection.kind == FieldProjection::Kind::kProjected) {

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ add_test(NAME util_test COMMAND util_test)
7171

7272
if(ICEBERG_BUILD_BUNDLE)
7373
add_executable(avro_test)
74-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
74+
target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc
75+
avro_stream_test.cc)
7576
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7677
GTest::gmock)
7778
add_test(NAME avro_test COMMAND avro_test)

test/avro_data_test.cc

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/c/bridge.h>
21+
#include <arrow/json/from_string.h>
22+
#include <avro/Compiler.hh>
23+
#include <avro/Generic.hh>
24+
#include <avro/Node.hh>
25+
#include <avro/Types.hh>
26+
#include <gmock/gmock.h>
27+
#include <gtest/gtest.h>
28+
#include <iceberg/arrow/arrow_error_transform_internal.h>
29+
#include <iceberg/avro/avro_data_util_internal.h>
30+
#include <iceberg/avro/avro_schema_util_internal.h>
31+
#include <iceberg/schema.h>
32+
#include <iceberg/schema_internal.h>
33+
#include <iceberg/schema_util.h>
34+
#include <iceberg/type.h>
35+
#include <iceberg/util/macros.h>
36+
37+
#include "matchers.h"
38+
39+
namespace iceberg::avro {
40+
41+
void VerifyAppendDatumToBuilder(const Schema& iceberg_schema,
42+
const ::avro::NodePtr& avro_node,
43+
const std::vector<::avro::GenericDatum>& avro_data,
44+
std::string_view expected_array_json) {
45+
// Create 1 to 1 projection
46+
auto projection_result = Project(iceberg_schema, avro_node, /*prune_source=*/false);
47+
ASSERT_THAT(projection_result, IsOk());
48+
auto projection = std::move(projection_result.value());
49+
50+
// Create arrow schema and array builder
51+
ArrowSchema arrow_c_schema;
52+
ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk());
53+
auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie();
54+
auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields());
55+
auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie();
56+
57+
// Call AppendDatumToBuilder repeatedly to append the datum
58+
for (const auto& avro_datum : avro_data) {
59+
ASSERT_THAT(AppendDatumToBuilder(avro_node, avro_datum, projection, iceberg_schema,
60+
builder.get()),
61+
IsOk());
62+
}
63+
64+
// Verify the result
65+
auto array = builder->Finish().ValueOrDie();
66+
auto expected_array =
67+
::arrow::json::ArrayFromJSONString(arrow_struct_type, expected_array_json)
68+
.ValueOrDie();
69+
ASSERT_TRUE(array->Equals(*expected_array));
70+
}
71+
72+
TEST(AvroDataTest, AppendIntDatumToBuilder) {
73+
Schema iceberg_schema(
74+
{SchemaField::MakeRequired(/*field_id=*/1, "a", std::make_shared<IntType>())});
75+
::avro::NodePtr avro_node;
76+
EXPECT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk());
77+
78+
std::vector<::avro::GenericDatum> avro_data;
79+
for (int i = 0; i < 3; ++i) {
80+
::avro::GenericDatum avro_datum(avro_node);
81+
avro_datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i;
82+
avro_data.push_back(avro_datum);
83+
}
84+
85+
ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(
86+
iceberg_schema, avro_node, avro_data, R"([{"a": 0}, {"a": 1}, {"a": 2}])"));
87+
}
88+
89+
TEST(AvroDataTest, AppendStringDatumToBuilder) {
90+
Schema iceberg_schema(
91+
{SchemaField::MakeRequired(/*field_id=*/1, "a", std::make_shared<StringType>())});
92+
::avro::NodePtr avro_node;
93+
EXPECT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk());
94+
95+
std::vector<::avro::GenericDatum> avro_data;
96+
for (int i = 0; i < 3; ++i) {
97+
::avro::GenericDatum avro_datum(avro_node);
98+
avro_datum.value<::avro::GenericRecord>().fieldAt(0).value<std::string>() =
99+
"a" + std::to_string(i);
100+
avro_data.push_back(avro_datum);
101+
}
102+
103+
ASSERT_NO_FATAL_FAILURE(
104+
VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data,
105+
R"([{"a": "a0"}, {"a": "a1"}, {"a": "a2"}])"));
106+
}
107+
108+
} // namespace iceberg::avro

0 commit comments

Comments
 (0)