Skip to content

Commit 90bbfd8

Browse files
committed
feat: add support for avro to arrow data conversion
1 parent 3cf5963 commit 90bbfd8

File tree

8 files changed

+628
-23
lines changed

8 files changed

+628
-23
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ function(resolve_arrow_dependency)
7171
set(ARROW_FILESYSTEM
7272
ON
7373
CACHE BOOL "" FORCE)
74+
set(ARROW_JSON
75+
ON
76+
CACHE BOOL "" FORCE)
7477
set(ARROW_PARQUET
7578
ON
7679
CACHE BOOL "" FORCE)
@@ -95,8 +98,8 @@ function(resolve_arrow_dependency)
9598

9699
fetchcontent_declare(VendoredArrow
97100
${FC_DECLARE_COMMON_OPTIONS}
98-
GIT_REPOSITORY https://github.com/wgtmac/arrow.git
99-
GIT_TAG 7d50c4ac803ad983734de5f418b7cd18f25b0dc9
101+
GIT_REPOSITORY https://github.com/apache/arrow.git
102+
GIT_TAG 5f0aeb5de53fb25b59a52661a80071faef99a4a4
100103
#URL ${ARROW_SOURCE_URL}
101104
#URL_HASH "SHA256=${ICEBERG_ARROW_BUILD_SHA256_CHECKSUM}"
102105
SOURCE_SUBDIR

src/iceberg/avro/avro_data_util.cc

Lines changed: 369 additions & 2 deletions
Large diffs are not rendered by default.

src/iceberg/avro/avro_data_util_internal.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,21 @@
2626

2727
namespace iceberg::avro {
2828

29+
/// \brief Append an Avro datum to an Arrow array builder.
30+
///
31+
/// This function handles schema evolution by using the provided projection to map
32+
/// fields from the Avro data to the expected Arrow schema.
33+
///
34+
/// \param avro_node The Avro schema node (must be a record at root level)
35+
/// \param avro_datum The Avro data to append
36+
/// \param projection Schema projection from `projected_schema` to `avro_node`
37+
/// \param projected_schema The projected schema
38+
/// \param array_builder The Arrow array builder to append to (must be a struct builder)
39+
/// \return Status indicating success or failure
2940
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
3041
const ::avro::GenericDatum& avro_datum,
3142
const SchemaProjection& projection,
32-
const Schema& arrow_schema,
43+
const Schema& projected_schema,
3344
::arrow::ArrayBuilder* array_builder);
3445

3546
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7373

7474
} // namespace
7575

76+
/// \brief Render an Avro schema node as a string.
///
/// The node is dereferenced and written to an in-memory stream with its
/// `operator<<`; the captured text is returned.
///
/// \param node The Avro node to print.
/// \return The streamed text.
std::string ToString(const ::avro::NodePtr& node) {
  std::ostringstream out;
  out << *node;
  return out.str();
}
81+
82+
/// \brief Render an Avro logical type as a string.
///
/// \param logical_type The logical type to print.
/// \return Whatever `LogicalType::printJson` writes to the stream.
std::string ToString(const ::avro::LogicalType& logical_type) {
  std::ostringstream json;
  logical_type.printJson(json);
  return json.str();
}
87+
88+
/// \brief Render an Avro logical type enum value as a string.
///
/// The enum value is wrapped in a `::avro::LogicalType` and printed through the
/// `ToString(const ::avro::LogicalType&)` overload.
///
/// \param logical_type The logical type enum value to print.
/// \return The string form of the wrapped logical type.
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
  const ::avro::LogicalType wrapped(logical_type);
  return ToString(wrapped);
}
91+
7692
Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
7793
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
7894
return {};
@@ -383,22 +399,6 @@ Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.r
383399

