Skip to content

Commit 1b9120d

Browse files
committed
refactor: clean up code to apply name mapping on avro
1 parent b7fadf5 commit 1b9120d

File tree

2 files changed

+97
-171
lines changed

2 files changed

+97
-171
lines changed

src/iceberg/avro/avro_schema_util.cc

Lines changed: 97 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -772,15 +772,73 @@ Result<SchemaProjection> Project(const Schema& expected_schema,
772772

773773
namespace {
774774

775+
/// \brief Scope guard that appends `name` to a shared name-path vector on
/// construction and removes the last entry again on destruction.
///
/// The guard only stores a reference to the vector, so the vector must outlive it.
/// Copying or moving a guard would produce two destructors popping for a single
/// push, so the type is neither copyable nor movable.
class NamesGuard {
 public:
  /// \param names Path vector to extend for the lifetime of the guard.
  /// \param name Entry pushed onto `names`.
  NamesGuard(std::vector<std::string>& names, const std::string& name) : names_(names) {
    names_.push_back(name);
  }
  ~NamesGuard() { names_.pop_back(); }

  NamesGuard(const NamesGuard&) = delete;
  NamesGuard& operator=(const NamesGuard&) = delete;
  NamesGuard(NamesGuard&&) = delete;
  NamesGuard& operator=(NamesGuard&&) = delete;

 private:
  std::vector<std::string>& names_;
};
785+
786+
// Forward declaration
787+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
788+
const NameMapping& mapping,
789+
std::vector<std::string>& names);
790+
775791
/// \brief Copy every attribute of `source` into `target` by calling
/// `target.addAttribute(..., /*addQuote=*/false)` once per entry.
///
/// This span shows both versions of the loop: the removed (-) pair-based form and
/// the added (+) structured-binding form.
void CopyCustomAttributes(const ::avro::CustomAttributes& source,
776792
::avro::CustomAttributes& target) {
777-
for (const auto& attr_pair : source.attributes()) {
778-
target.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
793+
for (const auto& [key, value] : source.attributes()) { // NOLINT(modernize-type-traits)
794+
target.addAttribute(key, value, /*addQuote=*/false);
795+
}
796+
}
797+
798+
/// \brief Resolve the name-mapping entry for the path given in `names`.
///
/// \param mapping Name mapping that is searched.
/// \param names Path of names to look up. Its last entry is quoted in the error
///        messages, so the vector is expected to be non-empty.
/// \return The matching mapped field, or InvalidSchema when the path has no entry
///         or the entry carries no field ID.
Result<MappedFieldConstRef> FindMappedField(const NameMapping& mapping,
                                            const std::vector<std::string>& names) {
  auto lookup = mapping.Find(names);
  if (!lookup.has_value()) {
    return InvalidSchema("Field '{}' not found in name mapping", names.back());
  }

  // Only entries that actually carry an ID are usable by the callers.
  if (const MappedField& entry = lookup->get(); entry.field_id.has_value()) {
    return *lookup;
  }

  return InvalidSchema("Field ID is missing for field '{}' in name mapping",
                       names.back());
}
781813

782-
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
783-
const MappedField& field) {
814+
template <typename NodeType>
815+
Status ProcessChildNode(const ::avro::NodePtr& parent_node, size_t child_index,
816+
const std::string& child_name, std::string_view fieldIdPropKy,
817+
const NameMapping& mapping, std::vector<std::string>& names,
818+
NodeType& new_parent_node) {
819+
NamesGuard guard(names, child_name);
820+
ICEBERG_ASSIGN_OR_RAISE(auto mapped_field, FindMappedField(mapping, names));
821+
822+
ICEBERG_ASSIGN_OR_RAISE(
823+
auto new_child_node,
824+
MakeAvroNodeWithFieldIds(parent_node->leafAt(child_index), mapping, names));
825+
826+
::avro::CustomAttributes attributes;
827+
attributes.addAttribute(std::string(fieldIdPropKy),
828+
std::to_string(*mapped_field.get().field_id),
829+
/*addQuote=*/false);
830+
if (parent_node->customAttributes() > child_index) {
831+
CopyCustomAttributes(parent_node->customAttributesAt(child_index), attributes);
832+
}
833+
834+
new_parent_node->addLeaf(new_child_node);
835+
new_parent_node->addCustomAttributesForField(attributes);
836+
return {};
837+
}
838+
839+
Result<::avro::NodePtr> MakeRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
840+
const NameMapping& mapping,
841+
std::vector<std::string>& names) {
784842
auto new_record_node = std::make_shared<::avro::NodeRecord>();
785843
new_record_node->setName(original_node->name());
786844

@@ -792,57 +850,18 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
792850
const std::string& field_name = original_node->nameAt(i);
793851
::avro::NodePtr field_node = original_node->leafAt(i);
794852

795-
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
796-
// Try to find nested field by name
797-
const MappedField* nested_field = nullptr;
798-
if (field.nested_mapping) {
799-
auto fields_span = field.nested_mapping->fields();
800-
for (const auto& f : fields_span) {
801-
if (f.names.find(field_name) != f.names.end()) {
802-
nested_field = &f;
803-
break;
804-
}
805-
}
806-
}
807-
808-
if (!nested_field) {
809-
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
810-
}
811-
812-
if (!nested_field->field_id.has_value()) {
813-
return InvalidSchema("Field ID is missing for field '{}' in nested mapping",
814-
field_name);
815-
}
816-
817-
// Preserve existing custom attributes for this field
818-
::avro::CustomAttributes attributes;
819-
if (i < original_node->customAttributes()) {
820-
// Copy all existing attributes from the original node
821-
for (const auto& attr_pair : original_node->customAttributesAt(i).attributes()) {
822-
// Copy each existing attribute to preserve original metadata
823-
attributes.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
824-
}
825-
}
826-
827-
// Add field ID attribute to the new node (preserving existing attributes)
828-
attributes.addAttribute(std::string(kFieldIdProp),
829-
std::to_string(nested_field->field_id.value()),
830-
/*addQuote=*/false);
853+
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, i, field_name, kFieldIdProp,
854+
mapping, names, new_record_node));
831855

