Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 97 additions & 164 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,73 @@ Result<SchemaProjection> Project(const Schema& expected_schema,

namespace {

/// \brief Scope guard that keeps a name path in step with a nested traversal.
///
/// Construction appends `name` to the referenced vector; destruction removes the
/// last element again, so the vector is restored on every exit path of the
/// enclosing scope (including early returns).
///
/// The guard only stores a reference: the vector must outlive the guard, and any
/// entries pushed after the guard was created must already be popped when the guard
/// is destroyed, otherwise the wrong element is removed.
class NamesGuard {
 public:
  /// \param names The name path to extend for the lifetime of this guard.
  /// \param name The path segment to append.
  NamesGuard(std::vector<std::string>& names, const std::string& name) : names_(names) {
    names_.push_back(name);
  }
  ~NamesGuard() { names_.pop_back(); }

  // A copied or moved guard would run a second destructor and pop an entry it never
  // pushed, so the guard is pinned to the scope that created it.
  NamesGuard(const NamesGuard&) = delete;
  NamesGuard& operator=(const NamesGuard&) = delete;
  NamesGuard(NamesGuard&&) = delete;
  NamesGuard& operator=(NamesGuard&&) = delete;

 private:
  std::vector<std::string>& names_;
};

// Forward declaration: the helpers below call this overload recursively before its
// definition appears later in this file. `names` is the name path accumulated so
// far and is passed by mutable reference so callers can extend and restore it.
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names);

void CopyCustomAttributes(const ::avro::CustomAttributes& source,
::avro::CustomAttributes& target) {
for (const auto& attr_pair : source.attributes()) {
target.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
for (const auto& [key, value] : source.attributes()) { // NOLINT(modernize-type-traits)
target.addAttribute(key, value, /*addQuote=*/false);
}
}

/// \brief Resolve a name path to a mapped field that carries a field ID.
/// \param mapping The name mapping to search.
/// \param names The name path to look up; its last element is used in error
/// messages, so it is expected to be non-empty.
/// \return The matching mapped field, or an InvalidSchema error when the path has no
/// entry in the mapping or the entry has no field ID.
Result<MappedFieldConstRef> FindMappedField(const NameMapping& mapping,
                                            const std::vector<std::string>& names) {
  auto lookup = mapping.Find(names);
  if (!lookup) {
    return InvalidSchema("Field '{}' not found in name mapping", names.back());
  }

  // An entry without an ID is of no use to callers, so report it as an error too.
  if (const MappedField& mapped = lookup->get(); !mapped.field_id.has_value()) {
    return InvalidSchema("Field ID is missing for field '{}' in name mapping",
                         names.back());
  }

  return *lookup;
}

Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
/// \brief Rebuild one child of `parent_node` with field IDs and attach it to
/// `new_parent_node`.
///
/// \param parent_node The original node whose child is being processed.
/// \param child_index Index of the child leaf (and of its custom attributes).
/// \param child_name Path segment appended to `names` while this child is processed.
/// \param fieldIdPropKy Attribute key under which the field ID is stored.
/// \param mapping The name mapping used to resolve field IDs.
/// \param names The current name path; restored to its incoming state on return.
/// \param new_parent_node The node under construction that receives the new leaf and
/// its custom attributes.
/// \return An empty status on success. NOTE(review): failures are presumably
/// propagated by the ICEBERG_ASSIGN_OR_RAISE macro — confirm against its definition.
template <typename NodeType>
Status ProcessChildNode(const ::avro::NodePtr& parent_node, size_t child_index,
const std::string& child_name, std::string_view fieldIdPropKy,
const NameMapping& mapping, std::vector<std::string>& names,
NodeType& new_parent_node) {
// The guard stays alive for the whole function so that both the lookup and the
// recursive call below see the path extended with `child_name`.
NamesGuard guard(names, child_name);
ICEBERG_ASSIGN_OR_RAISE(auto mapped_field, FindMappedField(mapping, names));

// Recurse into the child so that its own descendants are rebuilt as well.
ICEBERG_ASSIGN_OR_RAISE(
auto new_child_node,
MakeAvroNodeWithFieldIds(parent_node->leafAt(child_index), mapping, names));

// The field ID is inserted first; the child's existing attributes are copied after.
::avro::CustomAttributes attributes;
attributes.addAttribute(std::string(fieldIdPropKy),
std::to_string(*mapped_field.get().field_id),
/*addQuote=*/false);
// Only copy attributes when the parent actually stores an entry for this index.
if (parent_node->customAttributes() > child_index) {
CopyCustomAttributes(parent_node->customAttributesAt(child_index), attributes);
}

new_parent_node->addLeaf(new_child_node);
new_parent_node->addCustomAttributesForField(attributes);
return {};
}

Result<::avro::NodePtr> MakeRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names) {
auto new_record_node = std::make_shared<::avro::NodeRecord>();
new_record_node->setName(original_node->name());

Expand All @@ -792,57 +850,18 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
const std::string& field_name = original_node->nameAt(i);
::avro::NodePtr field_node = original_node->leafAt(i);

// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
// Try to find nested field by name
const MappedField* nested_field = nullptr;
if (field.nested_mapping) {
auto fields_span = field.nested_mapping->fields();
for (const auto& f : fields_span) {
if (f.names.find(field_name) != f.names.end()) {
nested_field = &f;
break;
}
}
}

if (!nested_field) {
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
}

if (!nested_field->field_id.has_value()) {
return InvalidSchema("Field ID is missing for field '{}' in nested mapping",
field_name);
}

// Preserve existing custom attributes for this field
::avro::CustomAttributes attributes;
if (i < original_node->customAttributes()) {
// Copy all existing attributes from the original node
for (const auto& attr_pair : original_node->customAttributesAt(i).attributes()) {
// Copy each existing attribute to preserve original metadata
attributes.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);
}
}

// Add field ID attribute to the new node (preserving existing attributes)
attributes.addAttribute(std::string(kFieldIdProp),
std::to_string(nested_field->field_id.value()),
/*addQuote=*/false);
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, i, field_name, kFieldIdProp,
mapping, names, new_record_node));

