From 1b9120d061577e384337897dd446b2231320a796 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 26 Aug 2025 14:54:21 +0800 Subject: [PATCH] refactor: clean up code to apply name mapping on avro --- src/iceberg/avro/avro_schema_util.cc | 261 +++++++------------ src/iceberg/avro/avro_schema_util_internal.h | 7 - 2 files changed, 97 insertions(+), 171 deletions(-) diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 8e8e8fe3f..d411b4ecb 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -772,15 +772,73 @@ Result Project(const Schema& expected_schema, namespace { +class NamesGuard { + public: + NamesGuard(std::vector& names, const std::string& name) : names_(names) { + names_.push_back(name); + } + ~NamesGuard() { names_.pop_back(); } + + private: + std::vector& names_; +}; + +// Forward declaration +Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node, + const NameMapping& mapping, + std::vector& names); + void CopyCustomAttributes(const ::avro::CustomAttributes& source, ::avro::CustomAttributes& target) { - for (const auto& attr_pair : source.attributes()) { - target.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false); + for (const auto& [key, value] : source.attributes()) { // NOLINT(modernize-type-traits) + target.addAttribute(key, value, /*addQuote=*/false); + } +} + +Result FindMappedField(const NameMapping& mapping, + const std::vector& names) { + auto field_opt = mapping.Find(names); + if (!field_opt.has_value()) { + return InvalidSchema("Field '{}' not found in name mapping", names.back()); + } + + const MappedField& field = field_opt.value().get(); + if (!field.field_id.has_value()) { + return InvalidSchema("Field ID is missing for field '{}' in name mapping", + names.back()); } + + return field_opt.value(); } -Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& field) { +template +Status ProcessChildNode(const ::avro::NodePtr& parent_node, size_t child_index, + const std::string& child_name, std::string_view fieldIdPropKy, + const NameMapping& mapping, std::vector& names, + NodeType& new_parent_node) { + NamesGuard guard(names, child_name); + ICEBERG_ASSIGN_OR_RAISE(auto mapped_field, FindMappedField(mapping, names)); + + ICEBERG_ASSIGN_OR_RAISE( + auto new_child_node, + MakeAvroNodeWithFieldIds(parent_node->leafAt(child_index), mapping, names)); + + ::avro::CustomAttributes attributes; + attributes.addAttribute(std::string(fieldIdPropKy), + std::to_string(*mapped_field.get().field_id), + /*addQuote=*/false); + if (parent_node->customAttributes() > child_index) { + CopyCustomAttributes(parent_node->customAttributesAt(child_index), attributes); + } + + new_parent_node->addLeaf(new_child_node); + new_parent_node->addCustomAttributesForField(attributes); + return {}; +} + +Result<::avro::NodePtr> MakeRecordNodeWithFieldIds(const ::avro::NodePtr& original_node, + const NameMapping& mapping, + std::vector& names) { auto new_record_node = std::make_shared<::avro::NodeRecord>(); new_record_node->setName(original_node->name()); @@ -792,57 +850,18 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig const std::string& field_name = original_node->nameAt(i); ::avro::NodePtr field_node = original_node->leafAt(i); - // TODO(liuxiaoyu): Add support for case sensitivity in name matching. - // Try to find nested field by name - const MappedField* nested_field = nullptr; - if (field.nested_mapping) { - auto fields_span = field.nested_mapping->fields(); - for (const auto& f : fields_span) { - if (f.names.find(field_name) != f.names.end()) { - nested_field = &f; - break; - } - } - } - - if (!nested_field) { - return InvalidSchema("Field '{}' not found in nested mapping", field_name); - } - - if (!nested_field->field_id.has_value()) { - return InvalidSchema("Field ID is missing for field '{}' in nested mapping", - field_name); - } - - // Preserve existing custom attributes for this field - ::avro::CustomAttributes attributes; - if (i < original_node->customAttributes()) { - // Copy all existing attributes from the original node - for (const auto& attr_pair : original_node->customAttributesAt(i).attributes()) { - // Copy each existing attribute to preserve original metadata - attributes.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false); - } - } - - // Add field ID attribute to the new node (preserving existing attributes) - attributes.addAttribute(std::string(kFieldIdProp), - std::to_string(nested_field->field_id.value()), - /*addQuote=*/false); + ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, i, field_name, kFieldIdProp, + mapping, names, new_record_node)); - new_record_node->addCustomAttributesForField(attributes); - - // Recursively apply field IDs to nested fields - ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node, - MakeAvroNodeWithFieldIds(field_node, *nested_field)); new_record_node->addName(field_name); - new_record_node->addLeaf(new_nested_node); } return new_record_node; } -Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& field) { +Result<::avro::NodePtr> MakeArrayNodeWithFieldIds(const ::avro::NodePtr& original_node, + const NameMapping& mapping, + std::vector& names) { if (original_node->leaves() != 1) { return InvalidSchema("Array type must have exactly one leaf"); } @@ -851,130 +870,42 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi // Check if this is a map represented as array if (HasMapLogicalType(original_node)) { - ICEBERG_ASSIGN_OR_RAISE(auto new_element_node, - MakeAvroNodeWithFieldIds(original_node->leafAt(0), field)); + ICEBERG_ASSIGN_OR_RAISE( + auto new_element_node, + MakeAvroNodeWithFieldIds(original_node->leafAt(0), mapping, names)); new_array_node->addLeaf(new_element_node); - // Check and add custom attributes if (original_node->customAttributes() > 0) { - ::avro::CustomAttributes merged_attributes; - const auto& original_attrs = original_node->customAttributesAt(0); - CopyCustomAttributes(original_attrs, merged_attributes); - // Add merged attributes if we found any - if (merged_attributes.attributes().size() > 0) { - new_array_node->addCustomAttributesForField(merged_attributes); - } + new_array_node->addCustomAttributesForField(original_node->customAttributesAt(0)); } return new_array_node; } - // For regular arrays, use the first field from nested mapping as element field - if (!field.nested_mapping || field.nested_mapping->fields().empty()) { - return InvalidSchema("Array type requires nested mapping with element field"); - } - - const auto& element_field = field.nested_mapping->fields()[0]; - - if (!element_field.field_id.has_value()) { - return InvalidSchema("Field ID is missing for element field in array"); - } - - ICEBERG_ASSIGN_OR_RAISE( - auto new_element_node, - MakeAvroNodeWithFieldIds(original_node->leafAt(0), element_field)); - new_array_node->addLeaf(new_element_node); - - // Create merged custom attributes with element field ID - ::avro::CustomAttributes merged_attributes; - - // First add our element field ID (highest priority) - merged_attributes.addAttribute(std::string(kElementIdProp), - std::to_string(*element_field.field_id), - /*addQuote=*/false); - - // Then merge any custom attributes from original node - if (original_node->customAttributes() > 0) { - const auto& original_attrs = original_node->customAttributesAt(0); - CopyCustomAttributes(original_attrs, merged_attributes); - } - - // Add all attributes at once - new_array_node->addCustomAttributesForField(merged_attributes); + ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "element", kElementIdProp, + mapping, names, new_array_node)); return new_array_node; } -Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& field) { +Result<::avro::NodePtr> MakeMapNodeWithFieldIds(const ::avro::NodePtr& original_node, + const NameMapping& mapping, + std::vector& names) { if (original_node->leaves() != 2) { return InvalidSchema("Map type must have exactly two leaves"); } auto new_map_node = std::make_shared<::avro::NodeMap>(); - - // For map types, we need to extract key and value field mappings from the nested - // mapping - if (!field.nested_mapping) { - return InvalidSchema("Map type requires nested mapping for key and value fields"); - } - - // For map types, use the first two fields from nested mapping as key and value - if (!field.nested_mapping || field.nested_mapping->fields().size() < 2) { - return InvalidSchema("Map type requires nested mapping with key and value fields"); - } - - const auto& key_mapped_field = field.nested_mapping->fields()[0]; - const auto& value_mapped_field = field.nested_mapping->fields()[1]; - - if (!key_mapped_field.field_id || !value_mapped_field.field_id) { - return InvalidSchema("Map key and value fields must have field IDs"); - } - - // Add key and value nodes - ICEBERG_ASSIGN_OR_RAISE( - auto new_key_node, - MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_mapped_field)); - ICEBERG_ASSIGN_OR_RAISE( - auto new_value_node, - MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_mapped_field)); - new_map_node->addLeaf(new_key_node); - new_map_node->addLeaf(new_value_node); - - // Create key and value attributes - ::avro::CustomAttributes key_attributes; - ::avro::CustomAttributes value_attributes; - - // Add required field IDs - key_attributes.addAttribute(std::string(kKeyIdProp), - std::to_string(*key_mapped_field.field_id), - /*addQuote=*/false); - value_attributes.addAttribute(std::string(kValueIdProp), - std::to_string(*value_mapped_field.field_id), - /*addQuote=*/false); - - // Merge custom attributes from original node if they exist - if (original_node->customAttributes() > 0) { - // Merge attributes for key (index 0) - const auto& original_key_attrs = original_node->customAttributesAt(0); - CopyCustomAttributes(original_key_attrs, key_attributes); - - // Merge attributes for value (index 1) - if (original_node->customAttributes() > 1) { - const auto& original_value_attrs = original_node->customAttributesAt(1); - CopyCustomAttributes(original_value_attrs, value_attributes); - } - } - - // Add the merged attributes to the new map node - new_map_node->addCustomAttributesForField(key_attributes); - new_map_node->addCustomAttributesForField(value_attributes); - + ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "key", kKeyIdProp, mapping, + names, new_map_node)); + ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 1, "value", kValueIdProp, + mapping, names, new_map_node)); return new_map_node; } -Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& field) { +Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& original_node, + const NameMapping& mapping, + std::vector& names) { if (original_node->leaves() != 2) { return InvalidSchema("Union type must have exactly two branches"); } @@ -987,14 +918,16 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi if (branch_0_is_null && !branch_1_is_null) { // branch_0 is null, branch_1 is not null - ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field)); + ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, + MakeAvroNodeWithFieldIds(branch_1, mapping, names)); auto new_union_node = std::make_shared<::avro::NodeUnion>(); new_union_node->addLeaf(branch_0); // null branch new_union_node->addLeaf(new_branch_1); return new_union_node; } else if (!branch_0_is_null && branch_1_is_null) { // branch_0 is not null, branch_1 is null - ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field)); + ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, + MakeAvroNodeWithFieldIds(branch_0, mapping, names)); auto new_union_node = std::make_shared<::avro::NodeUnion>(); new_union_node->addLeaf(new_branch_0); new_union_node->addLeaf(branch_1); // null branch @@ -1008,19 +941,18 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi } } -} // namespace - Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& mapped_field) { + const NameMapping& mapping, + std::vector& names) { switch (original_node->type()) { case ::avro::AVRO_RECORD: - return CreateRecordNodeWithFieldIds(original_node, mapped_field); + return MakeRecordNodeWithFieldIds(original_node, mapping, names); case ::avro::AVRO_ARRAY: - return CreateArrayNodeWithFieldIds(original_node, mapped_field); + return MakeArrayNodeWithFieldIds(original_node, mapping, names); case ::avro::AVRO_MAP: - return CreateMapNodeWithFieldIds(original_node, mapped_field); + return MakeMapNodeWithFieldIds(original_node, mapping, names); case ::avro::AVRO_UNION: - return CreateUnionNodeWithFieldIds(original_node, mapped_field); + return MakeUnionNodeWithFieldIds(original_node, mapping, names); case ::avro::AVRO_BOOL: case ::avro::AVRO_INT: case ::avro::AVRO_LONG: @@ -1039,11 +971,12 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original } } +} // namespace + Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node, const NameMapping& mapping) { - MappedField mapped_field; - mapped_field.nested_mapping = std::make_shared(mapping.AsMappedFields()); - return MakeAvroNodeWithFieldIds(original_node, mapped_field); + std::vector names; + return MakeAvroNodeWithFieldIds(original_node, mapping, names); } } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index ac0083458..16478703a 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -149,13 +149,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type); /// \return True if the node has a map logical type, false otherwise. bool HasMapLogicalType(const ::avro::NodePtr& node); -/// \brief Create a new Avro node with field IDs from name mapping. -/// \param original_node The original Avro node to copy. -/// \param mapped_field The mapped field to apply field IDs from. -/// \return A new Avro node with field IDs applied, or an error. -Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node, - const MappedField& mapped_field); - /// \brief Create a new Avro node with field IDs from name mapping. /// \param original_node The original Avro node to copy. /// \param mapping The name mapping to apply field IDs from.