832-
new_record_node->addCustomAttributesForField(attributes);
833-
834-
// Recursively apply field IDs to nested fields
835-
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
836-
MakeAvroNodeWithFieldIds(field_node, *nested_field));
837856
new_record_node->addName(field_name);
838-
new_record_node->addLeaf(new_nested_node);
839857
}
840858

841859
return new_record_node;
842860
}
843861

844-
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
845-
const MappedField& field) {
862+
Result<::avro::NodePtr> MakeArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
863+
const NameMapping& mapping,
864+
std::vector<std::string>& names) {
846865
if (original_node->leaves() != 1) {
847866
return InvalidSchema("Array type must have exactly one leaf");
848867
}
@@ -851,130 +870,42 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi
851870

852871
// Check if this is a map represented as array
853872
if (HasMapLogicalType(original_node)) {
854-
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
855-
MakeAvroNodeWithFieldIds(original_node->leafAt(0), field));
873+
ICEBERG_ASSIGN_OR_RAISE(
874+
auto new_element_node,
875+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), mapping, names));
856876
new_array_node->addLeaf(new_element_node);
857877

858-
// Check and add custom attributes
859878
if (original_node->customAttributes() > 0) {
860-
::avro::CustomAttributes merged_attributes;
861-
const auto& original_attrs = original_node->customAttributesAt(0);
862-
CopyCustomAttributes(original_attrs, merged_attributes);
863-
// Add merged attributes if we found any
864-
if (merged_attributes.attributes().size() > 0) {
865-
new_array_node->addCustomAttributesForField(merged_attributes);
866-
}
879+
new_array_node->addCustomAttributesForField(original_node->customAttributesAt(0));
867880
}
868881

869882
return new_array_node;
870883
}
871884

