diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 37a34d702..229c62b48 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -31,6 +31,10 @@ #include #include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/schema.h" +#include "iceberg/schema_util_internal.h" +#include "iceberg/util/formatter.h" #include "iceberg/util/macros.h" #include "iceberg/util/visit_type.h" @@ -377,4 +381,402 @@ Status HasIdVisitor::Visit(const ::avro::ValidSchema& schema) { Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.root()); } +namespace { + +std::string ToString(const ::avro::NodePtr& node) { + std::stringstream ss; + ss << *node; + return ss.str(); +} + +std::string ToString(const ::avro::LogicalType& logical_type) { + std::stringstream ss; + logical_type.printJson(ss); + return ss.str(); +} + +std::string ToString(const ::avro::LogicalType::Type& logical_type) { + return ToString(::avro::LogicalType(logical_type)); +} + +bool HasLogicalType(const ::avro::NodePtr& node, + ::avro::LogicalType::Type expected_type) { + return node->logicalType().type() == expected_type; +} + +bool HasMapLogicalType(const ::avro::NodePtr& node) { + return node->logicalType().type() == ::avro::LogicalType::CUSTOM && + node->logicalType().customLogicalType() != nullptr && + node->logicalType().customLogicalType()->name() == "map"; +} + +std::optional GetAdjustToUtc(const ::avro::NodePtr& node) { + if (node->customAttributes() == 0) { + return std::nullopt; + } + return node->customAttributesAt(0).getAttribute(std::string(kAdjustToUtcProp)); +} + +Result GetId(const ::avro::NodePtr& node, const std::string& attr_name, + size_t field_idx) { + if (field_idx >= node->customAttributes()) { + return InvalidSchema("Field index {} exceeds available custom attributes {}", + field_idx, node->customAttributes()); + } + + auto id_str = node->customAttributesAt(field_idx).getAttribute(attr_name); + if (!id_str.has_value()) { + return InvalidSchema("Missing avro attribute: {}", attr_name); + } + + try { + return std::stoi(id_str.value()); + } catch (const std::exception& e) { + return InvalidSchema("Invalid {}: {}", attr_name, id_str.value()); + } +} + +Result GetElementId(const ::avro::NodePtr& node) { + static const std::string kElementIdKey{kElementIdProp}; + return GetId(node, kElementIdKey, /*field_idx=*/0); +} + +Result GetKeyId(const ::avro::NodePtr& node) { + static const std::string kKeyIdKey{kKeyIdProp}; + return GetId(node, kKeyIdKey, /*field_idx=*/0); +} + +Result GetValueId(const ::avro::NodePtr& node) { + static const std::string kValueIdKey{kValueIdProp}; + return GetId(node, kValueIdKey, /*field_idx=*/0); +} + +Result GetFieldId(const ::avro::NodePtr& node, size_t field_idx) { + static const std::string kFieldIdKey{kFieldIdProp}; + return GetId(node, kFieldIdKey, field_idx); +} + +Status ValidateAvroSchemaEvolution(const Type& expected_type, + const ::avro::NodePtr& avro_node) { + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (avro_node->type() == ::avro::AVRO_BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (avro_node->type() == ::avro::AVRO_INT) { + return {}; + } + break; + case TypeId::kLong: + if (avro_node->type() == ::avro::AVRO_LONG || + avro_node->type() == ::avro::AVRO_INT) { + return {}; + } + break; + case TypeId::kFloat: + if (avro_node->type() == ::avro::AVRO_FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (avro_node->type() == ::avro::AVRO_DOUBLE || + avro_node->type() == ::avro::AVRO_FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (avro_node->type() == ::avro::AVRO_INT && + HasLogicalType(avro_node, ::avro::LogicalType::DATE)) { + return {}; + } + break; + case TypeId::kTime: + if (avro_node->type() == ::avro::AVRO_LONG && + HasLogicalType(avro_node, ::avro::LogicalType::TIME_MICROS)) { + return {}; + } + break; + case TypeId::kTimestamp: + if (avro_node->type() == ::avro::AVRO_LONG && + HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) && + GetAdjustToUtc(avro_node).value_or("false") == "true") { + return {}; + } + break; + case TypeId::kTimestampTz: + if (avro_node->type() == ::avro::AVRO_LONG && + HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) && + GetAdjustToUtc(avro_node).value_or("false") == "true") { + return {}; + } + break; + case TypeId::kString: + if (avro_node->type() == ::avro::AVRO_STRING) { + return {}; + } + break; + case TypeId::kDecimal: + if (avro_node->type() == ::avro::AVRO_FIXED && + HasLogicalType(avro_node, ::avro::LogicalType::DECIMAL)) { + const auto& decimal_type = + internal::checked_cast(expected_type); + const auto logical_type = avro_node->logicalType(); + if (decimal_type.scale() == logical_type.scale() && + decimal_type.precision() >= logical_type.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (avro_node->type() == ::avro::AVRO_FIXED && avro_node->fixedSize() == 16 && + HasLogicalType(avro_node, ::avro::LogicalType::UUID)) { + return {}; + } + break; + case TypeId::kFixed: + if (avro_node->type() == ::avro::AVRO_FIXED && + avro_node->fixedSize() == + internal::checked_cast(expected_type).length()) { + return {}; + } + break; + case TypeId::kBinary: + if (avro_node->type() == ::avro::AVRO_BYTES) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Avro type: {}", expected_type, + ToString(avro_node)); +} + +// XXX: Result<::avro::NodePtr> leads to unresolved external symbol error on Windows. +Status UnwrapUnion(const ::avro::NodePtr& node, ::avro::NodePtr* result) { + if (node->type() != ::avro::AVRO_UNION) { + *result = node; + return {}; + } + if (node->leaves() != 2) { + return InvalidSchema("Union type must have exactly two branches"); + } + auto branch_0 = node->leafAt(0); + auto branch_1 = node->leafAt(1); + if (branch_0->type() == ::avro::AVRO_NULL) { + *result = branch_1; + } else if (branch_1->type() == ::avro::AVRO_NULL) { + *result = branch_0; + } else { + return InvalidSchema("Union type must have exactly one null branch, got {}", + ToString(node)); + } + return {}; +} + +// Forward declaration +Result ProjectNested(const Type& expected_type, + const ::avro::NodePtr& avro_node, + bool prune_source); + +Result ProjectStruct(const StructType& struct_type, + const ::avro::NodePtr& avro_node, + bool prune_source) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidSchema("Expected AVRO_RECORD type, but got {}", ToString(avro_node)); + } + + const auto& expected_fields = struct_type.fields(); + + struct NodeInfo { + size_t local_index; + ::avro::NodePtr field_node; + }; + std::unordered_map node_info_map; + node_info_map.reserve(avro_node->leaves()); + + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_ASSIGN_OR_RAISE(int32_t field_id, GetFieldId(avro_node, i)); + ::avro::NodePtr field_node = avro_node->leafAt(i); + if (const auto [iter, inserted] = node_info_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id), + std::forward_as_tuple(i, field_node)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Avro schema: {}", field_id); + } + } + + FieldProjection result; + result.children.reserve(expected_fields.size()); + + for (const auto& expected_field : expected_fields) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) { + ::avro::NodePtr field_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + child_projection, + ProjectNested(*expected_field.type(), field_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateAvroSchemaEvolution(*expected_field.type(), field_node)); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with ID: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + if (prune_source) { + PruneFieldProjection(result); + } + + return result; +} + +Result ProjectList(const ListType& list_type, + const ::avro::NodePtr& avro_node, bool prune_source) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidSchema("Expected AVRO_ARRAY type, but got {}", ToString(avro_node)); + } + if (avro_node->leaves() != 1) { + return InvalidSchema("Array type must have exactly one node, got {}", + avro_node->leaves()); + } + + const auto& expected_element_field = list_type.fields().back(); + ICEBERG_ASSIGN_OR_RAISE(int32_t avro_element_id, GetElementId(avro_node)); + if (expected_element_field.field_id() != avro_element_id) [[unlikely]] { + return InvalidSchema("element-id mismatch, expected {}, got {}", + expected_element_field.field_id(), avro_element_id); + } + + FieldProjection element_projection; + ::avro::NodePtr element_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node)); + if (expected_element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + element_projection, + ProjectNested(*expected_element_field.type(), element_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node)); + } + + FieldProjection result; + result.children.emplace_back(std::move(element_projection)); + return result; +} + +Result ProjectMap(const MapType& map_type, + const ::avro::NodePtr& avro_node, bool prune_source) { + const auto& expected_key_field = map_type.key(); + const auto& expected_value_field = map_type.value(); + + FieldProjection result, key_projection, value_projection; + int32_t avro_key_id, avro_value_id; + ::avro::NodePtr map_node; + + if (avro_node->type() == ::avro::AVRO_MAP) { + if (avro_node->leaves() != 2) { + return InvalidSchema("Map type must have exactly two nodes, got {}", + avro_node->leaves()); + } + map_node = avro_node; + + ICEBERG_ASSIGN_OR_RAISE(avro_key_id, GetKeyId(avro_node)); + ICEBERG_ASSIGN_OR_RAISE(avro_value_id, GetValueId(avro_node)); + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + if (avro_node->leaves() != 1) { + return InvalidSchema("Array-backed map type must have exactly one node, got {}", + avro_node->leaves()); + } + + map_node = avro_node->leafAt(0); + if (map_node->type() != ::avro::AVRO_RECORD || map_node->leaves() != 2) { + return InvalidSchema( + "Array-backed map type must have a record node with two fields"); + } + + ICEBERG_ASSIGN_OR_RAISE(avro_key_id, GetFieldId(map_node, 0)); + ICEBERG_ASSIGN_OR_RAISE(avro_value_id, GetFieldId(map_node, 1)); + } else { + return InvalidSchema("Expected a map type, but got Avro type {}", + ToString(avro_node)); + } + + if (expected_key_field.field_id() != avro_key_id) { + return InvalidSchema("key-id mismatch, expected {}, got {}", + expected_key_field.field_id(), avro_key_id); + } + if (expected_value_field.field_id() != avro_value_id) { + return InvalidSchema("value-id mismatch, expected {}, got {}", + expected_value_field.field_id(), avro_value_id); + } + + for (size_t i = 0; i < map_node->leaves(); ++i) { + FieldProjection sub_projection; + ::avro::NodePtr sub_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node)); + const auto& expected_sub_field = map_type.fields()[i]; + if (expected_sub_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(), + sub_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node)); + } + sub_projection.kind = FieldProjection::Kind::kProjected; + sub_projection.from = i; + result.children.emplace_back(std::move(sub_projection)); + } + + return result; +} + +Result ProjectNested(const Type& expected_type, + const ::avro::NodePtr& avro_node, + bool prune_source) { + if (!expected_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", expected_type); + } + + switch (expected_type.type_id()) { + case TypeId::kStruct: + return ProjectStruct(internal::checked_cast(expected_type), + avro_node, prune_source); + case TypeId::kList: + return ProjectList(internal::checked_cast(expected_type), + avro_node, prune_source); + case TypeId::kMap: + return ProjectMap(internal::checked_cast(expected_type), avro_node, + prune_source); + default: + return InvalidSchema("Unsupported nested type: {}", expected_type); + } +} + +} // namespace + +Result Project(const Schema& expected_schema, + const ::avro::NodePtr& avro_node, bool prune_source) { + ICEBERG_ASSIGN_OR_RAISE( + auto field_projection, + ProjectNested(static_cast(expected_schema), avro_node, prune_source)); + return SchemaProjection{std::move(field_projection.children)}; +} + } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index d0beb09a4..50ff9b239 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -24,6 +24,7 @@ #include #include "iceberg/result.h" +#include "iceberg/schema_util.h" #include "iceberg/type.h" namespace avro { @@ -121,4 +122,17 @@ class HasIdVisitor { size_t fields_with_id_ = 0; }; +/// \brief Project an Iceberg Schema onto an Avro NodePtr. +/// +/// This function creates a projection from an Iceberg Schema to an Avro schema node. +/// The projection determines how to read data from the Avro schema into the expected +/// Iceberg Schema. +/// +/// \param expected_schema The Iceberg Schema that defines the expected structure. +/// \param avro_node The Avro node to read data from. +/// \param prune_source Whether the source schema can be pruned. +/// \return The schema projection result. +Result Project(const Schema& expected_schema, + const ::avro::NodePtr& avro_node, bool prune_source); + } // namespace iceberg::avro diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index d8a5dcf31..3e409efd7 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -20,12 +20,12 @@ #include "iceberg/schema_util.h" #include -#include #include #include #include "iceberg/metadata_columns.h" #include "iceberg/schema.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/formatter_internal.h" #include "iceberg/util/macros.h" @@ -78,25 +78,6 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ return NotSupported("Cannot read {} from {}", expected_type, source_type); } -// Fix `from` field of `FieldProjection` to use pruned field index. -void PruneFieldProjection(FieldProjection& field_projection) { - std::map local_index_to_pruned_index; - for (const auto& child_projection : field_projection.children) { - if (child_projection.kind == FieldProjection::Kind::kProjected) { - local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0); - } - } - for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) { - value = pruned_index++; - } - for (auto& child_projection : field_projection.children) { - if (child_projection.kind == FieldProjection::Kind::kProjected) { - child_projection.from = - local_index_to_pruned_index.at(std::get<1>(child_projection.from)); - } - } -} - Result ProjectNested(const Type& expected_type, const Type& source_type, bool prune_source) { if (!expected_type.is_nested()) { diff --git a/src/iceberg/schema_util_internal.h b/src/iceberg/schema_util_internal.h new file mode 100644 index 000000000..33aad93a4 --- /dev/null +++ b/src/iceberg/schema_util_internal.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/schema_util.h" + +namespace iceberg { + +// Fix `from` field of `FieldProjection` to use pruned field index. +void PruneFieldProjection(FieldProjection& field_projection) { + std::map local_index_to_pruned_index; + for (const auto& child_projection : field_projection.children) { + if (child_projection.kind == FieldProjection::Kind::kProjected) { + local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0); + } + } + for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) { + value = pruned_index++; + } + for (auto& child_projection : field_projection.children) { + if (child_projection.kind == FieldProjection::Kind::kProjected) { + child_projection.from = + local_index_to_pruned_index.at(std::get<1>(child_projection.from)); + } + } +} + +} // namespace iceberg diff --git a/test/avro_schema_test.cc b/test/avro_schema_test.cc index b24354fdd..7d18ae813 100644 --- a/test/avro_schema_test.cc +++ b/test/avro_schema_test.cc @@ -25,6 +25,8 @@ #include #include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/schema.h" #include "matchers.h" namespace iceberg::avro { @@ -515,4 +517,556 @@ TEST(HasIdVisitorTest, ArrayBackedMapWithPartialIds) { EXPECT_FALSE(visitor.AllHaveIds()); } +TEST(AvroSchemaProjectionTest, ProjectIdenticalSchemas) { + // Create an iceberg schema + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/4, "data", std::make_shared()), + }); + + // Create equivalent avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "age", "type": ["null", "int"], "field-id": 3}, + {"name": "data", "type": "double", "field-id": 4} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (size_t i = 0; i < projection.fields.size(); ++i) { + ASSERT_EQ(projection.fields[i].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[i].from), i); + } +} + +TEST(AvroSchemaProjectionTest, ProjectSubsetSchema) { + // Create a subset iceberg schema + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), + }); + + // Create full avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "age", "type": ["null", "int"], "field-id": 3}, + {"name": "data", "type": "double", "field-id": 4} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].from), 2); +} + +TEST(AvroSchemaProjectionTest, ProjectWithPruning) { + // Create a subset iceberg schema + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), + }); + + // Create full avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "age", "type": ["null", "int"], "field-id": 3}, + {"name": "data", "type": "double", "field-id": 4} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].from), 1); +} + +TEST(AvroSchemaProjectionTest, ProjectMissingOptionalField) { + // Create iceberg schema with an extra optional field + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/10, "extra", std::make_shared()), + }); + + // Create avro schema without the extra field + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].from), 1); + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, ProjectMissingRequiredField) { + // Create iceberg schema with a required field that's missing from the avro schema + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/10, "extra", std::make_shared()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Missing required field")); +} + +TEST(AvroSchemaProjectionTest, ProjectMetadataColumn) { + // Create iceberg schema with a metadata column + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + MetadataColumns::kFilePath, + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); +} + +TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) { + // Create iceberg schema expecting a long + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + }); + + // Create avro schema with an int + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); +} + +TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { + // Create iceberg schema expecting a double + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", std::make_shared()), + }); + + // Create avro schema with a float + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "value", "type": "float", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); +} + +TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { + // Create iceberg schema expecting an int + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", std::make_shared()), + }); + + // Create avro schema with a string + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "value", "type": "string", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + +TEST(AvroSchemaProjectionTest, ProjectNestedStructures) { + // Create iceberg schema with nested struct + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/101, "street", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/102, "city", + std::make_shared()), + })), + }); + + // Create equivalent avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "address", "type": ["null", { + "type": "record", + "name": "address_record", + "fields": [ + {"name": "street", "type": ["null", "string"], "field-id": 101}, + {"name": "city", "type": ["null", "string"], "field-id": 102} + ] + }], "field-id": 3} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].from), 1); + + // Verify struct field has children correctly mapped + ASSERT_EQ(projection.fields[1].children.size(), 2); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].children[0].from), 0); + ASSERT_EQ(projection.fields[1].children[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].children[1].from), 1); +} + +TEST(AvroSchemaProjectionTest, ProjectListType) { + // Create iceberg schema with a list + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional( + /*field_id=*/2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", std::make_shared()))), + }); + + // Create equivalent avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "id", "type": "long", "field-id": 1}, + {"name": "numbers", "type": ["null", { + "type": "array", + "items": ["null", "int"], + "element-id": 101 + }], "field-id": 2} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[1].from), 1); +} + +TEST(AvroSchemaProjectionTest, ProjectMapType) { + // Create iceberg schema with a string->int map + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/101, "key", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/102, "value", + std::make_shared()))), + }); + + // Create equivalent avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "counts", "type": ["null", { + "type": "map", + "values": ["null", "int"], + "key-id": 101, + "value-id": 102 + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[0].children.size(), 2); +} + +TEST(AvroSchemaProjectionTest, ProjectMapTypeWithNonStringKey) { + // Create iceberg schema with an int->string map + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/101, "key", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/102, "value", + std::make_shared()))), + }); + + // Create equivalent avro schema (using array-backed map for non-string keys) + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "counts", "type": ["null", { + "type": "array", + "items": { + "type": "record", + "name": "key_value", + "fields": [ + {"name": "key", "type": "int", "field-id": 101}, + {"name": "value", "type": ["null", "string"], "field-id": 102} + ] + }, + "logicalType": "map" + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + ASSERT_EQ(projection.fields[0].children.size(), 2); +} + +TEST(AvroSchemaProjectionTest, ProjectListOfStruct) { + // Create iceberg schema with list of struct + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/102, "x", + std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/103, "y", + std::make_shared()), + })))), + }); + + // Create equivalent avro schema + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "items", "type": ["null", { + "type": "array", + "items": ["null", { + "type": "record", + "name": "element_record", + "fields": [ + {"name": "x", "type": ["null", "int"], "field-id": 102}, + {"name": "y", "type": "string", "field-id": 103} + ] + }], + "element-id": 101 + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); + + // Verify list element struct is properly projected + ASSERT_EQ(projection.fields[0].children.size(), 1); + const auto& element_proj = projection.fields[0].children[0]; + ASSERT_EQ(element_proj.children.size(), 2); + ASSERT_EQ(element_proj.children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(element_proj.children[0].from), 0); + ASSERT_EQ(element_proj.children[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(element_proj.children[1].from), 1); +} + +TEST(AvroSchemaProjectionTest, ProjectDecimalType) { + // Create iceberg schema with decimal + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", + std::make_shared(18, 2)), + }); + + // Create avro schema with decimal + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + { + "name": "value", + "type": { + "type": "fixed", + "name": "decimal_9_2", + "size": 4, + "logicalType": "decimal", + "precision": 9, + "scale": 2 + }, + "field-id": 1 + } + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); +} + +TEST(AvroSchemaProjectionTest, ProjectDecimalIncompatible) { + // Create iceberg schema with decimal having different scale + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", + std::make_shared(18, 3)), + }); + + // Create avro schema with decimal + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + { + "name": "value", + "type": { + "type": "fixed", + "name": "decimal_9_2", + "size": 4, + "logicalType": "decimal", + "precision": 9, + "scale": 2 + }, + "field-id": 1 + } + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + } // namespace iceberg::avro