Skip to content

Commit 82a1cd6

Browse files
feat: avro support applying field-ids based on name mapping (#127)
see issue #128 --------- Co-authored-by: Gang Wu <[email protected]>
1 parent cdace37 commit 82a1cd6

File tree

9 files changed

+785
-11
lines changed

9 files changed

+785
-11
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "iceberg/avro/avro_data_util_internal.h"
3737
#include "iceberg/avro/avro_schema_util_internal.h"
3838
#include "iceberg/avro/avro_stream_internal.h"
39+
#include "iceberg/name_mapping.h"
3940
#include "iceberg/schema_internal.h"
4041
#include "iceberg/util/checked_cast.h"
4142
#include "iceberg/util/macros.h"
@@ -92,16 +93,26 @@ class AvroReader::Impl {
9293
// Create a base reader without setting reader schema to enable projection.
9394
auto base_reader =
9495
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
95-
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
96+
::avro::ValidSchema file_schema = base_reader->dataSchema();
9697

9798
// Validate field ids in the file schema.
9899
HasIdVisitor has_id_visitor;
99100
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
101+
100102
if (has_id_visitor.HasNoIds()) {
101-
// TODO(gangwu): support applying field-ids based on name mapping
102-
return NotImplemented("Avro file schema has no field IDs");
103-
}
104-
if (!has_id_visitor.AllHaveIds()) {
103+
// Apply field IDs based on name mapping if available
104+
if (options.name_mapping) {
105+
ICEBERG_ASSIGN_OR_RAISE(
106+
auto new_root_node,
107+
MakeAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
108+
109+
// Update the file schema to use the new schema with field IDs
110+
file_schema = ::avro::ValidSchema(new_root_node);
111+
} else {
112+
return InvalidSchema(
113+
"Avro file schema has no field IDs and no name mapping provided");
114+
}
115+
} else if (!has_id_visitor.AllHaveIds()) {
105116
return InvalidSchema("Not all fields in the Avro file schema have field IDs");
106117
}
107118

src/iceberg/avro/avro_schema_util.cc

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131

3232
#include "iceberg/avro/avro_register.h"
3333
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/avro/constants.h"
3435
#include "iceberg/metadata_columns.h"
36+
#include "iceberg/name_mapping.h"
3537
#include "iceberg/schema.h"
3638
#include "iceberg/schema_util_internal.h"
3739
#include "iceberg/util/formatter.h"
@@ -773,4 +775,280 @@ Result<SchemaProjection> Project(const Schema& expected_schema,
773775
return SchemaProjection{std::move(field_projection.children)};
774776
}
775777

778+
namespace {
779+
780+
void CopyCustomAttributes(const ::avro::CustomAttributes& source,
781+
::avro::CustomAttributes& target) {
782+
for (const auto& attr_pair : source.attributes()) {
783+
target.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
784+
}
785+
}
786+
787+
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
788+
const MappedField& field) {
789+
auto new_record_node = std::make_shared<::avro::NodeRecord>();
790+
new_record_node->setName(original_node->name());
791+
792+
for (size_t i = 0; i < original_node->leaves(); ++i) {
793+
if (i >= original_node->names()) {
794+
return InvalidSchema("Index {} is out of bounds for names (size: {})", i,
795+
original_node->names());
796+
}
797+
const std::string& field_name = original_node->nameAt(i);
798+
::avro::NodePtr field_node = original_node->leafAt(i);
799+
800+
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
801+
// Try to find nested field by name
802+
const MappedField* nested_field = nullptr;
803+
if (field.nested_mapping) {
804+
auto fields_span = field.nested_mapping->fields();
805+
for (const auto& f : fields_span) {
806+
if (f.names.find(field_name) != f.names.end()) {
807+
nested_field = &f;
808+
break;
809+
}
810+
}
811+
}
812+
813+
if (!nested_field) {
814+
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
815+
}
816+
817+
if (!nested_field->field_id.has_value()) {
818+
return InvalidSchema("Field ID is missing for field '{}' in nested mapping",
819+
field_name);
820+
}
821+
822+
// Preserve existing custom attributes for this field
823+
::avro::CustomAttributes attributes;
824+
if (i < original_node->customAttributes()) {
825+
// Copy all existing attributes from the original node
826+
for (const auto& attr_pair : original_node->customAttributesAt(i).attributes()) {
827+
// Copy each existing attribute to preserve original metadata
828+
attributes.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
829+
}
830+
}
831+
832+
// Add field ID attribute to the new node (preserving existing attributes)
833+
attributes.addAttribute(std::string(kFieldIdProp),
834+
std::to_string(nested_field->field_id.value()),
835+
/*addQuote=*/false);
836+
837+
new_record_node->addCustomAttributesForField(attributes);
838+
839+
// Recursively apply field IDs to nested fields
840+
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
841+
MakeAvroNodeWithFieldIds(field_node, *nested_field));
842+
new_record_node->addName(field_name);
843+
new_record_node->addLeaf(new_nested_node);
844+
}
845+
846+
return new_record_node;
847+
}
848+
849+
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
850+
const MappedField& field) {
851+
if (original_node->leaves() != 1) {
852+
return InvalidSchema("Array type must have exactly one leaf");
853+
}
854+
855+
auto new_array_node = std::make_shared<::avro::NodeArray>();
856+
857+
// Check if this is a map represented as array
858+
if (HasMapLogicalType(original_node)) {
859+
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
860+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), field));
861+
new_array_node->addLeaf(new_element_node);
862+
863+
// Check and add custom attributes
864+
if (original_node->customAttributes() > 0) {
865+
::avro::CustomAttributes merged_attributes;
866+
const auto& original_attrs = original_node->customAttributesAt(0);
867+
CopyCustomAttributes(original_attrs, merged_attributes);
868+
// Add merged attributes if we found any
869+
if (merged_attributes.attributes().size() > 0) {
870+
new_array_node->addCustomAttributesForField(merged_attributes);
871+
}
872+
}
873+
874+
return new_array_node;
875+
}
876+
877+
// For regular arrays, use the first field from nested mapping as element field
878+
if (!field.nested_mapping || field.nested_mapping->fields().empty()) {
879+
return InvalidSchema("Array type requires nested mapping with element field");
880+
}
881+
882+
const auto& element_field = field.nested_mapping->fields()[0];
883+
884+
if (!element_field.field_id.has_value()) {
885+
return InvalidSchema("Field ID is missing for element field in array");
886+
}
887+
888+
ICEBERG_ASSIGN_OR_RAISE(
889+
auto new_element_node,
890+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), element_field));
891+
new_array_node->addLeaf(new_element_node);
892+
893+
// Create merged custom attributes with element field ID
894+
::avro::CustomAttributes merged_attributes;
895+
896+
// First add our element field ID (highest priority)
897+
merged_attributes.addAttribute(std::string(kElementIdProp),
898+
std::to_string(*element_field.field_id),
899+
/*addQuote=*/false);
900+
901+
// Then merge any custom attributes from original node
902+
if (original_node->customAttributes() > 0) {
903+
const auto& original_attrs = original_node->customAttributesAt(0);
904+
CopyCustomAttributes(original_attrs, merged_attributes);
905+
}
906+
907+
// Add all attributes at once
908+
new_array_node->addCustomAttributesForField(merged_attributes);
909+
910+
return new_array_node;
911+
}
912+
913+
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
914+
const MappedField& field) {
915+
if (original_node->leaves() != 2) {
916+
return InvalidSchema("Map type must have exactly two leaves");
917+
}
918+
919+
auto new_map_node = std::make_shared<::avro::NodeMap>();
920+
921+
// For map types, we need to extract key and value field mappings from the nested
922+
// mapping
923+
if (!field.nested_mapping) {
924+
return InvalidSchema("Map type requires nested mapping for key and value fields");
925+
}
926+
927+
// For map types, use the first two fields from nested mapping as key and value
928+
if (!field.nested_mapping || field.nested_mapping->fields().size() < 2) {
929+
return InvalidSchema("Map type requires nested mapping with key and value fields");
930+
}
931+
932+
const auto& key_mapped_field = field.nested_mapping->fields()[0];
933+
const auto& value_mapped_field = field.nested_mapping->fields()[1];
934+
935+
if (!key_mapped_field.field_id || !value_mapped_field.field_id) {
936+
return InvalidSchema("Map key and value fields must have field IDs");
937+
}
938+
939+
// Add key and value nodes
940+
ICEBERG_ASSIGN_OR_RAISE(
941+
auto new_key_node,
942+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_mapped_field));
943+
ICEBERG_ASSIGN_OR_RAISE(
944+
auto new_value_node,
945+
MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_mapped_field));
946+
new_map_node->addLeaf(new_key_node);
947+
new_map_node->addLeaf(new_value_node);
948+
949+
// Create key and value attributes
950+
::avro::CustomAttributes key_attributes;
951+
::avro::CustomAttributes value_attributes;
952+
953+
// Add required field IDs
954+
key_attributes.addAttribute(std::string(kKeyIdProp),
955+
std::to_string(*key_mapped_field.field_id),
956+
/*addQuote=*/false);
957+
value_attributes.addAttribute(std::string(kValueIdProp),
958+
std::to_string(*value_mapped_field.field_id),
959+
/*addQuote=*/false);
960+
961+
// Merge custom attributes from original node if they exist
962+
if (original_node->customAttributes() > 0) {
963+
// Merge attributes for key (index 0)
964+
const auto& original_key_attrs = original_node->customAttributesAt(0);
965+
CopyCustomAttributes(original_key_attrs, key_attributes);
966+
967+
// Merge attributes for value (index 1)
968+
if (original_node->customAttributes() > 1) {
969+
const auto& original_value_attrs = original_node->customAttributesAt(1);
970+
CopyCustomAttributes(original_value_attrs, value_attributes);
971+
}
972+
}
973+
974+
// Add the merged attributes to the new map node
975+
new_map_node->addCustomAttributesForField(key_attributes);
976+
new_map_node->addCustomAttributesForField(value_attributes);
977+
978+
return new_map_node;
979+
}
980+
981+
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
982+
const MappedField& field) {
983+
if (original_node->leaves() != 2) {
984+
return InvalidSchema("Union type must have exactly two branches");
985+
}
986+
987+
const auto& branch_0 = original_node->leafAt(0);
988+
const auto& branch_1 = original_node->leafAt(1);
989+
990+
bool branch_0_is_null = (branch_0->type() == ::avro::AVRO_NULL);
991+
bool branch_1_is_null = (branch_1->type() == ::avro::AVRO_NULL);
992+
993+
if (branch_0_is_null && !branch_1_is_null) {
994+
// branch_0 is null, branch_1 is not null
995+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field));
996+
auto new_union_node = std::make_shared<::avro::NodeUnion>();
997+
new_union_node->addLeaf(branch_0); // null branch
998+
new_union_node->addLeaf(new_branch_1);
999+
return new_union_node;
1000+
} else if (!branch_0_is_null && branch_1_is_null) {
1001+
// branch_0 is not null, branch_1 is null
1002+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field));
1003+
auto new_union_node = std::make_shared<::avro::NodeUnion>();
1004+
new_union_node->addLeaf(new_branch_0);
1005+
new_union_node->addLeaf(branch_1); // null branch
1006+
return new_union_node;
1007+
} else if (branch_0_is_null && branch_1_is_null) {
1008+
// Both branches are null - this is invalid
1009+
return InvalidSchema("Union type cannot have two null branches");
1010+
} else {
1011+
// Neither branch is null - this is invalid
1012+
return InvalidSchema("Union type must have exactly one null branch");
1013+
}
1014+
}
1015+
1016+
} // namespace
1017+
1018+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1019+
const MappedField& mapped_field) {
1020+
switch (original_node->type()) {
1021+
case ::avro::AVRO_RECORD:
1022+
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
1023+
case ::avro::AVRO_ARRAY:
1024+
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
1025+
case ::avro::AVRO_MAP:
1026+
return CreateMapNodeWithFieldIds(original_node, mapped_field);
1027+
case ::avro::AVRO_UNION:
1028+
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
1029+
case ::avro::AVRO_BOOL:
1030+
case ::avro::AVRO_INT:
1031+
case ::avro::AVRO_LONG:
1032+
case ::avro::AVRO_FLOAT:
1033+
case ::avro::AVRO_DOUBLE:
1034+
case ::avro::AVRO_STRING:
1035+
case ::avro::AVRO_BYTES:
1036+
case ::avro::AVRO_FIXED:
1037+
// For primitive types, just return a copy
1038+
return original_node;
1039+
case ::avro::AVRO_NULL:
1040+
case ::avro::AVRO_ENUM:
1041+
default:
1042+
return InvalidSchema("Unsupported Avro type for field ID application: {}",
1043+
ToString(original_node));
1044+
}
1045+
}
1046+
1047+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1048+
const NameMapping& mapping) {
1049+
MappedField mapped_field;
1050+
mapped_field.nested_mapping = std::make_shared<MappedFields>(mapping.AsMappedFields());
1051+
return MakeAvroNodeWithFieldIds(original_node, mapped_field);
1052+
}
1053+
7761054
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include <avro/Node.hh>
2525

