Skip to content

Commit 883255b

Browse files
Fix build error and add name_mapping to ReaderOptions
1 parent a74234b commit 883255b

File tree

5 files changed

+114
-46
lines changed

5 files changed

+114
-46
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ if(ICEBERG_BUILD_BUNDLE)
9898
avro/avro_data_util.cc
9999
avro/avro_reader.cc
100100
avro/avro_schema_util.cc
101-
avro/avro_stream_internal.cc
102-
json_internal.cc)
101+
avro/avro_stream_internal.cc)
103102

104103
# Libraries to link with exported libiceberg_bundle.{so,a}.
105104
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/avro/avro_constants.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg::avro {
25+
26+
// Avro field ID attribute constants
27+
constexpr std::string_view kFieldId = "field-id";
28+
constexpr std::string_view kElementId = "element-id";
29+
constexpr std::string_view kKeyId = "key-id";
30+
constexpr std::string_view kValueId = "value-id";
31+
32+
// Avro logical type constants
33+
constexpr std::string_view kMapLogicalType = "map";
34+
35+
// Name mapping field constants
36+
constexpr std::string_view kElement = "element";
37+
constexpr std::string_view kKey = "key";
38+
constexpr std::string_view kValue = "value";
39+
40+
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.cc

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
#include <nlohmann/json.hpp>
3434

3535
#include "iceberg/arrow/arrow_fs_file_io.h"
36+
#include "iceberg/avro/avro_constants.h"
3637
#include "iceberg/avro/avro_data_util_internal.h"
3738
#include "iceberg/avro/avro_schema_util_internal.h"
3839
#include "iceberg/avro/avro_stream_internal.h"
39-
#include "iceberg/json_internal.h"
4040
#include "iceberg/name_mapping.h"
4141
#include "iceberg/schema_internal.h"
4242
#include "iceberg/util/checked_cast.h"
@@ -102,14 +102,9 @@ class AvroBatchReader::Impl {
102102

103103
if (has_id_visitor.HasNoIds()) {
104104
// Apply field IDs based on name mapping if available
105-
auto name_mapping_iter = options.properties.find("name_mapping");
106-
if (name_mapping_iter != options.properties.end()) {
107-
// Parse name mapping from JSON string
108-
ICEBERG_ASSIGN_OR_RAISE(auto name_mapping,
109-
iceberg::NameMappingFromJson(
110-
nlohmann::json::parse(name_mapping_iter->second)));
111-
ICEBERG_RETURN_UNEXPECTED(
112-
ApplyFieldIdsFromNameMapping(file_schema.root(), *name_mapping));
105+
if (options.name_mapping) {
106+
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(*options.name_mapping,
107+
file_schema.root().get()));
113108
} else {
114109
return NotImplemented(
115110
"Avro file schema has no field IDs and no name mapping provided");
@@ -210,8 +205,8 @@ class AvroBatchReader::Impl {
210205
}
211206

212207
// Apply field IDs to Avro schema nodes based on name mapping
213-
Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node,
214-
const NameMapping& name_mapping) {
208+
Status ApplyFieldIdsFromNameMapping(const NameMapping& name_mapping,
209+
::avro::Node* node) {
215210
switch (node->type()) {
216211
case ::avro::AVRO_RECORD:
217212
return ApplyFieldIdsToRecord(node, name_mapping);
@@ -238,19 +233,19 @@ class AvroBatchReader::Impl {
238233
}
239234
}
240235

241-
Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node,
242-
const NameMapping& name_mapping) {
236+
Status ApplyFieldIdsToRecord(::avro::Node* node, const NameMapping& name_mapping) {
243237
for (size_t i = 0; i < node->leaves(); ++i) {
244238
const std::string& field_name = node->nameAt(i);
245-
::avro::NodePtr field_node = node->leafAt(i);
239+
::avro::Node* field_node = node->leafAt(i).get();
246240

247241
// Try to find field ID by name in the name mapping
248242
if (auto field_ref = name_mapping.Find(field_name)) {
249243
if (field_ref->get().field_id.has_value()) {
250244
// Add field ID attribute to the node
251245
::avro::CustomAttributes attributes;
252-
attributes.addAttribute(
253-
"field-id", std::to_string(field_ref->get().field_id.value()), false);
246+
attributes.addAttribute(std::string(kFieldId),
247+
std::to_string(field_ref->get().field_id.value()),
248+
false);
254249
node->addCustomAttributesForField(attributes);
255250
}
256251

@@ -262,92 +257,101 @@ class AvroBatchReader::Impl {
262257
std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end());
263258
auto nested_name_mapping = NameMapping::Make(std::move(fields_vector));
264259
ICEBERG_RETURN_UNEXPECTED(
265-
ApplyFieldIdsFromNameMapping(field_node, *nested_name_mapping));
260+
ApplyFieldIdsFromNameMapping(*nested_name_mapping, field_node));
266261
} else {
267262
// Recursively apply field IDs to child nodes (only if not already handled by
268263
// nested mapping)
269264
ICEBERG_RETURN_UNEXPECTED(
270-
ApplyFieldIdsFromNameMapping(field_node, name_mapping));
265+
ApplyFieldIdsFromNameMapping(name_mapping, field_node));
271266
}
272267
} else {
273268
// Recursively apply field IDs to child nodes even if no mapping found
274-
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(field_node, name_mapping));
269+
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(name_mapping, field_node));
275270
}
276271
}
277272
return {};
278273
}
279274

