Skip to content

Commit e8a4050

Browse files
authored
feat: add schema conversion to avro schema (#100)
1 parent 29b887d commit e8a4050

File tree

5 files changed

+651
-2
lines changed

5 files changed

+651
-2
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
8080

8181
if(ICEBERG_BUILD_BUNDLE)
8282
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
83-
avro/demo_avro.cc)
83+
avro/demo_avro.cc avro/avro_schema_util.cc)
8484

8585
# Libraries to link with exported libiceberg_bundle.{so,a}.
8686
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <format>
21+
#include <mutex>
22+
#include <string_view>
23+
24+
#include <arrow/type.h>
25+
#include <arrow/util/decimal.h>
26+
#include <avro/CustomAttributes.hh>
27+
#include <avro/LogicalType.hh>
28+
#include <avro/NodeImpl.hh>
29+
#include <avro/Types.hh>
30+
31+
#include "iceberg/avro/avro_schema_util_internal.h"
32+
#include "iceberg/util/macros.h"
33+
#include "iceberg/util/visit_type.h"
34+
35+
namespace iceberg::avro {
36+
37+
namespace {
38+
39+
constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name";
40+
constexpr std::string_view kFieldIdProp = "field-id";
41+
constexpr std::string_view kKeyIdProp = "key-id";
42+
constexpr std::string_view kValueIdProp = "value-id";
43+
constexpr std::string_view kElementIdProp = "element-id";
44+
constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc";
45+
46+
struct MapLogicalType : public ::avro::CustomLogicalType {
47+
MapLogicalType() : ::avro::CustomLogicalType("map") {}
48+
};
49+
50+
::avro::LogicalType GetMapLogicalType() {
51+
static std::once_flag flag{};
52+
std::call_once(flag, []() {
53+
// Register the map logical type with the avro custom logical type registry.
54+
// See https://github.com/apache/avro/pull/3326 for details.
55+
::avro::CustomLogicalTypeRegistry::instance().registerType(
56+
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
57+
});
58+
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
59+
}
60+
61+
::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
62+
::avro::CustomAttributes attributes;
63+
attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id),
64+
/*addQuotes=*/false);
65+
return attributes;
66+
}
67+
68+
} // namespace
69+
70+
Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
71+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
72+
return {};
73+
}
74+
75+
Status ToAvroNodeVisitor::Visit(const IntType& type, ::avro::NodePtr* node) {
76+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_INT);
77+
return {};
78+
}
79+
80+
Status ToAvroNodeVisitor::Visit(const LongType& type, ::avro::NodePtr* node) {
81+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
82+
return {};
83+
}
84+
85+
Status ToAvroNodeVisitor::Visit(const FloatType& type, ::avro::NodePtr* node) {
86+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_FLOAT);
87+
return {};
88+
}
89+
90+
Status ToAvroNodeVisitor::Visit(const DoubleType& type, ::avro::NodePtr* node) {
91+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_DOUBLE);
92+
return {};
93+
}
94+
95+
Status ToAvroNodeVisitor::Visit(const DecimalType& type, ::avro::NodePtr* node) {
96+
*node = std::make_shared<::avro::NodeFixed>();
97+
(*node)->setName(
98+
::avro::Name(std::format("decimal_{}_{}", type.precision(), type.scale())));
99+
(*node)->setFixedSize(::arrow::DecimalType::DecimalSize(type.precision()));
100+
101+
::avro::LogicalType logical_type(::avro::LogicalType::DECIMAL);
102+
logical_type.setPrecision(type.precision());
103+
logical_type.setScale(type.scale());
104+
(*node)->setLogicalType(logical_type);
105+
106+
return {};
107+
}
108+
109+
Status ToAvroNodeVisitor::Visit(const DateType& type, ::avro::NodePtr* node) {
110+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_INT);
111+
(*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::DATE});
112+
return {};
113+
}
114+
115+
Status ToAvroNodeVisitor::Visit(const TimeType& type, ::avro::NodePtr* node) {
116+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
117+
(*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIME_MICROS});
118+
return {};
119+
}
120+
121+
Status ToAvroNodeVisitor::Visit(const TimestampType& type, ::avro::NodePtr* node) {
122+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
123+
(*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIMESTAMP_MICROS});
124+
::avro::CustomAttributes attributes;
125+
attributes.addAttribute(std::string(kAdjustToUtcProp), "false", /*addQuotes=*/false);
126+
(*node)->addCustomAttributesForField(attributes);
127+
return {};
128+
}
129+
130+
Status ToAvroNodeVisitor::Visit(const TimestampTzType& type, ::avro::NodePtr* node) {
131+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
132+
(*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIMESTAMP_MICROS});
133+
::avro::CustomAttributes attributes;
134+
attributes.addAttribute(std::string(kAdjustToUtcProp), "true", /*addQuotes=*/false);
135+
(*node)->addCustomAttributesForField(attributes);
136+
return {};
137+
}
138+
139+
Status ToAvroNodeVisitor::Visit(const StringType& type, ::avro::NodePtr* node) {
140+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_STRING);
141+
return {};
142+
}
143+
144+
Status ToAvroNodeVisitor::Visit(const UuidType& type, ::avro::NodePtr* node) {
145+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_FIXED);
146+
(*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::UUID});
147+
(*node)->setFixedSize(16);
148+
(*node)->setName(::avro::Name("uuid_fixed"));
149+
return {};
150+
}
151+
152+
Status ToAvroNodeVisitor::Visit(const FixedType& type, ::avro::NodePtr* node) {
153+
*node = std::make_shared<::avro::NodeFixed>();
154+
(*node)->setName(::avro::Name(std::format("fixed_{}", type.length())));
155+
(*node)->setFixedSize(type.length());
156+
return {};
157+
}
158+
159+
Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
160+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES);
161+
return {};
162+
}
163+
164+
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
165+
*node = std::make_shared<::avro::NodeRecord>();
166+
167+
if (field_ids_.empty()) {
168+
(*node)->setName(::avro::Name("iceberg_schema")); // Root node
169+
} else {
170+
(*node)->setName(::avro::Name(std::format("r{}", field_ids_.top())));
171+
}
172+
173+
for (const SchemaField& sub_field : type.fields()) {
174+
::avro::NodePtr field_node;
175+
ICEBERG_RETURN_UNEXPECTED(Visit(sub_field, &field_node));
176+
177+
// TODO(gangwu): sanitize field name
178+
(*node)->addName(std::string(sub_field.name()));
179+
(*node)->addLeaf(field_node);
180+
(*node)->addCustomAttributesForField(GetAttributesWithFieldId(sub_field.field_id()));
181+
}
182+
return {};
183+
}
184+
185+
Status ToAvroNodeVisitor::Visit(const ListType& type, ::avro::NodePtr* node) {
186+
*node = std::make_shared<::avro::NodeArray>();
187+
const auto& element_field = type.fields().back();
188+
189+
::avro::CustomAttributes attributes;
190+
attributes.addAttribute(std::string(kElementIdProp),
191+
std::to_string(element_field.field_id()),
192+
/*addQuotes=*/false);
193+
194+
::avro::NodePtr element_node;
195+
ICEBERG_RETURN_UNEXPECTED(Visit(element_field, &element_node));
196+
197+
(*node)->addCustomAttributesForField(attributes);
198+
(*node)->addLeaf(std::move(element_node));
199+
return {};
200+
}
201+
202+
Status ToAvroNodeVisitor::Visit(const MapType& type, ::avro::NodePtr* node) {
203+
const auto& key_field = type.key();
204+
const auto& value_field = type.value();
205+
206+
if (key_field.optional()) [[unlikely]] {
207+
return InvalidArgument("Map key `{}` must be required", key_field.name());
208+
}
209+
210+
if (key_field.type()->type_id() == TypeId::kString) {
211+
::avro::CustomAttributes attributes;
212+
attributes.addAttribute(std::string(kKeyIdProp), std::to_string(key_field.field_id()),
213+
/*addQuotes=*/false);
214+
attributes.addAttribute(std::string(kValueIdProp),
215+
std::to_string(value_field.field_id()),
216+
/*addQuotes=*/false);
217+
218+
::avro::NodePtr value_node;
219+
ICEBERG_RETURN_UNEXPECTED(Visit(value_field, &value_node));
220+
221+
*node = std::make_shared<::avro::NodeMap>();
222+
(*node)->addLeaf(std::move(value_node));
223+
(*node)->addCustomAttributesForField(attributes);
224+
} else {
225+
auto struct_node = std::make_shared<::avro::NodeRecord>();
226+
struct_node->setName(::avro::Name(
227+
std::format("k{}_v{}", key_field.field_id(), value_field.field_id())));
228+
229+
::avro::NodePtr key_node;
230+
ICEBERG_RETURN_UNEXPECTED(Visit(key_field, &key_node));
231+
struct_node->addLeaf(std::move(key_node));
232+
struct_node->addName("key");
233+
struct_node->addCustomAttributesForField(
234+
GetAttributesWithFieldId(key_field.field_id()));
235+
236+
::avro::NodePtr value_node;
237+
ICEBERG_RETURN_UNEXPECTED(Visit(value_field, &value_node));
238+
struct_node->addLeaf(std::move(value_node));
239+
struct_node->addName("value");
240+
struct_node->addCustomAttributesForField(
241+
GetAttributesWithFieldId(value_field.field_id()));
242+
243+
*node = std::make_shared<::avro::NodeArray>();
244+
(*node)->addLeaf(std::move(struct_node));
245+
(*node)->setLogicalType(GetMapLogicalType());
246+
}
247+
248+
return {};
249+
}
250+
251+
Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node) {
252+
field_ids_.push(field.field_id());
253+
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node));
254+
255+
if (field.optional()) {
256+
::avro::MultiLeaves union_types;
257+
union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
258+
union_types.add(std::move(*node));
259+
*node = std::make_shared<::avro::NodeUnion>(union_types);
260+
}
261+
262+
field_ids_.pop();
263+
return {};
264+
}
265+
266+
} // namespace iceberg::avro
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <stack>
23+
24+
#include <avro/Node.hh>
25+
26+
#include "iceberg/result.h"
27+
#include "iceberg/type.h"
28+
29+
namespace iceberg::avro {
30+
31+
/// \brief A visitor that converts an Iceberg type to an Avro node.
32+
class ToAvroNodeVisitor {
33+
public:
34+
Status Visit(const BooleanType& type, ::avro::NodePtr* node);
35+
Status Visit(const IntType& type, ::avro::NodePtr* node);
36+
Status Visit(const LongType& type, ::avro::NodePtr* node);
37+
Status Visit(const FloatType& type, ::avro::NodePtr* node);
38+
Status Visit(const DoubleType& type, ::avro::NodePtr* node);
39+
Status Visit(const DecimalType& type, ::avro::NodePtr* node);
40+
Status Visit(const DateType& type, ::avro::NodePtr* node);
41+
Status Visit(const TimeType& type, ::avro::NodePtr* node);
42+
Status Visit(const TimestampType& type, ::avro::NodePtr* node);
43+
Status Visit(const TimestampTzType& type, ::avro::NodePtr* node);
44+
Status Visit(const StringType& type, ::avro::NodePtr* node);
45+
Status Visit(const UuidType& type, ::avro::NodePtr* node);
46+
Status Visit(const FixedType& type, ::avro::NodePtr* node);
47+
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
48+
Status Visit(const StructType& type, ::avro::NodePtr* node);
49+
Status Visit(const ListType& type, ::avro::NodePtr* node);
50+
Status Visit(const MapType& type, ::avro::NodePtr* node);
51+
Status Visit(const SchemaField& field, ::avro::NodePtr* node);
52+
53+
private:
54+
// Store recently accessed field ids on the current visitor path.
55+
std::stack<int32_t> field_ids_;
56+
};
57+
58+
} // namespace iceberg::avro

test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test)
6565

6666
if(ICEBERG_BUILD_BUNDLE)
6767
add_executable(avro_test)
68-
target_sources(avro_test PRIVATE avro_test.cc)
68+
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc)
6969
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7070
GTest::gmock)
7171
add_test(NAME avro_test COMMAND avro_test)

0 commit comments

Comments
 (0)