From ce30233b7d79f683ac94ca54578596f4a6ff7f51 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 9 Jul 2025 17:16:33 +0800 Subject: [PATCH 01/10] feat: support manifest reader - add manifest reader - refactor of arrow array parser - add basic case --- src/iceberg/avro/avro_schema_util.cc | 8 +- src/iceberg/avro/avro_schema_util_internal.h | 2 + src/iceberg/file_format.h | 10 +- src/iceberg/manifest_entry.cc | 4 +- src/iceberg/manifest_entry.h | 4 +- src/iceberg/manifest_reader_internal.cc | 462 +++++++++++++++--- src/iceberg/util/string_utils.h | 44 ++ test/CMakeLists.txt | 1 + test/manifest_reader_test.cc | 121 +++++ ...357cd7-391f-4df8-aa24-e7e667da8870-m4.avro | Bin 0 -> 7533 bytes 10 files changed, 587 insertions(+), 69 deletions(-) create mode 100644 src/iceberg/util/string_utils.h create mode 100644 test/manifest_reader_test.cc create mode 100644 test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 905d9802f..87b1e54bf 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -58,8 +58,7 @@ ::avro::LogicalType GetMapLogicalType() { std::call_once(flag, []() { // Register the map logical type with the avro custom logical type registry. // See https://github.com/apache/avro/pull/3326 for details. 
- ::avro::CustomLogicalTypeRegistry::instance().registerType( - "map", [](const std::string&) { return std::make_shared(); }); + RegisterLogicalTypes(); }); return ::avro::LogicalType(std::make_shared()); } @@ -73,6 +72,11 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) { } // namespace +void RegisterLogicalTypes() { + ::avro::CustomLogicalTypeRegistry::instance().registerType( + "map", [](const std::string&) { return std::make_shared(); }); +} + std::string ToString(const ::avro::NodePtr& node) { std::stringstream ss; ss << *node; diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index 07e949aef..e1015e514 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -144,4 +144,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type); /// \return True if the node has a map logical type, false otherwise. bool HasMapLogicalType(const ::avro::NodePtr& node); +void RegisterLogicalTypes(); + } // namespace iceberg::avro diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index f4a899749..506eca326 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ -27,6 +27,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/util/string_utils.h" namespace iceberg { @@ -56,10 +57,11 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { /// \brief Convert a string to a FileFormatType ICEBERG_EXPORT constexpr Result FileFormatTypeFromString( std::string_view str) noexcept { - if (str == "parquet") return FileFormatType::kParquet; - if (str == "avro") return FileFormatType::kAvro; - if (str == "orc") return FileFormatType::kOrc; - if (str == "puffin") return FileFormatType::kPuffin; + auto lower = internal::StringUtils::to_lower(str); + if (lower == "parquet") return FileFormatType::kParquet; + if (lower == "avro") return FileFormatType::kAvro; + 
if (lower == "orc") return FileFormatType::kOrc; + if (lower == "puffin") return FileFormatType::kPuffin; return InvalidArgument("Invalid file format type: {}", str); } diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc index f670671d6..4a9aaa2df 100644 --- a/src/iceberg/manifest_entry.cc +++ b/src/iceberg/manifest_entry.cc @@ -40,7 +40,7 @@ std::shared_ptr DataFile::Type(std::shared_ptr partition kContent, kFilePath, kFileFormat, - SchemaField::MakeRequired(102, "partition", std::move(partition_type)), + SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)), kRecordCount, kFileSize, kColumnSizes, @@ -68,7 +68,7 @@ std::shared_ptr ManifestEntry::TypeFromDataFileType( std::shared_ptr datafile_type) { return std::make_shared(std::vector{ kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber, - SchemaField::MakeRequired(2, "data_file", std::move(datafile_type))}); + SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))}); } } // namespace iceberg diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h index 9293fd64e..36fd8f823 100644 --- a/src/iceberg/manifest_entry.h +++ b/src/iceberg/manifest_entry.h @@ -182,7 +182,8 @@ struct ICEBERG_EXPORT DataFile { inline static const SchemaField kFilePath = SchemaField::MakeRequired( 100, "file_path", iceberg::string(), "Location URI with FS scheme"); inline static const SchemaField kFileFormat = SchemaField::MakeRequired( - 101, "file_format", iceberg::int32(), "File format name: avro, orc, or parquet"); + 101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet"); + inline static const std::string kPartitionField = "partition"; inline static const SchemaField kRecordCount = SchemaField::MakeRequired( 103, "record_count", iceberg::int64(), "Number of records in the file"); inline static const SchemaField kFileSize = SchemaField::MakeRequired( @@ -299,6 +300,7 @@ struct ICEBERG_EXPORT ManifestEntry { 
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64()); inline static const SchemaField kFileSequenceNumber = SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64()); + inline static const std::string kDataFileField = "data_file"; bool operator==(const ManifestEntry& other) const; diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 92d51c631..7ead8f480 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -24,10 +24,10 @@ #include #include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/file_format.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" -#include "iceberg/schema_internal.h" #include "iceberg/type.h" #include "iceberg/util/macros.h" @@ -38,6 +38,121 @@ namespace iceberg { return InvalidArrowData("Nanoarrow error: {}", error.message); \ } +#define PARSE_PRIMITIVE_FIELD(item, array_view, type) \ + for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ + auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \ + item = static_cast(value); \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ + } + +#define PARSE_ENUM_FIELD(item, array_view, type) \ + for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ + auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \ + item = static_cast(value); \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ + } + +#define PARSE_STRING_FIELD(item, array_view) \ + for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ + auto value = 
ArrowArrayViewGetStringUnsafe(array_view, row_idx); \ + std::string path_str(value.data, value.size_bytes); \ + item = path_str; \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ + } + +// Copies every non-null cell of `array_view` into `item` as a byte vector and returns +// InvalidManifestList when a cell is null while `required` is true. `required` and +// `field_name` are taken from the enclosing scope; `item` may use the loop variable +// `row_idx`. The null check is made on `array_view` itself so that the check and the +// read always target the same column, whatever the caller named its view. +#define PARSE_BINARY_FIELD(item, array_view) \ + for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ + auto buffer = ArrowArrayViewGetBytesUnsafe(array_view, row_idx); \ + item = std::vector(buffer.data.as_char, \ + buffer.data.as_char + buffer.size_bytes); \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ + } + +#define PARSE_PRIMITIVE_VECTOR_FIELD(item, count, array_view) \ + for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) { \ + auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx); \ + auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx + 1); \ + for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ + item.emplace_back( \ + ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)); \ + } \ + } + +#define PARSE_PRIMITIVE_MAP_FIELD(item, count, array_view) \ + do { \ + if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) { \ + return InvalidManifest("Field:{} should be a map.", field_name); \ + } \ + auto view_of_map = array_view->children[0]; \ + ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \ + auto view_of_map_key = view_of_map->children[0]; \ + ASSERT_VIEW_TYPE(view_of_map_key, ArrowType::NANOARROW_TYPE_INT32); \ + auto view_of_map_value = view_of_map->children[1]; \ + ASSERT_VIEW_TYPE(view_of_map_value, ArrowType::NANOARROW_TYPE_INT64); \ + for (int64_t row_idx = 0; row_idx < count; row_idx++) { \ + auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \ + auto next_offset = 
array_view->buffer_views[1].data.as_int32[row_idx + 1]; \ + for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ + auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \ + auto value = ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx); \ + item[key] = value; \ + } \ + } \ + } while (0) + +#define PARSE_BINARY_MAP_FIELD(item, count, array_view) \ + do { \ + if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) { \ + return InvalidManifest("Field:{} should be a map.", field_name); \ + } \ + auto view_of_map = array_view->children[0]; \ + ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \ + auto view_of_map_key = view_of_map->children[0]; \ + ASSERT_VIEW_TYPE(view_of_map_key, ArrowType::NANOARROW_TYPE_INT32); \ + auto view_of_map_value = view_of_map->children[1]; \ + ASSERT_VIEW_TYPE(view_of_map_value, ArrowType::NANOARROW_TYPE_BINARY); \ + for (int64_t row_idx = 0; row_idx < count; row_idx++) { \ + auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \ + auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 1]; \ + for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ + auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \ + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_map_value, offset_idx); \ + item[key] = std::vector(buffer.data.as_char, \ + buffer.data.as_char + buffer.size_bytes); \ + } \ + } \ + } while (0) + +#define ASSERT_VIEW_TYPE(view, type) \ + if (view->storage_type != type) { \ + return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); \ + } + +#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child) \ + if (view->storage_type != type) { \ + return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); \ + } \ + if (view->n_children != n_child) { \ + return InvalidManifest("Sub Field for:{} should have key&value:{} columns.", \ + field_name, n_child); \ + } + 
Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, std::vector& manifest_files) { auto manifest_count = view_of_column->length; @@ -79,6 +194,8 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { partition_field_summary.contains_null = ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); + } else { + return InvalidManifestList("contains_null is null at row {}", partition_idx); } if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { partition_field_summary.contains_nan = @@ -101,9 +218,9 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, return {}; } -Result> ParseManifestListEntry(ArrowSchema* schema, - ArrowArray* array_in, - const Schema& iceberg_schema) { +Result> ParseManifestList(ArrowSchema* schema, + ArrowArray* array_in, + const Schema& iceberg_schema) { if (schema->n_children != array_in->n_children) { return InvalidManifestList("Columns size not match between schema:{} and array:{}", schema->n_children, array_in->n_children); @@ -134,112 +251,337 @@ Result> ParseManifestListEntry(ArrowSchema* schema, auto field_name = field.value().get().name(); bool required = !field.value().get().optional(); auto view_of_column = array_view.children[idx]; - -#define PARSE_PRIMITIVE_FIELD(item, type) \ - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ - manifest_files[row_idx].item = static_cast(value); \ - } else if (required) { \ - return InvalidManifestList("Field {} is required but null at row {}", field_name, \ - row_idx); \ - } \ - } - switch (idx) { case 0: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); - std::string 
path_str(value.data, value.size_bytes); - manifest_files[row_idx].manifest_path = path_str; - } - } + PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column); break; case 1: - PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column, + int64_t); break; case 2: - PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column, + int32_t); break; case 3: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); - manifest_files[row_idx].content = static_cast(value); - } - } + PARSE_ENUM_FIELD(manifest_files[row_idx].content, view_of_column, + ManifestFile::Content); break; case 4: - PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column, + int64_t); break; case 5: - PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column, + int64_t); break; case 6: - PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column, + int64_t); break; case 7: - PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column, + int32_t); break; case 8: - PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count, + view_of_column, int32_t); break; case 9: - PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column, + int32_t); break; case 10: - PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, 
view_of_column, + int64_t); break; case 11: - PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column, + int64_t); break; case 12: - PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column, + int64_t); break; case 13: ICEBERG_RETURN_UNEXPECTED( ParsePartitionFieldSummaryList(view_of_column, manifest_files)); break; case 14: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); - manifest_files[row_idx].key_metadata = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column); + break; + case 15: + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column, + int64_t); + break; + default: + return InvalidManifestList("Unsupported field: {} in manifest file.", field_name); + } + } + return manifest_files; +} + +Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx, + std::vector& manifest_entries) { + if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) { + auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::Boolean(value != 0)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT32) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT64) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value)); + } else if 
(view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_FLOAT) { + auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_DOUBLE) { + auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_STRING) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::String(std::string(value.data, value.size_bytes))); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BINARY) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::Binary(std::vector(buffer.data.as_char, + buffer.data.as_char + buffer.size_bytes))); + } else { + return InvalidManifest("Unsupported field type: {} in data file partition.", + static_cast(view_of_partition->storage_type)); + } + return {}; +} + +Status ParseDataFile(const std::shared_ptr& data_file_schema, + ArrowArrayView* view_of_column, + std::vector& manifest_entries) { + if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifest("DataFile field should be a struct."); + } + if (view_of_column->n_children != data_file_schema->fields().size()) { + return InvalidManifest("DataFile schema size:{} not match with ArrayArray columns:{}", + data_file_schema->fields().size(), view_of_column->n_children); + } + for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) { + auto field_name = data_file_schema->GetFieldByIndex(col_idx).value().get().name(); + auto required = !data_file_schema->GetFieldByIndex(col_idx).value().get().optional(); + 
auto view_of_file_field = view_of_column->children[col_idx]; + auto manifest_entry_count = view_of_file_field->length; + + switch (col_idx) { + case 0: + PARSE_ENUM_FIELD(manifest_entries[row_idx].data_file->content, view_of_file_field, + DataFile::Content); + break; + case 1: + PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path, + view_of_file_field); + break; + case 2: + for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx); + std::string_view path_str(value.data, value.size_bytes); + ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format, + FileFormatTypeFromString(path_str)); } } break; + case 3: { + // The partition tuple is stored as a struct column. + if (view_of_file_field->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifest("Field:{} should be a struct.", field_name); + } + // An empty partition struct has no child column to read. + if (view_of_file_field->n_children == 0) { + break; + } + // Only the first partition field is parsed here. + auto view_of_partition = view_of_file_field->children[0]; + for (size_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) { + // Skip null values but keep going: later rows may still carry a value. + if (ArrowArrayViewIsNull(view_of_partition, row_idx)) { + continue; + } + ICEBERG_RETURN_UNEXPECTED( + ParseLiteral(view_of_partition, row_idx, manifest_entries)); + } + } break; + case 4: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count, + view_of_file_field, int64_t); + break; + case 5: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes, + view_of_file_field, int64_t); + break; + case 6: + // key&value should have the same offset + // HACK(xiao.dong) workaround for arrow bug: + // ArrowArrayViewListChildOffset can not get the correct offset for map + PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes, + manifest_entry_count, view_of_file_field); + break; + case 7: + PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts, + manifest_entry_count, view_of_file_field); + break; + case 8: + 
PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts, + manifest_entry_count, view_of_file_field); + break; + case 9: + PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts, + manifest_entry_count, view_of_file_field); + break; + case 10: + PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds, + manifest_entry_count, view_of_file_field); + break; + case 11: + PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds, + manifest_entry_count, view_of_file_field); + break; + case 12: + PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata, + view_of_file_field); + break; + case 13: + PARSE_PRIMITIVE_VECTOR_FIELD( + manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count, + view_of_file_field); + break; + case 14: + PARSE_PRIMITIVE_VECTOR_FIELD( + manifest_entries[manifest_idx].data_file->equality_ids, manifest_entry_count, + view_of_file_field); + break; case 15: - PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id, + view_of_file_field, int32_t); + break; + case 16: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id, + view_of_file_field, int64_t); + break; + case 17: + PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file, + view_of_file_field); + break; + case 18: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset, + view_of_file_field, int64_t); + break; + case 19: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes, + view_of_file_field, int64_t); break; default: - return InvalidManifestList("Unsupported type: {}", field_name); + return InvalidManifest("Unsupported field: {} in data file.", field_name); } } - return manifest_files; + return {}; } -Result> ManifestReaderImpl::Entries() const { return {}; } +Result> ParseManifestEntry(ArrowSchema* schema, + ArrowArray* 
array_in, + const Schema& iceberg_schema) { + if (schema->n_children != array_in->n_children) { + return InvalidManifest("Columns size not match between schema:{} and array:{}", + schema->n_children, array_in->n_children); + } + if (iceberg_schema.fields().size() != array_in->n_children) { + return InvalidManifest("Columns size not match between schema:{} and array:{}", + iceberg_schema.fields().size(), array_in->n_children); + } + + ArrowError error; + ArrowArrayView array_view; + auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + internal::ArrowArrayViewGuard view_guard(&array_view); + status = ArrowArrayViewSetArray(&array_view, array_in, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + + std::vector manifest_entries; + manifest_entries.resize(array_in->length); + for (size_t i = 0; i < array_in->length; i++) { + manifest_entries[i].data_file = std::make_shared(); + } + + for (int64_t idx = 0; idx < array_in->n_children; idx++) { + const auto& field = iceberg_schema.GetFieldByIndex(idx); + if (!field.has_value()) { + return InvalidManifest("Field not found in schema: {}", idx); + } + auto field_name = field.value().get().name(); + bool required = !field.value().get().optional(); + auto view_of_column = array_view.children[idx]; + + switch (idx) { + case 0: + PARSE_ENUM_FIELD(manifest_entries[row_idx].status, view_of_column, + ManifestStatus); + break; + case 1: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, view_of_column, + int64_t); + break; + case 2: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, view_of_column, + int64_t); + break; + case 3: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number, + view_of_column, int64_t); + break; + case 4: { + auto data_file_schema = + 
dynamic_pointer_cast(field.value().get().type()); + ICEBERG_RETURN_UNEXPECTED( + ParseDataFile(data_file_schema, view_of_column, manifest_entries)); + break; + } + default: + return InvalidManifest("Unsupported field: {} in manifest entry.", field_name); + } + } + return manifest_entries; +} + +Result> ManifestReaderImpl::Entries() const { + std::vector manifest_entries; + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); + while (true) { + ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next()); + if (result.has_value()) { + internal::ArrowArrayGuard array_guard(&result.value()); + ICEBERG_ASSIGN_OR_RAISE( + auto parse_result, + ParseManifestEntry(&arrow_schema, &result.value(), *schema_)); + manifest_entries.insert(manifest_entries.end(), + std::make_move_iterator(parse_result.begin()), + std::make_move_iterator(parse_result.end())); + } else { + // eof + break; + } + } + return manifest_entries; +} Result> ManifestListReaderImpl::Files() const { std::vector manifest_files; ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); internal::ArrowSchemaGuard schema_guard(&arrow_schema); while (true) { - auto result = reader_->Next(); - if (!result.has_value()) { - return InvalidManifestList("Failed to read manifest list entry:{}", - result.error().message); - } - if (result.value().has_value()) { - internal::ArrowArrayGuard array_guard(&result.value().value()); + ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next()); + if (result.has_value()) { + internal::ArrowArrayGuard array_guard(&result.value()); ICEBERG_ASSIGN_OR_RAISE( - auto entries, - ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_)); + auto parse_result, ParseManifestList(&arrow_schema, &result.value(), *schema_)); manifest_files.insert(manifest_files.end(), - std::make_move_iterator(entries.begin()), - std::make_move_iterator(entries.end())); + std::make_move_iterator(parse_result.begin()), + 
std::make_move_iterator(parse_result.end())); } else { // eof break; diff --git a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils.h new file mode 100644 index 000000000..0d1747ef7 --- /dev/null +++ b/src/iceberg/util/string_utils.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +namespace iceberg::internal { + +class StringUtils { + public: + static std::string to_lower(std::string_view str) { + std::string input(str); + std::transform(input.begin(), input.end(), input.begin(), + [](char c) { return std::tolower(c); }); + return input; + } + + static std::string to_upper(std::string_view str) { + std::string input(str); + std::transform(input.begin(), input.end(), input.begin(), + [](char c) { return std::toupper(c); }); + return input; + } +}; + +} // namespace iceberg::internal diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0597d766d..57dbe890c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -78,6 +78,7 @@ if(ICEBERG_BUILD_BUNDLE) avro_schema_test.cc avro_stream_test.cc manifest_list_reader_test.cc + manifest_reader_test.cc test_common.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) diff --git a/test/manifest_reader_test.cc b/test/manifest_reader_test.cc new file mode 100644 index 000000000..e312ddf36 --- /dev/null +++ b/test/manifest_reader_test.cc @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/manifest_reader.h" + +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/schema.h" +#include "temp_file_test_base.h" +#include "test_common.h" + +namespace iceberg { + +class ManifestReaderTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { avro::AvroReader::Register(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + file_io_ = std::make_shared(local_fs_); + + avro::RegisterLogicalTypes(); + } + + std::vector prepare_manifest_entries() { + std::vector manifest_entries; + std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/"; + std::vector paths = { + "order_ts_hour=2021-01-27-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00001.parquet", + "order_ts_hour=2024-01-27-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00002.parquet", + "order_ts_hour=2023-01-26-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00003.parquet", + "order_ts_hour=2021-01-26-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00004.parquet"}; + std::vector partitions = {447696, 473976, 465192, 447672}; + std::vector>> bounds = { + {{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x12, 0xe2}}, + {4, {0xc0, 'y', 0xe7, 0x98, 0xd6, 0xb9, 0x05, 0x00}}}, + {{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x12, 0xe3}}, + {4, {0xc0, 0x19, '#', '=', 0xe2, 0x0f, 0x06, 0x00}}}, + {{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x0e, '"'}}, + {4, {0xc0, 0xd9, '7', 0x93, 0x1f, 0xf3, 0x05, 0x00}}}, + {{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {0xc8, 0x01, 0x00, 
0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x0e, '!'}}, + {4, {0xc0, 0x19, 0x10, '{', 0xc2, 0xb9, 0x05, 0x00}}}, + }; + for (int i = 0; i < 4; ++i) { + ManifestEntry entry; + entry.status = ManifestStatus::kAdded; + entry.snapshot_id = 6387266376565973956; + entry.data_file = std::make_shared(); + entry.data_file->file_path = test_dir_prefix + paths[i]; + entry.data_file->file_format = FileFormatType::kParquet; + entry.data_file->partition.emplace_back(Literal::Int(partitions[i])); + entry.data_file->record_count = 1; + entry.data_file->file_size_in_bytes = 1375; + entry.data_file->column_sizes = {{1, 49}, {2, 49}, {3, 49}, {4, 49}}; + entry.data_file->value_counts = {{1, 1}, {2, 1}, {3, 1}, {4, 1}}; + entry.data_file->null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}}; + entry.data_file->split_offsets = {4}; + entry.data_file->sort_order_id = 0; + entry.data_file->upper_bounds = bounds[i]; + entry.data_file->lower_bounds = bounds[i]; + manifest_entries.emplace_back(entry); + } + return manifest_entries; + } + + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; +}; + +TEST_F(ManifestReaderTest, BasicTest) { + iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true); + auto partition_schema = + std::make_shared(std::vector({partition_field})); + std::string path = GetResourcePath("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro"); + auto manifest_reader_result = + ManifestReader::MakeReader(path, file_io_, partition_schema); + ASSERT_EQ(manifest_reader_result.has_value(), true) + << manifest_reader_result.error().message; + auto manifest_reader = std::move(manifest_reader_result.value()); + auto read_result = manifest_reader->Entries(); + ASSERT_EQ(read_result.has_value(), true) << read_result.error().message; + + auto expected_entries = prepare_manifest_entries(); + ASSERT_EQ(read_result.value(), expected_entries); +} + +} // namespace iceberg diff --git 
a/test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro b/test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro new file mode 100644 index 0000000000000000000000000000000000000000..c671dfdf08df90d030f38f5ab0a9681c7875d6ca GIT binary patch literal 7533 zcmcIpZD<@t7;clQwN|P1g9-(Qjbg-$z1?lnBv4Ueg2kkbw*D&NI=461tlita?(RjS zMC%t??2i(~s-V`O^>+mkLB*D;h!sl-E&8hz1rhu}i?pKPJ2Sg8b9=X&>or0tbZ_SU zc;3%vx~2H=(p4)1Ym!v({IbWjaJovgK`jXVniXm}jsMbPm$borPAl8QDFxc#_Qy5) zMbBw2uF&(IUm|`1zU4HZ?5^29fuGCR2}mv$>3Hr=Z6?QW8ser^3qr4QQI}0|nXAPW zujW!?(^Qv;WmmA%t><&dxLp{rJ|QOzGGv5yg#;n4RKv%#nTeSj@s#h~$05iS+%+GV&`Aa3YS!*>P2=m-c6YgBUtz0F)<=Q!*Xv-qrwtHGofa-{i%?V4M2 z9JtQ_p2%q>QpPnW)CTGINF$S^tCAo}#I;DltyPM|7wTJ=W-8LkwnGZRC5v3Ih%Dg{ z7pPoeS}MrC0cRS4wLCWj*u{N~BU|z;NZ29%V}QIe3MUDong&sRBUPJ2`o^jk*rDyY zs6-qRk^mBCBlw3MJDiTc#acPIrfNL{q>95ysU1qnh6;fAFat(C3o``|J-7)yVuzDx zWGiC%L_L<_*YkM=6(+ar`4z1Am5@9Ft{{E}(YzT%G*@%T^DX)ps$w5vPw%T*)Z~B@ zA@g2XpN_qrZ;n1M%`hlTdbK9$MPreft9kHyVkSmwJhlkY0HLr}bx4lGg;5a)q=X<= zG?2n}6jX_oaMDnqt#H5k%H|Oiqnuj z0!Ut^v4U-0h#i)DD1hTse0*Gujhb1O=hP~0fgYDq8{XWbcC(9687X}UZYpR_VW%cbHTF_*LhoncbiAFmg`I7D%v5j{nrV#2 z7_L&RxrfLuq0oar6d?uX~0+qot zzdfi}KIb5lET>k2hJ)x7K-evz&3JP*cB+x|R)Ga|5dk!G3Sjeok)3OX+MX87HN$8t z*BDYvIOHJTDAF@cSKG3S-9-n>><6&Sfi|+uY$$@PY%>Si1dHwsv|b^v5JOG0M_$31 z$!4m}!|@)01+WO1!*UrnoJ;6wMjy?y4Q)VY^raVSlg?~xXbU^$FJk^?f@Jh6*3_!i z#RZ9WGqas;W~hxKQH)JMGTUos%->c(QnMYpNTEVPOt*xFOwzbFvl=SDScI1)9)$Jw@wry+M7_py7D?wY&-}S#|*Pm<=to>0YWO!!|XI1J4JvBwehB ziy0vTv=z?6#tPIDbY!YnuXbafl!;HtO{vhxBI33}DK6c|g|ZQtc+-eE32v2;6EEo9 zjT`#ywC`cQi_%>XHcuQP41+eJ3$QRoHe{O(0iRL#a5}iIXVlX2fVEx&pgK4tYxz3g zGe!IWNJ)pGi8Z%}uDOst9r?AkMW7|`j9BTL>qPFg#^!@m>dw`_!ba|(p zss0Ths3WKHYQ{5wWb%#Vu_XbL!ZT*nrWE-aq2`M;Ky|(mR8|DaqlhVuk`%PLu*IjO zA}o0sP6kaxMpNuaP{=HDDmBRGAErUj($;j0v`AB>J{cyF{ZL)awJjVbAebWIlVnxO ztWRE+_=~_wrBAW~5`0vepk`k@HuxJtHb0e0WJ2;PNifq^RHZUisij}Qnc%2WejS=3 zM3e>QG#J5r3hEo!Kj4vuKiIGhBl-z+@P%VAMuPt;RZz5`rE-8;WR%i}0pSCaE4hzU z(3%e*h%%ZcJ{XX8i{nE!)e}y^t7qKUo3Xl!^vr;&PxA~4^in-*;9$ZH6Vf7@Pfh67 
zMS5XEBBgj?Lg+5tiv`J%@XP2GekFL&Mfzm)rTSz5cCk-I+r{iDntkYzN*1gfm9|Lp zW!Wbfij>?^$r$M+V=5)nzWFMl5aye5h(SK6+#$t$O&GviOeTqtYUJ!n@`V&VqB}Zv zK;LTX_i`8;(FbTT(F;<1LJF*lXcIyq&67Ep;0{t)^B^|04iZsX$D8;9#$}TWVdr|; z%91=H#n5Pm@L1LJ2`N<_!cD0|n()Nh^nXzZAO-Met=M|l&i~3%_+PlFAtv{i(8q2- z_c6$h=>0u_XLyUSF$(K|^4iEA+OLT=)#Y>nUNZA!@sBq?xbC~(?)vD!^REnFSpM@e ztN8OzZ#{v({`JcH`wSzW|F}56?yH^?TQ+sPef)>{cjo8Lou8dQyZYakkImlu?>}=d z&(EEoJ$ruNKPR>AU7cM+6P-g7UBN_W=S=6S6TK(L^s&+25AQQh_v-rL`Cff&?9|xk zQvao^AMAPQ>e)S`L)T4oPV9ep=#!_9%^f{CzxvZ9UmpBy{Pe0rmmT=#)xY){yYsgm zzUA<(yKm8#yf&H}b?#nqth58~u+Hq*u_Rb_=i&UGr7!MzbIr4#f1#awVd%`=KmK{s z>8baPRqws<)ZQx{ZU3R}?$PcQE4ua_JpcQt*`o&^U3u=^ci#V{>xsV0&tCt)9o_qm z+=EBT<7=#CpFelojmNGS?(IB&;I=jR(;v2W{QbwF3#AL+p4fKJ__ZHAeRTia!TgcY f8x9=rUb$j;{MoCv-TeLDb=~`KIPy$K1CIXzI~L%e literal 0 HcmV?d00001 From 1b41460dd510dea14aa0f2fc879b11517c0116fe Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 22 Jul 2025 11:44:05 +0800 Subject: [PATCH 02/10] add string util case --- src/iceberg/file_format.h | 4 +-- src/iceberg/util/string_utils.h | 15 ++++++----- test/CMakeLists.txt | 3 ++- test/string_utils_test.cc | 44 +++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 11 deletions(-) create mode 100644 test/string_utils_test.cc diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index 506eca326..409867545 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ -55,9 +55,9 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { } /// \brief Convert a string to a FileFormatType -ICEBERG_EXPORT constexpr Result FileFormatTypeFromString( +ICEBERG_EXPORT Result FileFormatTypeFromString( std::string_view str) noexcept { - auto lower = internal::StringUtils::to_lower(str); + auto lower = internal::StringUtils::ToLower(str); if (lower == "parquet") return FileFormatType::kParquet; if (lower == "avro") return FileFormatType::kAvro; if (lower == "orc") return FileFormatType::kOrc; diff --git 
a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils.h index 0d1747ef7..27b85cd0b 100644 --- a/src/iceberg/util/string_utils.h +++ b/src/iceberg/util/string_utils.h @@ -20,24 +20,23 @@ #pragma once #include +#include #include namespace iceberg::internal { class StringUtils { public: - static std::string to_lower(std::string_view str) { + static std::string ToLower(std::string_view str) { std::string input(str); - std::transform(input.begin(), input.end(), input.begin(), - [](char c) { return std::tolower(c); }); - return input; + return input | std::views::transform([](char c) { return std::tolower(c); }) | + std::ranges::to(); } - static std::string to_upper(std::string_view str) { + static std::string ToUpper(std::string_view str) { std::string input(str); - std::transform(input.begin(), input.end(), input.begin(), - [](char c) { return std::toupper(c); }); - return input; + return input | std::views::transform([](char c) { return std::toupper(c); }) | + std::ranges::to(); } }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 57dbe890c..485efd4cc 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -66,7 +66,8 @@ target_link_libraries(json_serde_test PRIVATE iceberg_static GTest::gtest_main add_test(NAME json_serde_test COMMAND json_serde_test) add_executable(util_test) -target_sources(util_test PRIVATE formatter_test.cc config_test.cc visit_type_test.cc) +target_sources(util_test PRIVATE formatter_test.cc config_test.cc visit_type_test.cc + string_utils_test.cc) target_link_libraries(util_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock) add_test(NAME util_test COMMAND util_test) diff --git a/test/string_utils_test.cc b/test/string_utils_test.cc new file mode 100644 index 000000000..c643e7d50 --- /dev/null +++ b/test/string_utils_test.cc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/string_utils.h" + +#include + +namespace iceberg { + +TEST(StringUtilsTest, ToLower) { + ASSERT_EQ(internal::StringUtils::ToLower("AbC"), "abc"); + ASSERT_EQ(internal::StringUtils::ToLower("A-bC"), "a-bc"); + ASSERT_EQ(internal::StringUtils::ToLower("A_bC"), "a_bc"); + ASSERT_EQ(internal::StringUtils::ToLower(""), ""); + ASSERT_EQ(internal::StringUtils::ToLower(" "), " "); + ASSERT_EQ(internal::StringUtils::ToLower("123"), "123"); +} + +TEST(StringUtilsTest, ToUpper) { + ASSERT_EQ(internal::StringUtils::ToUpper("abc"), "ABC"); + ASSERT_EQ(internal::StringUtils::ToUpper("A-bC"), "A-BC"); + ASSERT_EQ(internal::StringUtils::ToUpper("A_bC"), "A_BC"); + ASSERT_EQ(internal::StringUtils::ToUpper(""), ""); + ASSERT_EQ(internal::StringUtils::ToUpper(" "), " "); + ASSERT_EQ(internal::StringUtils::ToUpper("123"), "123"); +} + +} // namespace iceberg From 23e10244790685ab37796740023643e5d5539edd Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 22 Jul 2025 11:50:11 +0800 Subject: [PATCH 03/10] fix inline --- src/iceberg/file_format.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index 409867545..2a88b16be 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ 
-55,7 +55,7 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { } /// \brief Convert a string to a FileFormatType -ICEBERG_EXPORT Result FileFormatTypeFromString( +ICEBERG_EXPORT inline Result FileFormatTypeFromString( std::string_view str) noexcept { auto lower = internal::StringUtils::ToLower(str); if (lower == "parquet") return FileFormatType::kParquet; From 76a51319db3dce36fa675d96b78189205c7ea6b1 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 22 Jul 2025 12:15:47 +0800 Subject: [PATCH 04/10] fallback to use transform since gcc13 do not support range --- src/iceberg/util/string_utils.h | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils.h index 27b85cd0b..4a5380e0c 100644 --- a/src/iceberg/util/string_utils.h +++ b/src/iceberg/util/string_utils.h @@ -29,14 +29,18 @@ class StringUtils { public: static std::string ToLower(std::string_view str) { std::string input(str); - return input | std::views::transform([](char c) { return std::tolower(c); }) | - std::ranges::to(); + // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to + std::transform(input.begin(), input.end(), input.begin(), // NOLINT + [](char c) { return std::tolower(c); }); // NOLINT + return input; } static std::string ToUpper(std::string_view str) { std::string input(str); - return input | std::views::transform([](char c) { return std::toupper(c); }) | - std::ranges::to(); + // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to + std::transform(input.begin(), input.end(), input.begin(), // NOLINT + [](char c) { return std::toupper(c); }); // NOLINT + return input; } }; From a116bcf86310117c104a38b4f8080f8396cce94a Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 22 Jul 2025 14:14:46 +0800 Subject: [PATCH 05/10] fix type --- src/iceberg/manifest_reader_internal.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 7ead8f480..56dbde5fe 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -84,13 +84,13 @@ namespace iceberg { } \ } -#define PARSE_PRIMITIVE_VECTOR_FIELD(item, count, array_view) \ +#define PARSE_PRIMITIVE_VECTOR_FIELD(item, count, array_view, type) \ for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) { \ auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx); \ auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx + 1); \ for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ - item.emplace_back( \ - ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)); \ + item.emplace_back(static_cast( \ + ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx))); \ } \ } @@ -445,12 +445,12 @@ Status ParseDataFile(const std::shared_ptr& data_file_schema, case 13: PARSE_PRIMITIVE_VECTOR_FIELD( manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count, - view_of_file_field); + view_of_file_field, int64_t); break; case 14: PARSE_PRIMITIVE_VECTOR_FIELD( manifest_entries[manifest_idx].data_file->equality_ids, manifest_entry_count, - view_of_file_field); + view_of_file_field, int32_t); break; case 15: PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id, From d4dc829534b2601e4424930e746d38f5629e1cbd Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 24 Jul 2025 14:22:16 +0800 Subject: [PATCH 06/10] merge same macros and seperate avro type register --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/avro/avro_register.cc | 29 +++++ src/iceberg/avro/avro_register.h | 32 +++++ src/iceberg/avro/avro_schema_util.cc | 11 +- src/iceberg/avro/avro_schema_util_internal.h | 2 - src/iceberg/file_format.h | 2 +- src/iceberg/manifest_reader_internal.cc | 119 +++++++----------- ...string_utils.h => string_utils_internal.h} | 4 
+- test/manifest_reader_test.cc | 1 + test/string_utils_test.cc | 4 +- 10 files changed, 117 insertions(+), 88 deletions(-) create mode 100644 src/iceberg/avro/avro_register.cc create mode 100644 src/iceberg/avro/avro_register.h rename src/iceberg/util/{string_utils.h => string_utils_internal.h} (95%) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0900068a6..4751e9fd5 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -108,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_data_util.cc avro/avro_reader.cc avro/avro_schema_util.cc + avro/avro_register.cc avro/avro_stream_internal.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. diff --git a/src/iceberg/avro/avro_register.cc b/src/iceberg/avro/avro_register.cc new file mode 100644 index 000000000..52395e2f5 --- /dev/null +++ b/src/iceberg/avro/avro_register.cc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "avro_register.h" + +namespace iceberg::avro { + +void RegisterLogicalTypes() { + ::avro::CustomLogicalTypeRegistry::instance().registerType( + "map", [](const std::string&) { return std::make_shared(); }); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_register.h b/src/iceberg/avro/avro_register.h new file mode 100644 index 000000000..6e7c66b6f --- /dev/null +++ b/src/iceberg/avro/avro_register.h @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include + +namespace iceberg::avro { + +struct MapLogicalType : public ::avro::CustomLogicalType { + MapLogicalType() : ::avro::CustomLogicalType("map") {} +}; + +void RegisterLogicalTypes(); + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 87b1e54bf..abe19e38f 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include #include +#include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" #include "iceberg/schema.h" @@ -49,10 +49,6 @@ constexpr std::string_view kValueIdProp = "value-id"; constexpr std::string_view kElementIdProp = "element-id"; constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc"; -struct MapLogicalType : public ::avro::CustomLogicalType { - MapLogicalType() : ::avro::CustomLogicalType("map") {} -}; - ::avro::LogicalType GetMapLogicalType() { static std::once_flag flag{}; std::call_once(flag, []() { @@ -72,11 +68,6 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) { } // namespace -void RegisterLogicalTypes() { - ::avro::CustomLogicalTypeRegistry::instance().registerType( - "map", [](const std::string&) { return std::make_shared(); }); -} - std::string ToString(const ::avro::NodePtr& node) { std::stringstream ss; ss << *node; diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index e1015e514..07e949aef 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -144,6 +144,4 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type); /// \return True if the node has a map logical type, false otherwise. 
bool HasMapLogicalType(const ::avro::NodePtr& node); -void RegisterLogicalTypes(); - } // namespace iceberg::avro diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index 2a88b16be..509e6673b 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ -27,7 +27,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" -#include "iceberg/util/string_utils.h" +#include "iceberg/util/string_utils_internal.h" namespace iceberg { diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 56dbde5fe..251e0d71d 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -49,23 +49,11 @@ namespace iceberg { } \ } -#define PARSE_ENUM_FIELD(item, array_view, type) \ - for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ - if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ - auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \ - item = static_cast(value); \ - } else if (required) { \ - return InvalidManifestList("Field {} is required but null at row {}", field_name, \ - row_idx); \ - } \ - } - #define PARSE_STRING_FIELD(item, array_view) \ for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \ - std::string path_str(value.data, value.size_bytes); \ - item = path_str; \ + item = std::string(value.data, value.size_bytes); \ } else if (required) { \ return InvalidManifestList("Field {} is required but null at row {}", field_name, \ row_idx); \ @@ -75,16 +63,14 @@ namespace iceberg { #define PARSE_BINARY_FIELD(item, array_view) \ for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ - auto buffer = ArrowArrayViewGetBytesUnsafe(array_view, row_idx); \ - item = std::vector(buffer.data.as_char, \ - buffer.data.as_char + 
buffer.size_bytes); \ + item = ArrowArrayViewGetInt8Vector(array_view, row_idx); \ } else if (required) { \ return InvalidManifestList("Field {} is required but null at row {}", field_name, \ row_idx); \ } \ } -#define PARSE_PRIMITIVE_VECTOR_FIELD(item, count, array_view, type) \ +#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type) \ for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) { \ auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx); \ auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx + 1); \ @@ -94,7 +80,7 @@ namespace iceberg { } \ } -#define PARSE_PRIMITIVE_MAP_FIELD(item, count, array_view) \ +#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type, assignment) \ do { \ if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) { \ return InvalidManifest("Field:{} should be a map.", field_name); \ @@ -102,42 +88,28 @@ namespace iceberg { auto view_of_map = array_view->children[0]; \ ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \ auto view_of_map_key = view_of_map->children[0]; \ - ASSERT_VIEW_TYPE(view_of_map_key, ArrowType::NANOARROW_TYPE_INT32); \ + ASSERT_VIEW_TYPE(view_of_map_key, key_type); \ auto view_of_map_value = view_of_map->children[1]; \ - ASSERT_VIEW_TYPE(view_of_map_value, ArrowType::NANOARROW_TYPE_INT64); \ + ASSERT_VIEW_TYPE(view_of_map_value, value_type); \ for (int64_t row_idx = 0; row_idx < count; row_idx++) { \ auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \ auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 1]; \ for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \ - auto value = ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx); \ - item[key] = value; \ + item[key] = assignment; \ } \ } \ } while (0) -#define PARSE_BINARY_MAP_FIELD(item, count, array_view) \ - do { \ 
- if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) { \ - return InvalidManifest("Field:{} should be a map.", field_name); \ - } \ - auto view_of_map = array_view->children[0]; \ - ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \ - auto view_of_map_key = view_of_map->children[0]; \ - ASSERT_VIEW_TYPE(view_of_map_key, ArrowType::NANOARROW_TYPE_INT32); \ - auto view_of_map_value = view_of_map->children[1]; \ - ASSERT_VIEW_TYPE(view_of_map_value, ArrowType::NANOARROW_TYPE_BINARY); \ - for (int64_t row_idx = 0; row_idx < count; row_idx++) { \ - auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \ - auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 1]; \ - for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ - auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \ - auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_map_value, offset_idx); \ - item[key] = std::vector(buffer.data.as_char, \ - buffer.data.as_char + buffer.size_bytes); \ - } \ - } \ - } while (0) +#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view) \ + PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \ + ArrowType::NANOARROW_TYPE_INT64, \ + ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx)); + +#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view) \ + PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \ + ArrowType::NANOARROW_TYPE_BINARY, \ + ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx)); #define ASSERT_VIEW_TYPE(view, type) \ if (view->storage_type != type) { \ @@ -153,6 +125,12 @@ namespace iceberg { field_name, n_child); \ } +std::vector ArrowArrayViewGetInt8Vector(const ArrowArrayView* view, + int32_t offset_idx) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx); + return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes}; +} + Status 
ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, std::vector& manifest_files) { auto manifest_count = view_of_column->length; @@ -202,14 +180,12 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); } if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); - partition_field_summary.lower_bound = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + partition_field_summary.lower_bound = + ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx); } if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); - partition_field_summary.upper_bound = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + partition_field_summary.upper_bound = + ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx); } manifest_file.partitions.emplace_back(partition_field_summary); @@ -264,8 +240,8 @@ Result> ParseManifestList(ArrowSchema* schema, int32_t); break; case 3: - PARSE_ENUM_FIELD(manifest_files[row_idx].content, view_of_column, - ManifestFile::Content); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column, + ManifestFile::Content); break; case 4: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column, @@ -373,8 +349,8 @@ Status ParseDataFile(const std::shared_ptr& data_file_schema, switch (col_idx) { case 0: - PARSE_ENUM_FIELD(manifest_entries[row_idx].data_file->content, view_of_file_field, - DataFile::Content); + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content, + view_of_file_field, DataFile::Content); break; case 1: PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path, @@ -415,42 +391,41 @@ Status ParseDataFile(const std::shared_ptr& data_file_schema, // key&value should have the same offset 
// HACK(xiao.dong) workaround for arrow bug: // ArrowArrayViewListChildOffset can not get the correct offset for map - PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes, - manifest_entry_count, view_of_file_field); + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes, + manifest_entry_count, view_of_file_field); break; case 7: - PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts, - manifest_entry_count, view_of_file_field); + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts, + manifest_entry_count, view_of_file_field); break; case 8: - PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts, - manifest_entry_count, view_of_file_field); + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts, + manifest_entry_count, view_of_file_field); break; case 9: - PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts, - manifest_entry_count, view_of_file_field); + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts, + manifest_entry_count, view_of_file_field); break; case 10: - PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds, - manifest_entry_count, view_of_file_field); + PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds, + manifest_entry_count, view_of_file_field); break; case 11: - PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds, - manifest_entry_count, view_of_file_field); + PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds, + manifest_entry_count, view_of_file_field); break; case 12: PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata, view_of_file_field); break; case 13: - PARSE_PRIMITIVE_VECTOR_FIELD( + PARSE_INTEGER_VECTOR_FIELD( manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count, view_of_file_field, int64_t); break; case 
14: - PARSE_PRIMITIVE_VECTOR_FIELD( - manifest_entries[manifest_idx].data_file->equality_ids, manifest_entry_count, - view_of_file_field, int32_t); + PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids, + manifest_entry_count, view_of_file_field, int32_t); break; case 15: PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id, @@ -518,8 +493,8 @@ Result> ParseManifestEntry(ArrowSchema* schema, switch (idx) { case 0: - PARSE_ENUM_FIELD(manifest_entries[row_idx].status, view_of_column, - ManifestStatus); + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column, + ManifestStatus); break; case 1: PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, view_of_column, diff --git a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils_internal.h similarity index 95% rename from src/iceberg/util/string_utils.h rename to src/iceberg/util/string_utils_internal.h index 4a5380e0c..a59a1a573 100644 --- a/src/iceberg/util/string_utils.h +++ b/src/iceberg/util/string_utils_internal.h @@ -23,6 +23,8 @@ #include #include +#include "iceberg/iceberg_export.h" + namespace iceberg::internal { class StringUtils { @@ -33,7 +35,7 @@ class StringUtils { std::transform(input.begin(), input.end(), input.begin(), // NOLINT [](char c) { return std::tolower(c); }); // NOLINT return input; - } + } // namespace iceberg::internal static std::string ToUpper(std::string_view str) { std::string input(str); diff --git a/test/manifest_reader_test.cc b/test/manifest_reader_test.cc index e312ddf36..6821ae79e 100644 --- a/test/manifest_reader_test.cc +++ b/test/manifest_reader_test.cc @@ -24,6 +24,7 @@ #include "iceberg/arrow/arrow_fs_file_io.h" #include "iceberg/avro/avro_reader.h" +#include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/schema.h" diff --git a/test/string_utils_test.cc b/test/string_utils_test.cc index 
c643e7d50..54bd96136 100644 --- a/test/string_utils_test.cc +++ b/test/string_utils_test.cc @@ -17,10 +17,10 @@ * under the License. */ -#include "iceberg/util/string_utils.h" - #include +#include "iceberg/util/string_utils_internal.h" + namespace iceberg { TEST(StringUtilsTest, ToLower) { From a4afb268244d0c96105a264162b2289d6748eff8 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 24 Jul 2025 14:56:50 +0800 Subject: [PATCH 07/10] fix string util --- src/iceberg/file_format.h | 4 +-- ...string_utils_internal.h => string_utils.h} | 8 +++--- test/string_utils_test.cc | 28 +++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) rename src/iceberg/util/{string_utils_internal.h => string_utils.h} (92%) diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index 509e6673b..a54f196e6 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ -27,7 +27,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" -#include "iceberg/util/string_utils_internal.h" +#include "iceberg/util/string_utils.h" namespace iceberg { @@ -57,7 +57,7 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { /// \brief Convert a string to a FileFormatType ICEBERG_EXPORT inline Result FileFormatTypeFromString( std::string_view str) noexcept { - auto lower = internal::StringUtils::ToLower(str); + auto lower = StringUtils::ToLower(str); if (lower == "parquet") return FileFormatType::kParquet; if (lower == "avro") return FileFormatType::kAvro; if (lower == "orc") return FileFormatType::kOrc; diff --git a/src/iceberg/util/string_utils_internal.h b/src/iceberg/util/string_utils.h similarity index 92% rename from src/iceberg/util/string_utils_internal.h rename to src/iceberg/util/string_utils.h index a59a1a573..558fc293c 100644 --- a/src/iceberg/util/string_utils_internal.h +++ b/src/iceberg/util/string_utils.h @@ -25,9 +25,9 @@ #include "iceberg/iceberg_export.h" -namespace iceberg::internal { +namespace iceberg { 
-class StringUtils { +class ICEBERG_EXPORT StringUtils { public: static std::string ToLower(std::string_view str) { std::string input(str); @@ -35,7 +35,7 @@ class StringUtils { std::transform(input.begin(), input.end(), input.begin(), // NOLINT [](char c) { return std::tolower(c); }); // NOLINT return input; - } // namespace iceberg::internal + } static std::string ToUpper(std::string_view str) { std::string input(str); @@ -46,4 +46,4 @@ class StringUtils { } }; -} // namespace iceberg::internal +} // namespace iceberg diff --git a/test/string_utils_test.cc b/test/string_utils_test.cc index 54bd96136..2347debf7 100644 --- a/test/string_utils_test.cc +++ b/test/string_utils_test.cc @@ -17,28 +17,28 @@ * under the License. */ -#include +#include "iceberg/util/string_utils.h" -#include "iceberg/util/string_utils_internal.h" +#include namespace iceberg { TEST(StringUtilsTest, ToLower) { - ASSERT_EQ(internal::StringUtils::ToLower("AbC"), "abc"); - ASSERT_EQ(internal::StringUtils::ToLower("A-bC"), "a-bc"); - ASSERT_EQ(internal::StringUtils::ToLower("A_bC"), "a_bc"); - ASSERT_EQ(internal::StringUtils::ToLower(""), ""); - ASSERT_EQ(internal::StringUtils::ToLower(" "), " "); - ASSERT_EQ(internal::StringUtils::ToLower("123"), "123"); + ASSERT_EQ(StringUtils::ToLower("AbC"), "abc"); + ASSERT_EQ(StringUtils::ToLower("A-bC"), "a-bc"); + ASSERT_EQ(StringUtils::ToLower("A_bC"), "a_bc"); + ASSERT_EQ(StringUtils::ToLower(""), ""); + ASSERT_EQ(StringUtils::ToLower(" "), " "); + ASSERT_EQ(StringUtils::ToLower("123"), "123"); } TEST(StringUtilsTest, ToUpper) { - ASSERT_EQ(internal::StringUtils::ToUpper("abc"), "ABC"); - ASSERT_EQ(internal::StringUtils::ToUpper("A-bC"), "A-BC"); - ASSERT_EQ(internal::StringUtils::ToUpper("A_bC"), "A_BC"); - ASSERT_EQ(internal::StringUtils::ToUpper(""), ""); - ASSERT_EQ(internal::StringUtils::ToUpper(" "), " "); - ASSERT_EQ(internal::StringUtils::ToUpper("123"), "123"); + ASSERT_EQ(StringUtils::ToUpper("abc"), "ABC"); + 
ASSERT_EQ(StringUtils::ToUpper("A-bC"), "A-BC"); + ASSERT_EQ(StringUtils::ToUpper("A_bC"), "A_BC"); + ASSERT_EQ(StringUtils::ToUpper(""), ""); + ASSERT_EQ(StringUtils::ToUpper(" "), " "); + ASSERT_EQ(StringUtils::ToUpper("123"), "123"); } } // namespace iceberg From 6244509b31018a5a8591caa99fe3314312f74bfa Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 24 Jul 2025 15:08:58 +0800 Subject: [PATCH 08/10] fix string util ns --- src/iceberg/util/string_utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils.h index 558fc293c..9ff250b66 100644 --- a/src/iceberg/util/string_utils.h +++ b/src/iceberg/util/string_utils.h @@ -27,7 +27,7 @@ namespace iceberg { -class ICEBERG_EXPORT StringUtils { +ICEBERG_EXPORT class StringUtils { public: static std::string ToLower(std::string_view str) { std::string input(str); From f5ff4c286aeb5dfc2afbc801c6c326197393a786 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 24 Jul 2025 19:20:21 +0800 Subject: [PATCH 09/10] fix comments --- src/iceberg/avro/avro_register.cc | 11 +++++++++-- src/iceberg/avro/avro_register.h | 8 ++------ src/iceberg/avro/avro_schema_util.cc | 7 +------ src/iceberg/avro/avro_schema_util_internal.h | 4 ++++ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/iceberg/avro/avro_register.cc b/src/iceberg/avro/avro_register.cc index 52395e2f5..8e2048b4b 100644 --- a/src/iceberg/avro/avro_register.cc +++ b/src/iceberg/avro/avro_register.cc @@ -19,11 +19,18 @@ #include "avro_register.h" +#include "iceberg/avro/avro_schema_util_internal.h" + namespace iceberg::avro { void RegisterLogicalTypes() { - ::avro::CustomLogicalTypeRegistry::instance().registerType( - "map", [](const std::string&) { return std::make_shared(); }); + static std::once_flag flag{}; + std::call_once(flag, []() { + // Register the map logical type with the avro custom logical type registry. 
+ // See https://github.com/apache/avro/pull/3326 for details. + ::avro::CustomLogicalTypeRegistry::instance().registerType( + "map", [](const std::string&) { return std::make_shared(); }); + }); } } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_register.h b/src/iceberg/avro/avro_register.h index 6e7c66b6f..ce6510757 100644 --- a/src/iceberg/avro/avro_register.h +++ b/src/iceberg/avro/avro_register.h @@ -19,14 +19,10 @@ #pragma once -#include +#include "iceberg/iceberg_bundle_export.h" namespace iceberg::avro { -struct MapLogicalType : public ::avro::CustomLogicalType { - MapLogicalType() : ::avro::CustomLogicalType("map") {} -}; - -void RegisterLogicalTypes(); +ICEBERG_BUNDLE_EXPORT void RegisterLogicalTypes(); } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index abe19e38f..28e9e7bfc 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -50,12 +50,7 @@ constexpr std::string_view kElementIdProp = "element-id"; constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc"; ::avro::LogicalType GetMapLogicalType() { - static std::once_flag flag{}; - std::call_once(flag, []() { - // Register the map logical type with the avro custom logical type registry. - // See https://github.com/apache/avro/pull/3326 for details. - RegisterLogicalTypes(); - }); + RegisterLogicalTypes(); return ::avro::LogicalType(std::make_shared()); } diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index 07e949aef..de3922a2f 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -34,6 +34,10 @@ class ValidSchema; namespace iceberg::avro { +struct MapLogicalType : public ::avro::CustomLogicalType { + MapLogicalType() : ::avro::CustomLogicalType("map") {} +}; + /// \brief A visitor that converts an Iceberg type to an Avro node. 
class ToAvroNodeVisitor { public: From eb6c2f5ad4dea11819ff120f6829e3b296f7e7ce Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 25 Jul 2025 15:55:22 +0800 Subject: [PATCH 10/10] fix include --- src/iceberg/avro/avro_register.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/avro/avro_register.cc b/src/iceberg/avro/avro_register.cc index 8e2048b4b..a969948cd 100644 --- a/src/iceberg/avro/avro_register.cc +++ b/src/iceberg/avro/avro_register.cc @@ -17,7 +17,7 @@ * under the License. */ -#include "avro_register.h" +#include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h"