280-
Status ApplyFieldIdsToArray(const ::avro::NodePtr& node,
281-
const NameMapping& name_mapping) {
275+
Status ApplyFieldIdsToArray(::avro::Node* node, const NameMapping& name_mapping) {
276+
// TODO(liuxiaoyu): Add debug logging to print node information for troubleshooting
277+
// when array type validation fails
282278
if (node->leaves() != 1) {
283279
return InvalidSchema("Array type must have exactly one leaf");
284280
}
285281

286282
// Check if this is a map represented as array
287283
if (node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
288284
node->logicalType().customLogicalType() != nullptr &&
289-
node->logicalType().customLogicalType()->name() == "map") {
290-
return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping);
285+
node->logicalType().customLogicalType()->name() == kMapLogicalType) {
286+
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
291287
}
292288

293289
// For regular arrays, try to find element field ID
294-
if (auto element_field = name_mapping.Find("element")) {
290+
if (auto element_field = name_mapping.Find(std::string(kElement))) {
295291
if (element_field->get().field_id.has_value()) {
296292
::avro::CustomAttributes attributes;
297-
attributes.addAttribute(
298-
"element-id", std::to_string(element_field->get().field_id.value()), false);
293+
attributes.addAttribute(std::string(kElementId),
294+
std::to_string(element_field->get().field_id.value()),
295+
false);
299296
node->addCustomAttributesForField(attributes);
300297
}
301298
}
302299

303-
return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping);
300+
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
304301
}
305302

