diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 328befd54..0dfb57790 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -80,7 +80,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc - avro/demo_avro.cc) + avro/demo_avro.cc avro/avro_schema_util.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc new file mode 100644 index 000000000..fdbac82d5 --- /dev/null +++ b/src/iceberg/avro/avro_schema_util.cc @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include <format>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/type.h>
+#include <avro/CustomAttributes.hh>
+#include <avro/LogicalType.hh>
+#include <avro/NodeImpl.hh>
+#include <avro/Schema.hh>
+#include <avro/Types.hh>
+
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::avro {
+
+namespace {
+
+constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name";
+constexpr std::string_view kFieldIdProp = "field-id";
+constexpr std::string_view kKeyIdProp = "key-id";
+constexpr std::string_view kValueIdProp = "value-id";
+constexpr std::string_view kElementIdProp = "element-id";
+constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc";
+
+struct MapLogicalType : public ::avro::CustomLogicalType {
+  MapLogicalType() : ::avro::CustomLogicalType("map") {}
+};
+
+::avro::LogicalType GetMapLogicalType() {
+  static std::once_flag flag{};  // Ensures the registration below runs only once.
+  std::call_once(flag, []() {
+    // Register the "map" logical type with the avro custom logical type registry.
+    // See https://github.com/apache/avro/pull/3326 for details.
+    ::avro::CustomLogicalTypeRegistry::instance().registerType(
+        "map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
+  });
+  return ::avro::LogicalType(std::make_shared<MapLogicalType>());
+}
+
+::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
+  ::avro::CustomAttributes attributes;
+  attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id),
+                          /*addQuotes=*/false);
+  return attributes;
+}
+
+}  // namespace
+
+Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const IntType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_INT);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const LongType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const FloatType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_FLOAT);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const DoubleType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_DOUBLE);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const DecimalType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodeFixed>();
+  (*node)->setName(
+      ::avro::Name(std::format("decimal_{}_{}", type.precision(), type.scale())));
+  (*node)->setFixedSize(::arrow::DecimalType::DecimalSize(type.precision()));
+
+  ::avro::LogicalType logical_type(::avro::LogicalType::DECIMAL);
+  logical_type.setPrecision(type.precision());
+  logical_type.setScale(type.scale());
+  (*node)->setLogicalType(logical_type);
+
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const DateType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_INT);
+  (*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::DATE});
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const TimeType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
+  (*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIME_MICROS});
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const TimestampType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
+  (*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIMESTAMP_MICROS});
+  ::avro::CustomAttributes attributes;
+  attributes.addAttribute(std::string(kAdjustToUtcProp), "false", /*addQuotes=*/false);
+  (*node)->addCustomAttributesForField(attributes);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const TimestampTzType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
+  (*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::TIMESTAMP_MICROS});
+  ::avro::CustomAttributes attributes;
+  attributes.addAttribute(std::string(kAdjustToUtcProp), "true", /*addQuotes=*/false);
+  (*node)->addCustomAttributesForField(attributes);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const StringType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_STRING);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const UuidType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodeFixed>();  // Named fixed(16), like FixedType.
+  (*node)->setName(::avro::Name("uuid_fixed"));
+  (*node)->setFixedSize(16);  // Same order as DecimalType: name, size, logical type.
+  (*node)->setLogicalType(::avro::LogicalType{::avro::LogicalType::UUID});
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const FixedType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodeFixed>();
+  (*node)->setName(::avro::Name(std::format("fixed_{}", type.length())));
+  (*node)->setFixedSize(type.length());
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES);
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodeRecord>();
+
+  if (field_ids_.empty()) {
+    (*node)->setName(::avro::Name("iceberg_schema"));  // Root node
+  } else {
+    (*node)->setName(::avro::Name(std::format("r{}", field_ids_.top())));
+  }
+
+  for (const SchemaField& sub_field : type.fields()) {
+    ::avro::NodePtr field_node;
+    ICEBERG_RETURN_UNEXPECTED(Visit(sub_field, &field_node));
+
+    // TODO(gangwu): sanitize field name
+    (*node)->addName(std::string(sub_field.name()));
+    (*node)->addLeaf(field_node);
+    (*node)->addCustomAttributesForField(GetAttributesWithFieldId(sub_field.field_id()));
+  }
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const ListType& type, ::avro::NodePtr* node) {
+  *node = std::make_shared<::avro::NodeArray>();
+  const auto& element_field = type.fields().back();
+
+  ::avro::CustomAttributes attributes;
+  attributes.addAttribute(std::string(kElementIdProp),
+                          std::to_string(element_field.field_id()),
+                          /*addQuotes=*/false);
+
+  ::avro::NodePtr element_node;
+  ICEBERG_RETURN_UNEXPECTED(Visit(element_field, &element_node));
+
+  (*node)->addCustomAttributesForField(attributes);
+  (*node)->addLeaf(std::move(element_node));
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const MapType& type, ::avro::NodePtr* node) {
+  const auto& key_field = type.key();
+  const auto& value_field = type.value();
+
+  if (key_field.optional()) [[unlikely]] {
+    return InvalidArgument("Map key `{}` must be required", key_field.name());
+  }
+
+  if (key_field.type()->type_id() == TypeId::kString) {  // Avro map node.
+    ::avro::CustomAttributes attributes;
+    attributes.addAttribute(std::string(kKeyIdProp), std::to_string(key_field.field_id()),
+                            /*addQuotes=*/false);
+    attributes.addAttribute(std::string(kValueIdProp),
+                            std::to_string(value_field.field_id()),
+                            /*addQuotes=*/false);
+
+    ::avro::NodePtr value_node;
+    ICEBERG_RETURN_UNEXPECTED(Visit(value_field, &value_node));
+
+    *node = std::make_shared<::avro::NodeMap>();
+    (*node)->addLeaf(std::move(value_node));
+    (*node)->addCustomAttributesForField(attributes);
+  } else {  // Array of {key, value} records tagged with the "map" logical type.
+    auto struct_node = std::make_shared<::avro::NodeRecord>();
+    struct_node->setName(::avro::Name(
+        std::format("k{}_v{}", key_field.field_id(), value_field.field_id())));
+
+    ::avro::NodePtr key_node;
+    ICEBERG_RETURN_UNEXPECTED(Visit(key_field, &key_node));
+    struct_node->addLeaf(std::move(key_node));
+    struct_node->addName("key");
+    struct_node->addCustomAttributesForField(
+        GetAttributesWithFieldId(key_field.field_id()));
+
+    ::avro::NodePtr value_node;
+    ICEBERG_RETURN_UNEXPECTED(Visit(value_field, &value_node));
+    struct_node->addLeaf(std::move(value_node));
+    struct_node->addName("value");
+    struct_node->addCustomAttributesForField(
+        GetAttributesWithFieldId(value_field.field_id()));
+
+    *node = std::make_shared<::avro::NodeArray>();
+    (*node)->addLeaf(std::move(struct_node));
+    (*node)->setLogicalType(GetMapLogicalType());
+  }
+
+  return {};
+}
+
+Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node) {
+  field_ids_.push(field.field_id());
+  auto status = VisitTypeInline(*field.type(), /*visitor=*/this, node);
+  field_ids_.pop();  // Pop before the error check so a failure leaves no stale id.
+  ICEBERG_RETURN_UNEXPECTED(status);
+
+  if (field.optional()) {  // Optional field: wrap as union [null, type].
+    ::avro::MultiLeaves union_types;
+    union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
+    union_types.add(std::move(*node));
+    *node = std::make_shared<::avro::NodeUnion>(union_types);
+  }
+  return {};
+}
+
+}  // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h
new file mode 100644
index 000000000..2bc650e2d
--- /dev/null
+++ b/src/iceberg/avro/avro_schema_util_internal.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <stack>
+
+#include <avro/Node.hh>
+
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+
+namespace iceberg::avro {
+
+/// \brief A visitor that converts an Iceberg type to an Avro node (written to `node`).
+class ToAvroNodeVisitor {
+ public:
+  Status Visit(const BooleanType& type, ::avro::NodePtr* node);
+  Status Visit(const IntType& type, ::avro::NodePtr* node);
+  Status Visit(const LongType& type, ::avro::NodePtr* node);
+  Status Visit(const FloatType& type, ::avro::NodePtr* node);
+  Status Visit(const DoubleType& type, ::avro::NodePtr* node);
+  Status Visit(const DecimalType& type, ::avro::NodePtr* node);
+  Status Visit(const DateType& type, ::avro::NodePtr* node);
+  Status Visit(const TimeType& type, ::avro::NodePtr* node);
+  Status Visit(const TimestampType& type, ::avro::NodePtr* node);
+  Status Visit(const TimestampTzType& type, ::avro::NodePtr* node);
+  Status Visit(const StringType& type, ::avro::NodePtr* node);
+  Status Visit(const UuidType& type, ::avro::NodePtr* node);
+  Status Visit(const FixedType& type, ::avro::NodePtr* node);
+  Status Visit(const BinaryType& type, ::avro::NodePtr* node);
+  Status Visit(const StructType& type, ::avro::NodePtr* node);
+  Status Visit(const ListType& type, ::avro::NodePtr* node);
+  Status Visit(const MapType& type, ::avro::NodePtr* node);
+  Status Visit(const SchemaField& field, ::avro::NodePtr* node);
+
+ private:
+  // Ids of the fields enclosing the current type; top() names a nested record "r<id>".
+  std::stack<int32_t> field_ids_;
+};
+
+}  // namespace iceberg::avro
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 31569a78d..0c3776bea 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test)
 
 if(ICEBERG_BUILD_BUNDLE)
   add_executable(avro_test)