new_record_node->addCustomAttributesForField(attributes);

// Recursively apply field IDs to nested fields
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
MakeAvroNodeWithFieldIds(field_node, *nested_field));
new_record_node->addName(field_name);
new_record_node->addLeaf(new_nested_node);
}

return new_record_node;
}

Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
Result<::avro::NodePtr> MakeArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names) {
if (original_node->leaves() != 1) {
return InvalidSchema("Array type must have exactly one leaf");
}
Expand All @@ -851,130 +870,42 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi

// Check if this is a map represented as array
if (HasMapLogicalType(original_node)) {
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
MakeAvroNodeWithFieldIds(original_node->leafAt(0), field));
ICEBERG_ASSIGN_OR_RAISE(
auto new_element_node,
MakeAvroNodeWithFieldIds(original_node->leafAt(0), mapping, names));
new_array_node->addLeaf(new_element_node);

// Check and add custom attributes
if (original_node->customAttributes() > 0) {
::avro::CustomAttributes merged_attributes;
const auto& original_attrs = original_node->customAttributesAt(0);
CopyCustomAttributes(original_attrs, merged_attributes);
// Add merged attributes if we found any
if (merged_attributes.attributes().size() > 0) {
new_array_node->addCustomAttributesForField(merged_attributes);
}
new_array_node->addCustomAttributesForField(original_node->customAttributesAt(0));
}

return new_array_node;
}

// For regular arrays, use the first field from nested mapping as element field
if (!field.nested_mapping || field.nested_mapping->fields().empty()) {
return InvalidSchema("Array type requires nested mapping with element field");
}

const auto& element_field = field.nested_mapping->fields()[0];

if (!element_field.field_id.has_value()) {
return InvalidSchema("Field ID is missing for element field in array");
}

ICEBERG_ASSIGN_OR_RAISE(
auto new_element_node,
MakeAvroNodeWithFieldIds(original_node->leafAt(0), element_field));
new_array_node->addLeaf(new_element_node);

// Create merged custom attributes with element field ID
::avro::CustomAttributes merged_attributes;

// First add our element field ID (highest priority)
merged_attributes.addAttribute(std::string(kElementIdProp),
std::to_string(*element_field.field_id),
/*addQuote=*/false);

// Then merge any custom attributes from original node
if (original_node->customAttributes() > 0) {
const auto& original_attrs = original_node->customAttributesAt(0);
CopyCustomAttributes(original_attrs, merged_attributes);
}

// Add all attributes at once
new_array_node->addCustomAttributesForField(merged_attributes);
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "element", kElementIdProp,
mapping, names, new_array_node));

return new_array_node;
}

Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
Result<::avro::NodePtr> MakeMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names) {
if (original_node->leaves() != 2) {
return InvalidSchema("Map type must have exactly two leaves");
}

auto new_map_node = std::make_shared<::avro::NodeMap>();

// For map types, we need to extract key and value field mappings from the nested
// mapping
if (!field.nested_mapping) {
return InvalidSchema("Map type requires nested mapping for key and value fields");
}

// For map types, use the first two fields from nested mapping as key and value
if (!field.nested_mapping || field.nested_mapping->fields().size() < 2) {
return InvalidSchema("Map type requires nested mapping with key and value fields");
}

const auto& key_mapped_field = field.nested_mapping->fields()[0];
const auto& value_mapped_field = field.nested_mapping->fields()[1];

if (!key_mapped_field.field_id || !value_mapped_field.field_id) {
return InvalidSchema("Map key and value fields must have field IDs");
}

// Add key and value nodes
ICEBERG_ASSIGN_OR_RAISE(
auto new_key_node,
MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_mapped_field));
ICEBERG_ASSIGN_OR_RAISE(
auto new_value_node,
MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_mapped_field));
new_map_node->addLeaf(new_key_node);
new_map_node->addLeaf(new_value_node);