26+
#include "iceberg/name_mapping.h"
2627
#include "iceberg/result.h"
2728
#include "iceberg/schema_util.h"
2829
#include "iceberg/type.h"
@@ -148,4 +149,18 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
148149
/// \return True if the node has a map logical type, false otherwise.
149150
bool HasMapLogicalType(const ::avro::NodePtr& node);
150151

152+
/// \brief Create a new Avro node with field IDs from name mapping.
153+
/// \param original_node The original Avro node to copy.
154+
/// \param mapped_field The mapped field to apply field IDs from.
155+
/// \return A new Avro node with field IDs applied, or an error.
156+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
157+
const MappedField& mapped_field);
158+
159+
/// \brief Create a new Avro node with field IDs from name mapping.
160+
/// \param original_node The original Avro node to copy.
161+
/// \param mapping The name mapping to apply field IDs from.
162+
/// \return A new Avro node with field IDs applied, or an error.
163+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
164+
const NameMapping& mapping);
165+
151166
} // namespace iceberg::avro

src/iceberg/avro/constants.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg::avro {
25+
26+
// Avro logical type constants
27+
constexpr std::string_view kMapLogicalType = "map";
28+
29+
// Name mapping field constants
30+
constexpr std::string_view kElement = "element";
31+
constexpr std::string_view kKey = "key";
32+
constexpr std::string_view kValue = "value";
33+
34+
} // namespace iceberg::avro

0 commit comments

Comments
 (0)