306-
Status ApplyFieldIdsToMap(const ::avro::NodePtr& node,
307-
const NameMapping& name_mapping) {
303+
Status ApplyFieldIdsToMap(::avro::Node* node, const NameMapping& name_mapping) {
308304
if (node->leaves() != 2) {
309305
return InvalidSchema("Map type must have exactly two leaves");
310306
}
311307

312308
// Try to find key and value field IDs
313-
if (auto key_field = name_mapping.Find("key")) {
309+
if (auto key_field = name_mapping.Find(std::string(kKey))) {
314310
if (key_field->get().field_id.has_value()) {
315311
::avro::CustomAttributes attributes;
316-
attributes.addAttribute("key-id",
312+
attributes.addAttribute(std::string(kKeyId),
317313
std::to_string(key_field->get().field_id.value()), false);
318314
node->addCustomAttributesForField(attributes);
319315
}
320316
}
321317

322-
if (auto value_field = name_mapping.Find("value")) {
318+
if (auto value_field = name_mapping.Find(std::string(kValue))) {
323319
if (value_field->get().field_id.has_value()) {
324320
::avro::CustomAttributes attributes;
325-
attributes.addAttribute(
326-
"value-id", std::to_string(value_field->get().field_id.value()), false);
321+
attributes.addAttribute(std::string(kValueId),
322+
std::to_string(value_field->get().field_id.value()),
323+
false);
327324
node->addCustomAttributesForField(attributes);
328325
}
329326
}
330327

331-
return ApplyFieldIdsFromNameMapping(node->leafAt(1), name_mapping);
328+
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(1).get());
332329
}
333330

334-
Status ApplyFieldIdsToUnion(const ::avro::NodePtr& node,
335-
const NameMapping& name_mapping) {
331+
Status ApplyFieldIdsToUnion(::avro::Node* node, const NameMapping& name_mapping) {
336332
if (node->leaves() != 2) {
337333
return InvalidSchema("Union type must have exactly two branches");
338334
}
339335

340336
const auto& branch_0 = node->leafAt(0);
341337
const auto& branch_1 = node->leafAt(1);
342338

343-
if (branch_0->type() == ::avro::AVRO_NULL) {
344-
return ApplyFieldIdsFromNameMapping(branch_1, name_mapping);
339+
bool branch_0_is_null = (branch_0->type() == ::avro::AVRO_NULL);
340+
bool branch_1_is_null = (branch_1->type() == ::avro::AVRO_NULL);
341+
342+
if (branch_0_is_null && !branch_1_is_null) {
343+
// branch_0 is null, branch_1 is not null
344+
return ApplyFieldIdsFromNameMapping(name_mapping, branch_1.get());
345+
} else if (!branch_0_is_null && branch_1_is_null) {
346+
// branch_0 is not null, branch_1 is null
347+
return ApplyFieldIdsFromNameMapping(name_mapping, branch_0.get());
348+
} else if (branch_0_is_null && branch_1_is_null) {
349+
// Both branches are null - this is invalid
350+
return InvalidSchema("Union type cannot have two null branches");
351+
} else {
352+
// Neither branch is null - this is invalid
353+
return InvalidSchema("Union type must have exactly one null branch");
345354
}
346-
if (branch_1->type() == ::avro::AVRO_NULL) {
347-
return ApplyFieldIdsFromNameMapping(branch_0, name_mapping);
348-
}
349-
350-
return InvalidSchema("Union type must have exactly one null branch");
351355
}
352356

353357
private:

src/iceberg/file_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "iceberg/arrow_c_data.h"
3131
#include "iceberg/file_format.h"
32+
#include "iceberg/name_mapping.h"
3233
#include "iceberg/result.h"
3334
#include "iceberg/type_fwd.h"
3435

@@ -140,6 +141,9 @@ struct ICEBERG_EXPORT ReaderOptions {
140141
/// \brief The filter to apply to the data. Reader implementations may ignore this if
141142
/// the file format does not support filtering.
142143
std::shared_ptr<class Expression> filter;
144+
/// \brief Optional name mapping used to assign field IDs by name when the file
145+
/// schema has no field IDs. Leave null if no name mapping is available.
146+
std::shared_ptr<class NameMapping> name_mapping;
143147
/// \brief Format-specific or implementation-specific properties.
144148
std::unordered_map<std::string, std::string> properties;
145149
};

test/avro_reader_test.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,25 @@ TEST_F(AvroReaderTest, NameMappingWithNestedFields) {
173173
testing::UnorderedElementsAre("description"));
174174
}
175175

176+
TEST_F(AvroReaderTest, NameMappingFromReaderOptionsWorks) {
177+
// Create a name mapping
178+
auto name_mapping = CreateTestNameMapping();
179+
ASSERT_TRUE(name_mapping != nullptr);
180+
EXPECT_EQ(name_mapping->AsMappedFields().Size(), 3);
181+
182+
// Create reader options with name mapping
183+
ReaderOptions options;
184+
options.name_mapping = std::move(name_mapping);
185+
186+
// Verify that the name mapping is accessible
187+
ASSERT_TRUE(options.name_mapping != nullptr);
188+
EXPECT_EQ(options.name_mapping->AsMappedFields().Size(), 3);
189+
190+
// Test that the name mapping works correctly
191+
auto field_by_id = options.name_mapping->Find(1);
192+
ASSERT_TRUE(field_by_id.has_value());
193+
EXPECT_EQ(field_by_id->get().field_id, 1);
194+
EXPECT_THAT(field_by_id->get().names, testing::UnorderedElementsAre("id"));
195+
}
196+
176197
} // namespace iceberg::avro

0 commit comments

Comments
 (0)