-  target_sources(avro_test PRIVATE avro_test.cc)
+  target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc)
   target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
                                           GTest::gmock)
   add_test(NAME avro_test COMMAND avro_test)
diff --git a/test/avro_schema_test.cc b/test/avro_schema_test.cc
new file mode 100644
index 000000000..35cc4847f
--- /dev/null
+++ b/test/avro_schema_test.cc
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+
+#include <avro/NodeImpl.hh>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "matchers.h"
+
+namespace iceberg::avro {
+
+namespace {
+
+void CheckCustomLogicalType(const ::avro::NodePtr& node, const std::string& type_name) {
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::CUSTOM);
+  ASSERT_TRUE(node->logicalType().customLogicalType() != nullptr);
+  EXPECT_EQ(node->logicalType().customLogicalType()->name(), type_name);
+}
+
+void CheckFieldIdAt(const ::avro::NodePtr& node, size_t index, int32_t field_id,
+                    const std::string& key = "field-id") {
+  ASSERT_LT(index, node->customAttributes());
+  const auto& attrs = node->customAttributesAt(index);
+  ASSERT_EQ(attrs.getAttribute(key), std::make_optional(std::to_string(field_id)));
+}
+
+}  // namespace
+
+TEST(ToAvroNodeVisitorTest, BooleanType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(BooleanType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_BOOL);
+}
+
+TEST(ToAvroNodeVisitorTest, IntType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(IntType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_INT);
+}
+
+TEST(ToAvroNodeVisitorTest, LongType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(LongType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_LONG);
+}
+
+TEST(ToAvroNodeVisitorTest, FloatType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(FloatType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_FLOAT);
+}
+
+TEST(ToAvroNodeVisitorTest, DoubleType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(DoubleType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_DOUBLE);
+}
+
+TEST(ToAvroNodeVisitorTest, DecimalType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(DecimalType{10, 2}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_FIXED);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::DECIMAL);
+
+  EXPECT_EQ(node->logicalType().precision(), 10);
+  EXPECT_EQ(node->logicalType().scale(), 2);
+  EXPECT_EQ(node->name().simpleName(), "decimal_10_2");
+}
+
+TEST(ToAvroNodeVisitorTest, DateType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(DateType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_INT);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::DATE);
+}
+
+TEST(ToAvroNodeVisitorTest, TimeType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(TimeType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_LONG);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::TIME_MICROS);
+}
+
+TEST(ToAvroNodeVisitorTest, TimestampType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(TimestampType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_LONG);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::TIMESTAMP_MICROS);
+
+  ASSERT_EQ(node->customAttributes(), 1);
+  EXPECT_EQ(node->customAttributesAt(0).getAttribute("adjust-to-utc"), "false");
+}
+
+TEST(ToAvroNodeVisitorTest, TimestampTzType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(TimestampTzType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_LONG);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::TIMESTAMP_MICROS);
+
+  ASSERT_EQ(node->customAttributes(), 1);
+  EXPECT_EQ(node->customAttributesAt(0).getAttribute("adjust-to-utc"), "true");
+}
+
+TEST(ToAvroNodeVisitorTest, StringType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(StringType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_STRING);
+}
+
+// FIXME: https://issues.apache.org/jira/browse/AVRO-4140
+TEST(ToAvroNodeVisitorTest, DISABLED_UuidType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(UuidType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_FIXED);
+  EXPECT_EQ(node->logicalType().type(), ::avro::LogicalType::UUID);
+
+  EXPECT_EQ(node->fixedSize(), 16);
+  EXPECT_EQ(node->name().fullname(), "uuid_fixed");
+}
+
+TEST(ToAvroNodeVisitorTest, FixedType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(FixedType{20}, &node), IsOk());
+
+  EXPECT_EQ(node->type(), ::avro::AVRO_FIXED);
+  EXPECT_EQ(node->fixedSize(), 20);
+  EXPECT_EQ(node->name().fullname(), "fixed_20");
+}
+
+TEST(ToAvroNodeVisitorTest, BinaryType) {
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(BinaryType{}, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_BYTES);
+}
+
+TEST(ToAvroNodeVisitorTest, StructType) {
+  StructType struct_type{
+      {SchemaField{/*field_id=*/1, "bool_field", std::make_shared<BooleanType>(),
+                   /*optional=*/false},
+       SchemaField{/*field_id=*/2, "int_field", std::make_shared<IntType>(),
+                   /*optional=*/true}}};
+
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_RECORD);
+
+  ASSERT_EQ(node->names(), 2);
+  EXPECT_EQ(node->nameAt(0), "bool_field");
+  EXPECT_EQ(node->nameAt(1), "int_field");
+
+  ASSERT_EQ(node->customAttributes(), 2);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/1));
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/1, /*field_id=*/2));
+
+  ASSERT_EQ(node->leaves(), 2);
+  ASSERT_EQ(node->leafAt(0)->type(), ::avro::AVRO_BOOL);
+  ASSERT_EQ(node->leafAt(1)->type(), ::avro::AVRO_UNION);
+  ASSERT_EQ(node->leafAt(1)->leaves(), 2);
+  EXPECT_EQ(node->leafAt(1)->leafAt(0)->type(), ::avro::AVRO_NULL);
+  EXPECT_EQ(node->leafAt(1)->leafAt(1)->type(), ::avro::AVRO_INT);
+}
+
+TEST(ToAvroNodeVisitorTest, ListType) {
+  ListType list_type{SchemaField{/*field_id=*/5, "element",
+                                 std::make_shared<StringType>(),
+                                 /*optional=*/true}};
+
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(list_type, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_ARRAY);
+
+  ASSERT_EQ(node->customAttributes(), 1);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/5,
+                                         /*key=*/"element-id"));
+
+  ASSERT_EQ(node->leaves(), 1);
+  EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_UNION);
+  ASSERT_EQ(node->leafAt(0)->leaves(), 2);
+  EXPECT_EQ(node->leafAt(0)->leafAt(0)->type(), ::avro::AVRO_NULL);
+  EXPECT_EQ(node->leafAt(0)->leafAt(1)->type(), ::avro::AVRO_STRING);
+}
+
+TEST(ToAvroNodeVisitorTest, MapTypeWithStringKey) {
+  MapType map_type{SchemaField{/*field_id=*/10, "key", std::make_shared<StringType>(),
+                               /*optional=*/false},
+                   SchemaField{/*field_id=*/11, "value", std::make_shared<IntType>(),
+                               /*optional=*/false}};
+
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(map_type, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_MAP);
+
+  ASSERT_GT(node->customAttributes(), 0);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/10,
+                                         /*key=*/"key-id"));
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/11,
+                                         /*key=*/"value-id"));
+
+  ASSERT_EQ(node->leaves(), 2);
+  EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_STRING);
+  EXPECT_EQ(node->leafAt(1)->type(), ::avro::AVRO_INT);
+}
+
+TEST(ToAvroNodeVisitorTest, MapTypeWithNonStringKey) {
+  MapType map_type{SchemaField{/*field_id=*/10, "key", std::make_shared<IntType>(),
+                               /*optional=*/false},
+                   SchemaField{/*field_id=*/11, "value", std::make_shared<StringType>(),
+                               /*optional=*/false}};
+
+  ::avro::NodePtr node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(map_type, &node), IsOk());
+  EXPECT_EQ(node->type(), ::avro::AVRO_ARRAY);
+  ASSERT_NO_FATAL_FAILURE(CheckCustomLogicalType(node, "map"));
+
+  ASSERT_EQ(node->leaves(), 1);
+  auto record_node = node->leafAt(0);
+  ASSERT_EQ(record_node->type(), ::avro::AVRO_RECORD);
+  EXPECT_EQ(record_node->name().fullname(), "k10_v11");
+
+  ASSERT_EQ(record_node->customAttributes(), 2);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(record_node, /*index=*/0, /*field_id=*/10));
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(record_node, /*index=*/1, /*field_id=*/11));
+
+  ASSERT_EQ(record_node->names(), 2);
+  EXPECT_EQ(record_node->nameAt(0), "key");
+  EXPECT_EQ(record_node->nameAt(1), "value");
+
+  ASSERT_EQ(record_node->leaves(), 2);
+  EXPECT_EQ(record_node->leafAt(0)->type(), ::avro::AVRO_INT);
+  EXPECT_EQ(record_node->leafAt(1)->type(), ::avro::AVRO_STRING);
+}
+
+TEST(ToAvroNodeVisitorTest, InvalidMapKeyType) {
+  MapType map_type{SchemaField{/*field_id=*/1, "key", std::make_shared<StringType>(),
+                               /*optional=*/true},
+                   SchemaField{/*field_id=*/2, "value", std::make_shared<StringType>(),
+                               /*optional=*/false}};
+
+  ::avro::NodePtr node;
+  auto status = ToAvroNodeVisitor{}.Visit(map_type, &node);
+  EXPECT_THAT(status, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(status, HasErrorMessage("Map key `key` must be required"));
+}
+
+TEST(ToAvroNodeVisitorTest, NestedTypes) {
+  auto inner_struct = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField{/*field_id=*/2, "string_field", std::make_shared<StringType>(),
+                  /*optional=*/false},
+      SchemaField{/*field_id=*/3, "int_field", std::make_shared<IntType>(),
+                  /*optional=*/true}});
+  auto inner_list = std::make_shared<ListType>(SchemaField{/*field_id=*/5, "element",
+                                                           std::make_shared<DoubleType>(),
+                                                           /*optional=*/false});
+  StructType root_struct{{SchemaField{/*field_id=*/1, "struct_field", inner_struct,
+                                      /*optional=*/false},
+                          SchemaField{/*field_id=*/4, "list_field", inner_list,
+                                      /*optional=*/true}}};
+
+  ::avro::NodePtr root_node;
+  ASSERT_THAT(ToAvroNodeVisitor{}.Visit(root_struct, &root_node), IsOk());
+  EXPECT_EQ(root_node->type(), ::avro::AVRO_RECORD);
+
+  ASSERT_EQ(root_node->names(), 2);
+  EXPECT_EQ(root_node->nameAt(0), "struct_field");
+  EXPECT_EQ(root_node->nameAt(1), "list_field");
+
+  ASSERT_EQ(root_node->customAttributes(), 2);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(root_node, /*index=*/0, /*field_id=*/1));
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(root_node, /*index=*/1, /*field_id=*/4));
+
+  // Check struct field
+  auto struct_node = root_node->leafAt(0);
+  ASSERT_EQ(struct_node->type(), ::avro::AVRO_RECORD);
+  ASSERT_EQ(struct_node->names(), 2);
+  EXPECT_EQ(struct_node->nameAt(0), "string_field");
+  EXPECT_EQ(struct_node->nameAt(1), "int_field");
+
+  ASSERT_EQ(struct_node->customAttributes(), 2);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(struct_node, /*index=*/0, /*field_id=*/2));
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(struct_node, /*index=*/1, /*field_id=*/3));
+
+  ASSERT_EQ(struct_node->leaves(), 2);
+  EXPECT_EQ(struct_node->leafAt(0)->type(), ::avro::AVRO_STRING);
+  EXPECT_EQ(struct_node->leafAt(1)->type(), ::avro::AVRO_UNION);
+  ASSERT_EQ(struct_node->leafAt(1)->leaves(), 2);
+  EXPECT_EQ(struct_node->leafAt(1)->leafAt(0)->type(), ::avro::AVRO_NULL);
+  EXPECT_EQ(struct_node->leafAt(1)->leafAt(1)->type(), ::avro::AVRO_INT);
+
+  // Check list field
+  auto list_union_node = root_node->leafAt(1);
+  ASSERT_EQ(list_union_node->type(), ::avro::AVRO_UNION);
+  ASSERT_EQ(list_union_node->leaves(), 2);
+  EXPECT_EQ(list_union_node->leafAt(0)->type(), ::avro::AVRO_NULL);
+  EXPECT_EQ(list_union_node->leafAt(1)->type(), ::avro::AVRO_ARRAY);
+
+  auto list_node = list_union_node->leafAt(1);
+  ASSERT_EQ(list_node->type(), ::avro::AVRO_ARRAY);
+
+  ASSERT_EQ(list_node->customAttributes(), 1);
+  ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(list_node, /*index=*/0, /*field_id=*/5,
+                                         /*key=*/"element-id"));
+
+  ASSERT_EQ(list_node->leaves(), 1);
+  EXPECT_EQ(list_node->leafAt(0)->type(), ::avro::AVRO_DOUBLE);
+}
+
+}  // namespace iceberg::avro