Skip to content

Commit 7ddeb06

Browse files
Refactor the code that applies field IDs from the name mapping.
1 parent e52a0e6 commit 7ddeb06

File tree

6 files changed

+392
-357
lines changed

6 files changed

+392
-357
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 20 additions & 154 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,6 @@
3232
#include <avro/GenericDatum.hh>
3333

3434
#include "iceberg/arrow/arrow_fs_file_io.h"
35-
#include "iceberg/avro/avro_constants.h"
3635
#include "iceberg/avro/avro_data_util_internal.h"
3736
#include "iceberg/avro/avro_schema_util_internal.h"
3837
#include "iceberg/avro/avro_stream_internal.h"
@@ -93,7 +92,7 @@ class AvroBatchReader::Impl {
9392
// Create a base reader without setting reader schema to enable projection.
9493
auto base_reader =
9594
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
96-
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
95+
::avro::ValidSchema file_schema = base_reader->dataSchema();
9796

9897
// Validate field ids in the file schema.
9998
HasIdVisitor has_id_visitor;
@@ -102,8 +101,25 @@ class AvroBatchReader::Impl {
102101
if (has_id_visitor.HasNoIds()) {
103102
// Apply field IDs based on name mapping if available
104103
if (options.name_mapping) {
105-
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(*options.name_mapping,
106-
file_schema.root().get()));
104+
ICEBERG_ASSIGN_OR_RAISE(
105+
auto new_root_node,
106+
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
107+
108+
// Create a new schema with the updated root node
109+
auto new_schema = ::avro::ValidSchema(new_root_node);
110+
111+
// Verify that all fields now have IDs after applying the name mapping
112+
HasIdVisitor verify_visitor;
113+
ICEBERG_RETURN_UNEXPECTED(verify_visitor.Visit(new_schema));
114+
if (!verify_visitor.AllHaveIds()) {
115+
// TODO(liuxiaoyu): Print detailed error message with missing field IDs
116+
// information in future
117+
return InvalidSchema(
118+
"Not all fields have field IDs after applying name mapping.");
119+
}
120+
121+
// Update the file schema to use the new schema with field IDs
122+
file_schema = new_schema;
107123
} else {
108124
return InvalidSchema(
109125
"Avro file schema has no field IDs and no name mapping provided");
@@ -203,156 +219,6 @@ class AvroBatchReader::Impl {
203219
return arrow_array;
204220
}
205221

206-
// Apply field IDs to Avro schema nodes based on name mapping.
//
// Entry point of the recursive walk: dispatches on node->type().
//   - record / array / map / union nodes are forwarded to the matching
//     ApplyFieldIdsTo* helper, which recurses back into this function.
//   - bool, int, long, float, double, string, bytes and fixed nodes return an
//     empty Status (`{}`) without touching the node.
//   - null, enum and any other type produce an InvalidSchema error that
//     carries the numeric value of the Avro type.
//
// `name_mapping` is the mapping consulted by the helpers; `node` is the schema
// node that is inspected (and, in the helpers, annotated).
207-
Status ApplyFieldIdsFromNameMapping(const NameMapping& name_mapping,
208-
::avro::Node* node) {
209-
switch (node->type()) {
210-
case ::avro::AVRO_RECORD:
211-
return ApplyFieldIdsToRecord(node, name_mapping);
212-
case ::avro::AVRO_ARRAY:
213-
return ApplyFieldIdsToArray(node, name_mapping);
214-
case ::avro::AVRO_MAP:
215-
return ApplyFieldIdsToMap(node, name_mapping);
216-
case ::avro::AVRO_UNION:
217-
return ApplyFieldIdsToUnion(node, name_mapping);
218-
// Leaf types: nothing to annotate and nothing to recurse into.
case ::avro::AVRO_BOOL:
219-
case ::avro::AVRO_INT:
220-
case ::avro::AVRO_LONG:
221-
case ::avro::AVRO_FLOAT:
222-
case ::avro::AVRO_DOUBLE:
223-
case ::avro::AVRO_STRING:
224-
case ::avro::AVRO_BYTES:
225-
case ::avro::AVRO_FIXED:
226-
return {};
227-
// A bare null or enum node (and anything unrecognised) is rejected here;
// null branches inside a union are handled by ApplyFieldIdsToUnion and never
// reach this switch.
case ::avro::AVRO_NULL:
228-
case ::avro::AVRO_ENUM:
229-
default:
230-
return InvalidSchema("Unsupported Avro type for field ID application: {}",
231-
static_cast<int>(node->type()));
232-
}
233-
}
234-
235-
// Annotates a record node with field IDs looked up by field name.
//
// For every leaf `i` of `node`, the field name (node->nameAt(i)) is looked up
// in `name_mapping`:
//   - If a mapping entry exists and carries a field_id, a custom attribute
//     named kFieldId with the ID rendered as a decimal string is added to the
//     record via node->addCustomAttributesForField().
//   - If the entry has a nested_mapping and the child is itself a record, a
//     new NameMapping is built from the nested fields and used for the
//     recursion; otherwise the recursion continues with the same mapping.
//   - If no entry exists, the child is still visited with the same mapping.
//
// Returns the first error produced by a recursive call, or an empty Status.
//
// NOTE(review): attributes are added only for fields that have a mapped ID.
// If addCustomAttributesForField() appends one entry per call in field order,
// skipping unmapped fields would shift later entries — confirm against the
// Avro C++ Node implementation.
// NOTE(review): the nested mapping is used only when the child type is
// exactly AVRO_RECORD; a record wrapped in a union/array/map is visited with
// the enclosing mapping instead — confirm this is intended.
Status ApplyFieldIdsToRecord(::avro::Node* node, const NameMapping& name_mapping) {
236-
for (size_t i = 0; i < node->leaves(); ++i) {
237-
const std::string& field_name = node->nameAt(i);
238-
::avro::Node* field_node = node->leafAt(i).get();
239-
240-
// Try to find field ID by name in the name mapping
241-
if (auto field_ref = name_mapping.Find(field_name)) {
242-
if (field_ref->get().field_id.has_value()) {
243-
// Add field ID attribute to the node
// (third addAttribute argument is `false` — presumably "do not add
// quotes" around the value; TODO confirm against Avro CustomAttributes)
244-
::avro::CustomAttributes attributes;
245-
attributes.addAttribute(std::string(kFieldId),
246-
std::to_string(field_ref->get().field_id.value()),
247-
false);
248-
node->addCustomAttributesForField(attributes);
249-
}
250-
251-
// Recursively apply field IDs to nested fields if they exist
252-
if (field_ref->get().nested_mapping &&
253-
field_node->type() == ::avro::AVRO_RECORD) {
254-
const auto& nested_mapping = field_ref->get().nested_mapping;
255-
// Copy the nested fields into a vector so a standalone NameMapping can be
// built for the child record.
auto fields_span = nested_mapping->fields();
256-
std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end());
257-
auto nested_name_mapping = NameMapping::Make(std::move(fields_vector));
258-
ICEBERG_RETURN_UNEXPECTED(
259-
ApplyFieldIdsFromNameMapping(*nested_name_mapping, field_node));
260-
} else {
261-
// Recursively apply field IDs to child nodes (only if not already handled by
262-
// nested mapping)
263-
ICEBERG_RETURN_UNEXPECTED(
264-
ApplyFieldIdsFromNameMapping(name_mapping, field_node));
265-
}
266-
} else {
267-
// Recursively apply field IDs to child nodes even if no mapping found
268-
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(name_mapping, field_node));
269-
}
270-
}
271-
return {};
272-
}
273-
274-
// Annotates an array node with its element ID and recurses into the element.
//
// Steps, in order:
//   1. Reject the node with InvalidSchema unless it has exactly one leaf.
//   2. If the node carries a custom logical type whose name equals
//      kMapLogicalType, no element ID is added; the single leaf is visited
//      with the same mapping and its result is returned.
//   3. Otherwise look up kElement in `name_mapping`; when the entry has a
//      field_id, add a kElementId custom attribute (decimal string) to the
//      array node.
//   4. Recurse into the single leaf with the same mapping.
Status ApplyFieldIdsToArray(::avro::Node* node, const NameMapping& name_mapping) {
275-
// TODO(liuxiaoyu): Add debug logging to print node information for troubleshooting
276-
// when array type validation fails
277-
if (node->leaves() != 1) {
278-
return InvalidSchema("Array type must have exactly one leaf");
279-
}
280-
281-
// Check if this is a map represented as array
282-
if (node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
283-
node->logicalType().customLogicalType() != nullptr &&
284-
node->logicalType().customLogicalType()->name() == kMapLogicalType) {
285-
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
286-
}
287-
288-
// For regular arrays, try to find element field ID
// (looked up by the fixed name kElement in the mapping passed in, not in a
// mapping specific to this array)
289-
if (auto element_field = name_mapping.Find(std::string(kElement))) {
290-
if (element_field->get().field_id.has_value()) {
291-
::avro::CustomAttributes attributes;
292-
attributes.addAttribute(std::string(kElementId),
293-
std::to_string(element_field->get().field_id.value()),
294-
false);
295-
node->addCustomAttributesForField(attributes);
296-
}
297-
}
298-
299-
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
300-
}
301-
302-
// Annotates a map node with key/value IDs and recurses into the value type.
//
// Rejects the node with InvalidSchema unless it has exactly two leaves. Then
// looks up kKey and kValue in `name_mapping`; for each entry that carries a
// field_id, a custom attribute (kKeyId / kValueId, decimal string) is added to
// the map node through a separate addCustomAttributesForField() call.
// Finally only leaf 1 is visited recursively; leaf 0 is not visited.
Status ApplyFieldIdsToMap(::avro::Node* node, const NameMapping& name_mapping) {
303-
if (node->leaves() != 2) {
304-
return InvalidSchema("Map type must have exactly two leaves");
305-
}
306-
307-
// Try to find key and value field IDs
308-
if (auto key_field = name_mapping.Find(std::string(kKey))) {
309-
if (key_field->get().field_id.has_value()) {
310-
::avro::CustomAttributes attributes;
311-
attributes.addAttribute(std::string(kKeyId),
312-
std::to_string(key_field->get().field_id.value()), false);
313-
node->addCustomAttributesForField(attributes);
314-
}
315-
}
316-
317-
if (auto value_field = name_mapping.Find(std::string(kValue))) {
318-
if (value_field->get().field_id.has_value()) {
319-
::avro::CustomAttributes attributes;
320-
attributes.addAttribute(std::string(kValueId),
321-
std::to_string(value_field->get().field_id.value()),
322-
false);
323-
node->addCustomAttributesForField(attributes);
324-
}
325-
}
326-
327-
// Recurse into leaf 1 (presumably the value schema — confirm leaf ordering
// for Avro map nodes).
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(1).get());
328-
}
329-
330-
// Handles a two-branch union in which exactly one branch is null.
//
// Rejects the node with InvalidSchema unless it has exactly two leaves. When
// exactly one of the two branches has type AVRO_NULL, the other branch is
// visited recursively with the same mapping and that result is returned.
// Unions with two null branches or with no null branch yield InvalidSchema.
// No attribute is added to the union node itself.
Status ApplyFieldIdsToUnion(::avro::Node* node, const NameMapping& name_mapping) {
331-
if (node->leaves() != 2) {
332-
return InvalidSchema("Union type must have exactly two branches");
333-
}
334-
335-
const auto& branch_0 = node->leafAt(0);
336-
const auto& branch_1 = node->leafAt(1);
337-
338-
bool branch_0_is_null = (branch_0->type() == ::avro::AVRO_NULL);
339-
bool branch_1_is_null = (branch_1->type() == ::avro::AVRO_NULL);
340-
341-
// The null branch may appear in either position; recurse into the other one.
if (branch_0_is_null && !branch_1_is_null) {
342-
// branch_0 is null, branch_1 is not null
343-
return ApplyFieldIdsFromNameMapping(name_mapping, branch_1.get());
344-
} else if (!branch_0_is_null && branch_1_is_null) {
345-
// branch_0 is not null, branch_1 is null
346-
return ApplyFieldIdsFromNameMapping(name_mapping, branch_0.get());
347-
} else if (branch_0_is_null && branch_1_is_null) {
348-
// Both branches are null - this is invalid
349-
return InvalidSchema("Union type cannot have two null branches");
350-
} else {
351-
// Neither branch is null - this is invalid
352-
return InvalidSchema("Union type must have exactly one null branch");
353-
}
354-
}
355-
356222
private:
357223
// Max number of rows in the record batch to read.
358224
int64_t batch_size_{};

0 commit comments

Comments
 (0)