Skip to content

Commit 02cd5fd

Browse files
committed
feat: add support for avro to arrow data conversion
1 parent 3cf5963 commit 02cd5fd

File tree

8 files changed

+640
-23
lines changed

8 files changed

+640
-23
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ function(resolve_arrow_dependency)
7171
set(ARROW_FILESYSTEM
7272
ON
7373
CACHE BOOL "" FORCE)
74+
set(ARROW_JSON
75+
ON
76+
CACHE BOOL "" FORCE)
7477
set(ARROW_PARQUET
7578
ON
7679
CACHE BOOL "" FORCE)
@@ -95,8 +98,8 @@ function(resolve_arrow_dependency)
9598

9699
fetchcontent_declare(VendoredArrow
97100
${FC_DECLARE_COMMON_OPTIONS}
98-
GIT_REPOSITORY https://github.com/wgtmac/arrow.git
99-
GIT_TAG 7d50c4ac803ad983734de5f418b7cd18f25b0dc9
101+
GIT_REPOSITORY https://github.com/apache/arrow.git
102+
GIT_TAG 5f0aeb5de53fb25b59a52661a80071faef99a4a4
100103
#URL ${ARROW_SOURCE_URL}
101104
#URL_HASH "SHA256=${ICEBERG_ARROW_BUILD_SHA256_CHECKSUM}"
102105
SOURCE_SUBDIR

src/iceberg/avro/avro_data_util.cc

Lines changed: 369 additions & 2 deletions
Large diffs are not rendered by default.

src/iceberg/avro/avro_data_util_internal.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,21 @@
2626

2727
namespace iceberg::avro {
2828

29+
/// \brief Append an Avro datum to an Arrow array builder.
30+
///
31+
/// This function handles schema evolution by using the provided projection to map
32+
/// fields from the Avro data to the expected Arrow schema.
33+
///
34+
/// \param avro_node The Avro schema node (must be a record at root level)
35+
/// \param avro_datum The Avro data to append
36+
/// \param projection Schema projection from `projected_schema` to `avro_node`
37+
/// \param projected_schema The projected schema
38+
/// \param array_builder The Arrow array builder to append to (must be a struct builder)
39+
/// \return Status indicating success or failure
2940
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
3041
const ::avro::GenericDatum& avro_datum,
3142
const SchemaProjection& projection,
32-
const Schema& arrow_schema,
43+
const Schema& projected_schema,
3344
::arrow::ArrayBuilder* array_builder);
3445

3546
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7373

7474
} // namespace
7575

76+
std::string ToString(const ::avro::NodePtr& node) {
77+
std::stringstream ss;
78+
ss << *node;
79+
return ss.str();
80+
}
81+
82+
std::string ToString(const ::avro::LogicalType& logical_type) {
83+
std::stringstream ss;
84+
logical_type.printJson(ss);
85+
return ss.str();
86+
}
87+
88+
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
89+
return ToString(::avro::LogicalType(logical_type));
90+
}
91+
7692
Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
7793
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
7894
return {};
@@ -383,22 +399,6 @@ Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.r
383399