872-
// For regular arrays, use the first field from nested mapping as element field
873-
if (!field.nested_mapping || field.nested_mapping->fields().empty()) {
874-
return InvalidSchema("Array type requires nested mapping with element field");
875-
}
876-
877-
const auto& element_field = field.nested_mapping->fields()[0];
878-
879-
if (!element_field.field_id.has_value()) {
880-
return InvalidSchema("Field ID is missing for element field in array");
881-
}
882-
883-
ICEBERG_ASSIGN_OR_RAISE(
884-
auto new_element_node,
885-
MakeAvroNodeWithFieldIds(original_node->leafAt(0), element_field));
886-
new_array_node->addLeaf(new_element_node);
887-
888-
// Create merged custom attributes with element field ID
889-
::avro::CustomAttributes merged_attributes;
890-
891-
// First add our element field ID (highest priority)
892-
merged_attributes.addAttribute(std::string(kElementIdProp),
893-
std::to_string(*element_field.field_id),
894-
/*addQuote=*/false);
895-
896-
// Then merge any custom attributes from original node
897-
if (original_node->customAttributes() > 0) {
898-
const auto& original_attrs = original_node->customAttributesAt(0);
899-
CopyCustomAttributes(original_attrs, merged_attributes);
900-
}
901-
902-
// Add all attributes at once
903-
new_array_node->addCustomAttributesForField(merged_attributes);
885+
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "element", kElementIdProp,
886+
mapping, names, new_array_node));
904887

905888
return new_array_node;
906889
}
907890

908-
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
909-
const MappedField& field) {
891+
/// \brief Build a new Avro map node from an original node that has exactly two
/// leaves; any other leaf count yields InvalidSchema.
///
/// In the added (+) lines, leaf 0 is processed under the name "key" with
/// kKeyIdProp and leaf 1 under the name "value" with kValueIdProp, both through
/// ProcessChildNode. The removed (-) lines show the earlier inline implementation
/// that read key/value entries from `field.nested_mapping`.
Result<::avro::NodePtr> MakeMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
892+
const NameMapping& mapping,
893+
std::vector<std::string>& names) {
910894
if (original_node->leaves() != 2) {
911895
return InvalidSchema("Map type must have exactly two leaves");
912896
}
913897

914898
auto new_map_node = std::make_shared<::avro::NodeMap>();
915-
916-
// For map types, we need to extract key and value field mappings from the nested
917-
// mapping
918-
if (!field.nested_mapping) {
919-
return InvalidSchema("Map type requires nested mapping for key and value fields");
920-
}
921-
922-
// For map types, use the first two fields from nested mapping as key and value
923-
if (!field.nested_mapping || field.nested_mapping->fields().size() < 2) {
924-
return InvalidSchema("Map type requires nested mapping with key and value fields");
925-
}
926-
927-
const auto& key_mapped_field = field.nested_mapping->fields()[0];
928-
const auto& value_mapped_field = field.nested_mapping->fields()[1];
929-
930-
if (!key_mapped_field.field_id || !value_mapped_field.field_id) {
931-
return InvalidSchema("Map key and value fields must have field IDs");
932-
}
933-
934-
// Add key and value nodes
935-
ICEBERG_ASSIGN_OR_RAISE(
936-
auto new_key_node,
937-
MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_mapped_field));
938-
ICEBERG_ASSIGN_OR_RAISE(
939-
auto new_value_node,
940-
MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_mapped_field));
941-
new_map_node->addLeaf(new_key_node);
942-
new_map_node->addLeaf(new_value_node);
943-
944-
// Create key and value attributes
945-
::avro::CustomAttributes key_attributes;
946-
::avro::CustomAttributes value_attributes;
947-
948-
// Add required field IDs
949-
key_attributes.addAttribute(std::string(kKeyIdProp),
950-
std::to_string(*key_mapped_field.field_id),
951-
/*addQuote=*/false);
952-
value_attributes.addAttribute(std::string(kValueIdProp),
953-
std::to_string(*value_mapped_field.field_id),
954-
/*addQuote=*/false);
955-
956-
// Merge custom attributes from original node if they exist
957-
if (original_node->customAttributes() > 0) {
958-
// Merge attributes for key (index 0)
959-
const auto& original_key_attrs = original_node->customAttributesAt(0);
960-
CopyCustomAttributes(original_key_attrs, key_attributes);
961-
962-
// Merge attributes for value (index 1)
963-
if (original_node->customAttributes() > 1) {
964-
const auto& original_value_attrs = original_node->customAttributesAt(1);
965-
CopyCustomAttributes(original_value_attrs, value_attributes);
966-
}
967-
}
968-
969-
// Add the merged attributes to the new map node
970-
new_map_node->addCustomAttributesForField(key_attributes);
971-
new_map_node->addCustomAttributesForField(value_attributes);
972-
899+
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "key", kKeyIdProp, mapping,
900+
names, new_map_node));
901+
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 1, "value", kValueIdProp,
902+
mapping, names, new_map_node));
973903
return new_map_node;
974904
}
975905