384400
namespace {
385401

386-
std::string ToString(const ::avro::NodePtr& node) {
387-
std::stringstream ss;
388-
ss << *node;
389-
return ss.str();
390-
}
391-
392-
std::string ToString(const ::avro::LogicalType& logical_type) {
393-
std::stringstream ss;
394-
logical_type.printJson(ss);
395-
return ss.str();
396-
}
397-
398-
std::string ToString(const ::avro::LogicalType::Type& logical_type) {
399-
return ToString(::avro::LogicalType(logical_type));
400-
}
401-
402402
bool HasLogicalType(const ::avro::NodePtr& node,
403403
::avro::LogicalType::Type expected_type) {
404404
return node->logicalType().type() == expected_type;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,8 @@ class HasIdVisitor {
135135
Result<SchemaProjection> Project(const Schema& expected_schema,
136136
const ::avro::NodePtr& avro_node, bool prune_source);
137137

138+
/// \brief Render an Avro schema node as a string.
std::string ToString(const ::avro::NodePtr& node);
139+
/// \brief Render an Avro logical type as a string.
std::string ToString(const ::avro::LogicalType& logical_type);
140+
/// \brief Render an Avro logical type enum value as a string.
std::string ToString(const ::avro::LogicalType::Type& logical_type);
141+
138142
} // namespace iceberg::avro

src/iceberg/schema_util_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
namespace iceberg {
2727

2828
// Fix `from` field of `FieldProjection` to use pruned field index.
29-
void PruneFieldProjection(FieldProjection& field_projection) {
29+
inline void PruneFieldProjection(FieldProjection& field_projection) {
3030
std::map<size_t, size_t> local_index_to_pruned_index;
3131
for (const auto& child_projection : field_projection.children) {
3232
if (child_projection.kind == FieldProjection::Kind::kProjected) {

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ add_test(NAME util_test COMMAND util_test)
7171

7272
if(ICEBERG_BUILD_BUNDLE)
7373
add_executable(avro_test)
74-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
74+
target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc
75+
avro_stream_test.cc)
7576
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7677
GTest::gmock)
7778
add_test(NAME avro_test COMMAND avro_test)

test/avro_data_test.cc

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
#include <avro/Compiler.hh>
#include <avro/Generic.hh>
#include <avro/Node.hh>
#include <avro/Types.hh>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <iceberg/arrow/arrow_error_transform_internal.h>
#include <iceberg/avro/avro_data_util_internal.h>
#include <iceberg/avro/avro_schema_util_internal.h>
#include <iceberg/schema.h>
#include <iceberg/schema_internal.h>
#include <iceberg/schema_util.h>
#include <iceberg/type.h>
#include <iceberg/util/macros.h>

#include "matchers.h"
38+
39+
namespace iceberg::avro {
40+
41+
/// \brief Test case structure for parameterized primitive type tests
42+
struct AppendDatumParam {
43+
std::string name;
44+
std::shared_ptr<Type> projected_type;
45+
std::shared_ptr<Type> source_type;
46+
std::function<void(::avro::GenericDatum&, int)> value_setter;
47+
std::string expected_json;
48+
};
49+
50+
/// \brief Helper function to create test data for a primitive type
51+
std::vector<::avro::GenericDatum> CreateTestData(
52+
const ::avro::NodePtr& avro_node,
53+
const std::function<void(::avro::GenericDatum&, int)>& value_setter, int count = 3) {
54+
std::vector<::avro::GenericDatum> avro_data;
55+
for (int i = 0; i < count; ++i) {
56+
::avro::GenericDatum avro_datum(avro_node);
57+
value_setter(avro_datum, i);
58+
avro_data.push_back(avro_datum);
59+
}
60+
return avro_data;
61+
}
62+
63+
/// \brief Utility function to verify AppendDatumToBuilder behavior
64+
void VerifyAppendDatumToBuilder(const Schema& projected_schema,
65+
const ::avro::NodePtr& avro_node,
66+
const std::vector<::avro::GenericDatum>& avro_data,
67+
std::string_view expected_array_json) {
68+
// Create 1 to 1 projection
69+
auto projection_result = Project(projected_schema, avro_node, /*prune_source=*/false);
70+
ASSERT_THAT(projection_result, IsOk());
71+
auto projection = std::move(projection_result.value());
72+
73+
// Create arrow schema and array builder
74+
ArrowSchema arrow_c_schema;
75+
ASSERT_THAT(ToArrowSchema(projected_schema, &arrow_c_schema), IsOk());
76+
auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie();
77+
auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields());
78+
auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie();
79+
80+
// Call AppendDatumToBuilder repeatedly to append the datum
81+
for (const auto& avro_datum : avro_data) {
82+
ASSERT_THAT(AppendDatumToBuilder(avro_node, avro_datum, projection, projected_schema,
83+
builder.get()),
84+
IsOk());
85+
}
86+
87+
// Verify the result
88+
auto array = builder->Finish().ValueOrDie();
89+
auto expected_array =
90+
::arrow::json::ArrayFromJSONString(arrow_struct_type, expected_array_json)
91+
.ValueOrDie();
92+
ASSERT_TRUE(array->Equals(*expected_array));
93+
}
94+
95+
/// \brief Test class for primitive types using parameterized tests
96+
class AppendDatumToBuilderTest : public ::testing::TestWithParam<AppendDatumParam> {};
97+
98+
TEST_P(AppendDatumToBuilderTest, PrimitiveType) {
99+
const auto& test_case = GetParam();
100+
101+
Schema projected_schema({SchemaField::MakeRequired(
102+
/*field_id=*/1, /*name=*/"a", test_case.projected_type)});
103+
Schema source_schema({SchemaField::MakeRequired(
104+
/*field_id=*/1, /*name=*/"a", test_case.source_type)});
105+
106+
::avro::NodePtr avro_node;
107+
EXPECT_THAT(ToAvroNodeVisitor{}.Visit(source_schema, &avro_node), IsOk());
108+
109+
auto avro_data = CreateTestData(avro_node, test_case.value_setter);
110+
ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(projected_schema, avro_node,
111+
avro_data, test_case.expected_json));
112+
}
113+
114+
// Define test cases for all primitive types
115+
const std::vector<AppendDatumParam> kPrimitiveTestCases = {
116+
{
117+
.name = "Boolean",
118+
.projected_type = std::make_shared<BooleanType>(),
119+
.source_type = std::make_shared<BooleanType>(),
120+
.value_setter =
121+
[](::avro::GenericDatum& datum, int i) {
122+
datum.value<::avro::GenericRecord>().fieldAt(0).value<bool>() =
123+
(i % 2 == 0);
124+
},
125+
.expected_json = R"([{"a": true}, {"a": false}, {"a": true}])",
126+
},
127+
{
128+
.name = "Int",
129+
.projected_type = std::make_shared<IntType>(),
130+
.source_type = std::make_shared<IntType>(),
131+
.value_setter =
132+
[](::avro::GenericDatum& datum, int i) {
133+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100;
134+
},
135+
.expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
136+
},
137+
{
138+
.name = "Long",
139+
.projected_type = std::make_shared<LongType>(),
140+
.source_type = std::make_shared<LongType>(),
141+
.value_setter =
142+
[](::avro::GenericDatum& datum, int i) {
143+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() =
144+
i * 1000000LL;
145+
},
146+
.expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
147+
},
148+
{
149+
.name = "Float",
150+
.projected_type = std::make_shared<FloatType>(),
151+
.source_type = std::make_shared<FloatType>(),
152+
.value_setter =
153+
[](::avro::GenericDatum& datum, int i) {
154+
datum.value<::avro::GenericRecord>().fieldAt(0).value<float>() = i * 3.14f;
155+
},
156+
.expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
157+
},
158+
{
159+
.name = "Double",
160+
.projected_type = std::make_shared<DoubleType>(),
161+
.source_type = std::make_shared<DoubleType>(),
162+
.value_setter =
163+
[](::avro::GenericDatum& datum, int i) {
164+
datum.value<::avro::GenericRecord>().fieldAt(0).value<double>() =
165+
i * 1.234567890;
166+
},
167+
.expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
168+
},
169+
{
170+
.name = "String",
171+
.projected_type = std::make_shared<StringType>(),
172+
.source_type = std::make_shared<StringType>(),
173+
.value_setter =
174+
[](::avro::GenericDatum& datum, int i) {
175+
datum.value<::avro::GenericRecord>().fieldAt(0).value<std::string>() =
176+
"test_string_" + std::to_string(i);
177+
},
178+
.expected_json =
179+
R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
180+
},
181+
{
182+
.name = "IntToLongPromotion",
183+
.projected_type = std::make_shared<LongType>(),
184+
.source_type = std::make_shared<IntType>(),
185+
.value_setter =
186+
[](::avro::GenericDatum& datum, int i) {
187+
datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100;
188+
},
189+
.expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
190+
},
191+
// TODO(gangwu): add test cases for other types
192+
};
193+
194+
// Instantiate the parameterized suite over kPrimitiveTestCases; the name
// generator returns each case's `name`.
INSTANTIATE_TEST_SUITE_P(
    AllPrimitiveTypes, AppendDatumToBuilderTest, ::testing::ValuesIn(kPrimitiveTestCases),
    [](const ::testing::TestParamInfo<AppendDatumParam>& param_info) -> std::string {
      return param_info.param.name;
    });
199+
200+
/// Appends a single record with a required int field and a required string
/// field and checks the resulting Arrow struct array.
TEST(AppendDatumToBuilderTest, TwoFieldsRecord) {
  Schema iceberg_schema({
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(2, "name", std::make_shared<StringType>()),
  });

  ::avro::NodePtr avro_node;
  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk());

  // Fill in the one record, then copy it into the input vector.
  ::avro::GenericDatum datum(avro_node);
  {
    auto& fields = datum.value<::avro::GenericRecord>();
    fields.fieldAt(0).value<int32_t>() = 42;
    fields.fieldAt(1).value<std::string>() = "test";
  }
  const std::vector<::avro::GenericDatum> avro_data(1, datum);

  constexpr std::string_view kExpectedJson = R"([{"id": 42, "name": "test"}])";
  ASSERT_NO_FATAL_FAILURE(
      VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, kExpectedJson));
}
218+
219+
} // namespace iceberg::avro

0 commit comments

Comments
 (0)