384400
namespace {
385401

386-
std::string ToString(const ::avro::NodePtr& node) {
387-
std::stringstream ss;
388-
ss << *node;
389-
return ss.str();
390-
}
391-
392-
std::string ToString(const ::avro::LogicalType& logical_type) {
393-
std::stringstream ss;
394-
logical_type.printJson(ss);
395-
return ss.str();
396-
}
397-
398-
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
399-
return ToString(::avro::LogicalType(logical_type));
400-
}
401-
402402
bool HasLogicalType(const ::avro::NodePtr& node,
403403
::avro::LogicalType::Type expected_type) {
404404
return node->logicalType().type() == expected_type;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,8 @@ class HasIdVisitor {
135135
Result<SchemaProjection> Project(const Schema& expected_schema,
136136
const ::avro::NodePtr& avro_node, bool prune_source);
137137

138+
std::string ToString(const ::avro::NodePtr& node);
139+
std::string ToString(const ::avro::LogicalType& logical_type);
140+
std::string ToString(const ::avro::LogicalType::Type& logical_type);
141+
138142
} // namespace iceberg::avro

src/iceberg/schema_util_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
namespace iceberg {
2727

2828
// Fix `from` field of `FieldProjection` to use pruned field index.
29-
void PruneFieldProjection(FieldProjection& field_projection) {
29+
inline void PruneFieldProjection(FieldProjection& field_projection) {
3030
std::map<size_t, size_t> local_index_to_pruned_index;
3131
for (const auto& child_projection : field_projection.children) {
3232
if (child_projection.kind == FieldProjection::Kind::kProjected) {

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ add_test(NAME util_test COMMAND util_test)
7171

7272
if(ICEBERG_BUILD_BUNDLE)
7373
add_executable(avro_test)
74-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
74+
target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc
75+
avro_stream_test.cc)
7576
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7677
GTest::gmock)
7778
add_test(NAME avro_test COMMAND avro_test)

test/avro_data_test.cc

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
/// \file avro_data_test.cc
21+
/// Comprehensive test suite for AppendDatumToBuilder function.
22+
///
23+
/// This test suite covers:
24+
/// - All basic primitive types (Boolean, Int, Long, Float, Double, String)
25+
/// - Schema evolution (int->long, float->double promotions)
26+
/// - Multiple field records
27+
/// - Parameterized tests for code reuse and maintainability
28+
///
29+
/// The tests use Arrow's JSON parsing for expected results comparison,
30+
/// ensuring accurate validation of the conversion from Avro to Arrow format.
31+
32+
#include <arrow/c/bridge.h>
33+
#include <arrow/json/from_string.h>
34+
#include <avro/Compiler.hh>
35+
#include <avro/Generic.hh>
36+
#include <avro/Node.hh>
37+
#include <avro/Types.hh>
38+
#include <gmock/gmock.h>
39+
#include <gtest/gtest.h>
40+
#include <iceberg/arrow/arrow_error_transform_internal.h>
41+
#include <iceberg/avro/avro_data_util_internal.h>
42+
#include <iceberg/avro/avro_schema_util_internal.h>
43+
#include <iceberg/schema.h>
44+
#include <iceberg/schema_internal.h>
45+
#include <iceberg/schema_util.h>
46+
#include <iceberg/type.h>
47+
#include <iceberg/util/macros.h>
48+
49+
#include "matchers.h"
50+
51+
namespace iceberg::avro {
52+
53+
/// \brief Test case structure for parameterized primitive type tests
54+
struct AppendDatumParam {
55+
std::string name;
56+
std::shared_ptr<Type> projected_type;
57+
std::shared_ptr<Type> source_type;
58+
std::function<void(::avro::GenericDatum&, int)> value_setter;
59+
std::string expected_json;
60+
};
61+
62+
/// \brief Helper function to create test data for a primitive type
63+
std::vector<::avro::GenericDatum> CreateTestData(
64+
const ::avro::NodePtr& avro_node,
65+
const std::function<void(::avro::GenericDatum&, int)>& value_setter, int count = 3) {
66+
std::vector<::avro::GenericDatum> avro_data;
67+
for (int i = 0; i < count; ++i) {
68+
::avro::GenericDatum avro_datum(avro_node);
69+
value_setter(avro_datum, i);
70+
avro_data.push_back(avro_datum);
71+
}
72+
return avro_data;
73+
}
74+
75+
/// \brief Utility function to verify AppendDatumToBuilder behavior
76+
void VerifyAppendDatumToBuilder(const Schema& projected_schema,
77+
const ::avro::NodePtr& avro_node,
78+
const std::vector<::avro::GenericDatum>& avro_data,
79+
std::string_view expected_array_json) {
80+
// Create 1 to 1 projection
81+
auto projection_result = Project(projected_schema, avro_node, /*prune_source=*/false);
82+
ASSERT_THAT(projection_result, IsOk());
83+
auto projection = std::move(projection_result.value());
84+
85+
// Create arrow schema and array builder
86+
ArrowSchema arrow_c_schema;
87+
ASSERT_THAT(ToArrowSchema(projected_schema, &arrow_c_schema), IsOk());
88+
auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie();
89+
auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields());
90+
auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie();
91+
92+
// Call AppendDatumToBuilder repeatedly to append the datum
93+
for (const auto& avro_datum : avro_data) {
94+
ASSERT_THAT(AppendDatumToBuilder(avro_node, avro_datum, projection, projected_schema,
95+
builder.get()),
96+
IsOk());
97+
}
98+
99+
// Verify the result
100+
auto array = builder->Finish().ValueOrDie();
101+
auto expected_array =
102+
::arrow::json::ArrayFromJSONString(arrow_struct_type, expected_array_json)
103+
.ValueOrDie();
104+
ASSERT_TRUE(array->Equals(*expected_array));
105+
}
106+
107+
/// \brief Test class for primitive types using parameterized tests
108+
class AppendDatumToBuilderTest : public ::testing::TestWithParam<AppendDatumParam> {};
109+
110+
TEST_P(AppendDatumToBuilderTest, PrimitiveType) {
111+
const auto& test_case = GetParam();
112+
113+
Schema projected_schema({SchemaField::MakeRequired(
114+
/*field_id=*/1, /*name=*/"a", test_case.projected_type)});
115+
Schema source_schema({SchemaField::MakeRequired(
116+
/*field_id=*/1, /*name=*/"a", test_case.source_type)});
117+
118+
::avro::NodePtr avro_node;
119+
EXPECT_THAT(ToAvroNodeVisitor{}.Visit(source_schema, &avro_node), IsOk());
120+
121+
auto avro_data = CreateTestData(avro_node, test_case.value_setter);
122+
ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(projected_schema, avro_node,
123+
avro_data, test_case.expected_json));
124+
}
125+
126+
// Define test cases for all primitive types
127+
const std::vector<AppendDatumParam> kPrimitiveTestCases = {
128+
{
129+
.name = "Boolean",
130+
.projected_type = std::make_shared<BooleanType>(),
131+
.source_type = std::make_shared<BooleanType>(),
132+
.value_setter =
133+
[](::avro::GenericDatum& datum, int i) {
134+
datum.value<::avro::GenericRecord>().fieldAt(0).value<bool>() =
135+
(i % 2 == 0);
136+
},
137+
.expected_json = R"([{"a": true}, {"a": false}, {"a": true}])",
138+
},
139+
{
140+
.name = "Int",
141+
.projected_type = std::make_shared<IntType>(),
142+
.source_type = std::make_shared<IntType>(),
143+
.value_setter =
144+
[](::avro::GenericDatum& datum, int i) {
145+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100;
146+
},
147+
.expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
148+
},
149+
{
150+
.name = "Long",
151+
.projected_type = std::make_shared<LongType>(),
152+
.source_type = std::make_shared<LongType>(),
153+
.value_setter =
154+
[](::avro::GenericDatum& datum, int i) {
155+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() =
156+
i * 1000000LL;
157+
},
158+
.expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
159+
},
160+
{
161+
.name = "Float",
162+
.projected_type = std::make_shared<FloatType>(),
163+
.source_type = std::make_shared<FloatType>(),
164+
.value_setter =
165+
[](::avro::GenericDatum& datum, int i) {
166+
datum.value<::avro::GenericRecord>().fieldAt(0).value<float>() = i * 3.14f;
167+
},
168+
.expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
169+
},
170+
{
171+
.name = "Double",
172+
.projected_type = std::make_shared<DoubleType>(),
173+
.source_type = std::make_shared<DoubleType>(),
174+
.value_setter =
175+
[](::avro::GenericDatum& datum, int i) {
176+
datum.value<::avro::GenericRecord>().fieldAt(0).value<double>() =
177+
i * 1.234567890;
178+
},
179+
.expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
180+
},
181+
{
182+
.name = "String",
183+
.projected_type = std::make_shared<StringType>(),
184+
.source_type = std::make_shared<StringType>(),
185+
.value_setter =
186+
[](::avro::GenericDatum& datum, int i) {
187+
datum.value<::avro::GenericRecord>().fieldAt(0).value<std::string>() =
188+
"test_string_" + std::to_string(i);
189+
},
190+
.expected_json =
191+
R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
192+
},
193+
{
194+
.name = "IntToLongPromotion",
195+
.projected_type = std::make_shared<LongType>(),
196+
.source_type = std::make_shared<IntType>(),
197+
.value_setter =
198+
[](::avro::GenericDatum& datum, int i) {
199+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100;
200+
},
201+
.expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
202+
},
203+
// TODO(gangwu): add test cases for other types
204+
};
205+
206+
INSTANTIATE_TEST_SUITE_P(AllPrimitiveTypes, AppendDatumToBuilderTest,
207+
::testing::ValuesIn(kPrimitiveTestCases),
208+
[](const ::testing::TestParamInfo<AppendDatumParam>& info) {
209+
return info.param.name;
210+
});
211+
212+
TEST(AppendDatumToBuilderTest, TwoFieldsRecord) {
213+
Schema iceberg_schema({
214+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
215+
SchemaField::MakeRequired(2, "name", std::make_shared<StringType>()),
216+
});
217+
::avro::NodePtr avro_node;
218+
ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk());
219+
220+
std::vector<::avro::GenericDatum> avro_data;
221+
::avro::GenericDatum avro_datum(avro_node);
222+
auto& record = avro_datum.value<::avro::GenericRecord>();
223+
record.fieldAt(0).value<int32_t>() = 42;
224+
record.fieldAt(1).value<std::string>() = "test";
225+
avro_data.push_back(avro_datum);
226+
227+
ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data,
228+
R"([{"id": 42, "name": "test"}])"));
229+
}
230+
231+
} // namespace iceberg::avro

0 commit comments

Comments
 (0)