976-
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
977-
const MappedField& field) {
906+
Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
907+
const NameMapping& mapping,
908+
std::vector<std::string>& names) {
978909
if (original_node->leaves() != 2) {
979910
return InvalidSchema("Union type must have exactly two branches");
980911
}
@@ -987,14 +918,16 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
987918

988919
if (branch_0_is_null && !branch_1_is_null) {
989920
// branch_0 is null, branch_1 is not null
990-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field));
921+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
922+
MakeAvroNodeWithFieldIds(branch_1, mapping, names));
991923
auto new_union_node = std::make_shared<::avro::NodeUnion>();
992924
new_union_node->addLeaf(branch_0); // null branch
993925
new_union_node->addLeaf(new_branch_1);
994926
return new_union_node;
995927
} else if (!branch_0_is_null && branch_1_is_null) {
996928
// branch_0 is not null, branch_1 is null
997-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field));
929+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
930+
MakeAvroNodeWithFieldIds(branch_0, mapping, names));
998931
auto new_union_node = std::make_shared<::avro::NodeUnion>();
999932
new_union_node->addLeaf(new_branch_0);
1000933
new_union_node->addLeaf(branch_1); // null branch
@@ -1008,19 +941,18 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
1008941
}
1009942
}
1010943

1011-
} // namespace
1012-
1013944
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1014-
const MappedField& mapped_field) {
945+
const NameMapping& mapping,
946+
std::vector<std::string>& names) {
1015947
switch (original_node->type()) {
1016948
case ::avro::AVRO_RECORD:
1017-
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
949+
return MakeRecordNodeWithFieldIds(original_node, mapping, names);
1018950
case ::avro::AVRO_ARRAY:
1019-
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
951+
return MakeArrayNodeWithFieldIds(original_node, mapping, names);
1020952
case ::avro::AVRO_MAP:
1021-
return CreateMapNodeWithFieldIds(original_node, mapped_field);
953+
return MakeMapNodeWithFieldIds(original_node, mapping, names);
1022954
case ::avro::AVRO_UNION:
1023-
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
955+
return MakeUnionNodeWithFieldIds(original_node, mapping, names);
1024956
case ::avro::AVRO_BOOL:
1025957
case ::avro::AVRO_INT:
1026958
case ::avro::AVRO_LONG:
@@ -1039,11 +971,12 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
1039971
}
1040972
}
1041973

974+
} // namespace
975+
1042976
/// \brief Entry point taking only the original node and the name mapping.
///
/// In the added (+) lines it starts with an empty name path and delegates to the
/// three-argument overload; the removed (-) lines show the earlier version that
/// wrapped the mapping in a temporary MappedField instead.
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1043977
const NameMapping& mapping) {
1044-
MappedField mapped_field;
1045-
mapped_field.nested_mapping = std::make_shared<MappedFields>(mapping.AsMappedFields());
1046-
return MakeAvroNodeWithFieldIds(original_node, mapped_field);
978+
std::vector<std::string> names;
979+
return MakeAvroNodeWithFieldIds(original_node, mapping, names);
1047980
}
1048981

1049982
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
149149
/// \return True if the node has a map logical type, false otherwise.
150150
bool HasMapLogicalType(const ::avro::NodePtr& node);
151151

152-
/// \brief Create a new Avro node with field IDs from name mapping.
153-
/// \param original_node The original Avro node to copy.
154-
/// \param mapped_field The mapped field to apply field IDs from.
155-
/// \return A new Avro node with field IDs applied, or an error.
156-
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
157-
const MappedField& mapped_field);
158-
159152
/// \brief Create a new Avro node with field IDs from name mapping.
160153
/// \param original_node The original Avro node to copy.
161154
/// \param mapping The name mapping to apply field IDs from.

0 commit comments

Comments
 (0)