// Create key and value attributes
::avro::CustomAttributes key_attributes;
::avro::CustomAttributes value_attributes;

// Add required field IDs
key_attributes.addAttribute(std::string(kKeyIdProp),
std::to_string(*key_mapped_field.field_id),
/*addQuote=*/false);
value_attributes.addAttribute(std::string(kValueIdProp),
std::to_string(*value_mapped_field.field_id),
/*addQuote=*/false);

// Merge custom attributes from original node if they exist
if (original_node->customAttributes() > 0) {
// Merge attributes for key (index 0)
const auto& original_key_attrs = original_node->customAttributesAt(0);
CopyCustomAttributes(original_key_attrs, key_attributes);

// Merge attributes for value (index 1)
if (original_node->customAttributes() > 1) {
const auto& original_value_attrs = original_node->customAttributesAt(1);
CopyCustomAttributes(original_value_attrs, value_attributes);
}
}

// Add the merged attributes to the new map node
new_map_node->addCustomAttributesForField(key_attributes);
new_map_node->addCustomAttributesForField(value_attributes);

ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 0, "key", kKeyIdProp, mapping,
names, new_map_node));
ICEBERG_RETURN_UNEXPECTED(ProcessChildNode(original_node, 1, "value", kValueIdProp,
mapping, names, new_map_node));
return new_map_node;
}

Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names) {
if (original_node->leaves() != 2) {
return InvalidSchema("Union type must have exactly two branches");
}
Expand All @@ -987,14 +918,16 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi

if (branch_0_is_null && !branch_1_is_null) {
// branch_0 is null, branch_1 is not null
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field));
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
MakeAvroNodeWithFieldIds(branch_1, mapping, names));
auto new_union_node = std::make_shared<::avro::NodeUnion>();
new_union_node->addLeaf(branch_0); // null branch
new_union_node->addLeaf(new_branch_1);
return new_union_node;
} else if (!branch_0_is_null && branch_1_is_null) {
// branch_0 is not null, branch_1 is null
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field));
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
MakeAvroNodeWithFieldIds(branch_0, mapping, names));
auto new_union_node = std::make_shared<::avro::NodeUnion>();
new_union_node->addLeaf(new_branch_0);
new_union_node->addLeaf(branch_1); // null branch
Expand All @@ -1008,19 +941,18 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
}
}

} // namespace

Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& mapped_field) {
const NameMapping& mapping,
std::vector<std::string>& names) {
switch (original_node->type()) {
case ::avro::AVRO_RECORD:
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
return MakeRecordNodeWithFieldIds(original_node, mapping, names);
case ::avro::AVRO_ARRAY:
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
return MakeArrayNodeWithFieldIds(original_node, mapping, names);
case ::avro::AVRO_MAP:
return CreateMapNodeWithFieldIds(original_node, mapped_field);
return MakeMapNodeWithFieldIds(original_node, mapping, names);
case ::avro::AVRO_UNION:
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
return MakeUnionNodeWithFieldIds(original_node, mapping, names);
case ::avro::AVRO_BOOL:
case ::avro::AVRO_INT:
case ::avro::AVRO_LONG:
Expand All @@ -1039,11 +971,12 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
}
}

} // namespace

Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping) {
MappedField mapped_field;
mapped_field.nested_mapping = std::make_shared<MappedFields>(mapping.AsMappedFields());
return MakeAvroNodeWithFieldIds(original_node, mapped_field);
std::vector<std::string> names;
return MakeAvroNodeWithFieldIds(original_node, mapping, names);
}

} // namespace iceberg::avro
7 changes: 0 additions & 7 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
/// \return True if the node has a map logical type, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node);

/// \brief Create a new Avro node with field IDs from name mapping.
/// \param original_node The original Avro node to copy.
/// \param mapped_field The mapped field to apply field IDs from.
/// \return A new Avro node with field IDs applied, or an error.
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& mapped_field);

/// \brief Create a new Avro node with field IDs from name mapping.
/// \param original_node The original Avro node to copy.
/// \param mapping The name mapping to apply field IDs from.
Expand Down
Loading