diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0900068a6..4751e9fd5 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -108,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_data_util.cc avro/avro_reader.cc avro/avro_schema_util.cc + avro/avro_register.cc avro/avro_stream_internal.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. diff --git a/src/iceberg/avro/avro_register.cc b/src/iceberg/avro/avro_register.cc new file mode 100644 index 000000000..a969948cd --- /dev/null +++ b/src/iceberg/avro/avro_register.cc @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_register.h" + +#include "iceberg/avro/avro_schema_util_internal.h" + +namespace iceberg::avro { + +void RegisterLogicalTypes() { + static std::once_flag flag{}; + std::call_once(flag, []() { + // Register the map logical type with the avro custom logical type registry. + // See https://github.com/apache/avro/pull/3326 for details. 
+ ::avro::CustomLogicalTypeRegistry::instance().registerType( + "map", [](const std::string&) { return std::make_shared(); }); + }); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_register.h b/src/iceberg/avro/avro_register.h new file mode 100644 index 000000000..ce6510757 --- /dev/null +++ b/src/iceberg/avro/avro_register.h @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::avro { + +ICEBERG_BUNDLE_EXPORT void RegisterLogicalTypes(); + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 905d9802f..28e9e7bfc 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include #include +#include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" #include "iceberg/schema.h" @@ -49,18 +49,8 @@ constexpr std::string_view kValueIdProp = "value-id"; constexpr std::string_view kElementIdProp = "element-id"; constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc"; -struct MapLogicalType : public ::avro::CustomLogicalType { - MapLogicalType() : ::avro::CustomLogicalType("map") {} -}; - ::avro::LogicalType GetMapLogicalType() { - static std::once_flag flag{}; - std::call_once(flag, []() { - // Register the map logical type with the avro custom logical type registry. - // See https://github.com/apache/avro/pull/3326 for details. - ::avro::CustomLogicalTypeRegistry::instance().registerType( - "map", [](const std::string&) { return std::make_shared(); }); - }); + RegisterLogicalTypes(); return ::avro::LogicalType(std::make_shared()); } diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index 07e949aef..de3922a2f 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -34,6 +34,10 @@ class ValidSchema; namespace iceberg::avro { +struct MapLogicalType : public ::avro::CustomLogicalType { + MapLogicalType() : ::avro::CustomLogicalType("map") {} +}; + /// \brief A visitor that converts an Iceberg type to an Avro node. 
class ToAvroNodeVisitor { public: diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h index f4a899749..a54f196e6 100644 --- a/src/iceberg/file_format.h +++ b/src/iceberg/file_format.h @@ -27,6 +27,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/util/string_utils.h" namespace iceberg { @@ -54,12 +55,13 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { } /// \brief Convert a string to a FileFormatType -ICEBERG_EXPORT constexpr Result FileFormatTypeFromString( +ICEBERG_EXPORT inline Result FileFormatTypeFromString( std::string_view str) noexcept { - if (str == "parquet") return FileFormatType::kParquet; - if (str == "avro") return FileFormatType::kAvro; - if (str == "orc") return FileFormatType::kOrc; - if (str == "puffin") return FileFormatType::kPuffin; + auto lower = StringUtils::ToLower(str); + if (lower == "parquet") return FileFormatType::kParquet; + if (lower == "avro") return FileFormatType::kAvro; + if (lower == "orc") return FileFormatType::kOrc; + if (lower == "puffin") return FileFormatType::kPuffin; return InvalidArgument("Invalid file format type: {}", str); } diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc index f670671d6..4a9aaa2df 100644 --- a/src/iceberg/manifest_entry.cc +++ b/src/iceberg/manifest_entry.cc @@ -40,7 +40,7 @@ std::shared_ptr DataFile::Type(std::shared_ptr partition kContent, kFilePath, kFileFormat, - SchemaField::MakeRequired(102, "partition", std::move(partition_type)), + SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)), kRecordCount, kFileSize, kColumnSizes, @@ -68,7 +68,7 @@ std::shared_ptr ManifestEntry::TypeFromDataFileType( std::shared_ptr datafile_type) { return std::make_shared(std::vector{ kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber, - SchemaField::MakeRequired(2, "data_file", std::move(datafile_type))}); + SchemaField::MakeRequired(2, kDataFileField, 
std::move(datafile_type))}); } } // namespace iceberg diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h index 9293fd64e..36fd8f823 100644 --- a/src/iceberg/manifest_entry.h +++ b/src/iceberg/manifest_entry.h @@ -182,7 +182,8 @@ struct ICEBERG_EXPORT DataFile { inline static const SchemaField kFilePath = SchemaField::MakeRequired( 100, "file_path", iceberg::string(), "Location URI with FS scheme"); inline static const SchemaField kFileFormat = SchemaField::MakeRequired( - 101, "file_format", iceberg::int32(), "File format name: avro, orc, or parquet"); + 101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet"); + inline static const std::string kPartitionField = "partition"; inline static const SchemaField kRecordCount = SchemaField::MakeRequired( 103, "record_count", iceberg::int64(), "Number of records in the file"); inline static const SchemaField kFileSize = SchemaField::MakeRequired( @@ -299,6 +300,7 @@ struct ICEBERG_EXPORT ManifestEntry { SchemaField::MakeOptional(3, "sequence_number", iceberg::int64()); inline static const SchemaField kFileSequenceNumber = SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64()); + inline static const std::string kDataFileField = "data_file"; bool operator==(const ManifestEntry& other) const; diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 92d51c631..251e0d71d 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -24,10 +24,10 @@ #include #include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/file_format.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" -#include "iceberg/schema_internal.h" #include "iceberg/type.h" #include "iceberg/util/macros.h" @@ -38,6 +38,99 @@ namespace iceberg { return InvalidArrowData("Nanoarrow error: {}", error.message); \ } +#define PARSE_PRIMITIVE_FIELD(item, array_view, 
// Copies every non-null value of the binary column `array_view` into `item`.
//
// `item` is re-evaluated for each row and may use the loop variable `row_idx`
// (e.g. `entries[row_idx].key_metadata`). The expansion also reads `required`
// and `field_name` from the enclosing scope and returns an error from the
// enclosing function when a required value is null.
//
// The null check must use the macro argument `array_view`, not a variable that
// happens to be in scope at the call site: when parsing a nested struct the
// column being read is a child view, not the caller's `view_of_column`.
#define PARSE_BINARY_FIELD(item, array_view)                                            \
  for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {                   \
    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                                   \
      item = ArrowArrayViewGetInt8Vector(array_view, row_idx);                          \
    } else if (required) {                                                              \
      return InvalidManifestList("Field {} is required but null at row {}", field_name, \
                                 row_idx);                                              \
    }                                                                                   \
  }
ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \ + auto view_of_map_key = view_of_map->children[0]; \ + ASSERT_VIEW_TYPE(view_of_map_key, key_type); \ + auto view_of_map_value = view_of_map->children[1]; \ + ASSERT_VIEW_TYPE(view_of_map_value, value_type); \ + for (int64_t row_idx = 0; row_idx < count; row_idx++) { \ + auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \ + auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 1]; \ + for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \ + auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \ + item[key] = assignment; \ + } \ + } \ + } while (0) + +#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view) \ + PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \ + ArrowType::NANOARROW_TYPE_INT64, \ + ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx)); + +#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view) \ + PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \ + ArrowType::NANOARROW_TYPE_BINARY, \ + ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx)); + +#define ASSERT_VIEW_TYPE(view, type) \ + if (view->storage_type != type) { \ + return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); \ + } + +#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child) \ + if (view->storage_type != type) { \ + return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); \ + } \ + if (view->n_children != n_child) { \ + return InvalidManifest("Sub Field for:{} should have key&value:{} columns.", \ + field_name, n_child); \ + } + +std::vector ArrowArrayViewGetInt8Vector(const ArrowArrayView* view, + int32_t offset_idx) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx); + return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes}; +} + Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, 
std::vector& manifest_files) { auto manifest_count = view_of_column->length; @@ -79,20 +172,20 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { partition_field_summary.contains_null = ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); + } else { + return InvalidManifestList("contains_null is null at row {}", partition_idx); } if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { partition_field_summary.contains_nan = ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); } if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); - partition_field_summary.lower_bound = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + partition_field_summary.lower_bound = + ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx); } if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); - partition_field_summary.upper_bound = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + partition_field_summary.upper_bound = + ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx); } manifest_file.partitions.emplace_back(partition_field_summary); @@ -101,9 +194,9 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, return {}; } -Result> ParseManifestListEntry(ArrowSchema* schema, - ArrowArray* array_in, - const Schema& iceberg_schema) { +Result> ParseManifestList(ArrowSchema* schema, + ArrowArray* array_in, + const Schema& iceberg_schema) { if (schema->n_children != array_in->n_children) { return InvalidManifestList("Columns size not match between schema:{} and array:{}", schema->n_children, array_in->n_children); @@ -134,112 +227,336 @@ Result> ParseManifestListEntry(ArrowSchema* schema, auto field_name = field.value().get().name(); bool 
required = !field.value().get().optional(); auto view_of_column = array_view.children[idx]; - -#define PARSE_PRIMITIVE_FIELD(item, type) \ - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ - manifest_files[row_idx].item = static_cast(value); \ - } else if (required) { \ - return InvalidManifestList("Field {} is required but null at row {}", field_name, \ - row_idx); \ - } \ - } - switch (idx) { case 0: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); - std::string path_str(value.data, value.size_bytes); - manifest_files[row_idx].manifest_path = path_str; - } - } + PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column); break; case 1: - PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column, + int64_t); break; case 2: - PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column, + int32_t); break; case 3: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); - manifest_files[row_idx].content = static_cast(value); - } - } + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column, + ManifestFile::Content); break; case 4: - PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column, + int64_t); break; case 5: - PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column, + int64_t); break; case 6: - 
PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column, + int64_t); break; case 7: - PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column, + int32_t); break; case 8: - PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count, + view_of_column, int32_t); break; case 9: - PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column, + int32_t); break; case 10: - PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column, + int64_t); break; case 11: - PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column, + int64_t); break; case 12: - PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column, + int64_t); break; case 13: ICEBERG_RETURN_UNEXPECTED( ParsePartitionFieldSummaryList(view_of_column, manifest_files)); break; case 14: - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); - manifest_files[row_idx].key_metadata = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column); + break; + case 15: + PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column, + int64_t); + break; + default: + return InvalidManifestList("Unsupported field: {} in manifest file.", field_name); + } + } + return manifest_files; +} + +Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx, + 
std::vector& manifest_entries) { + if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) { + auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::Boolean(value != 0)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT32) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT64) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_FLOAT) { + auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_DOUBLE) { + auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value)); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_STRING) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::String(std::string(value.data, value.size_bytes))); + } else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BINARY) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx); + manifest_entries[row_idx].data_file->partition.emplace_back( + Literal::Binary(std::vector(buffer.data.as_char, + buffer.data.as_char + buffer.size_bytes))); + } else { + return InvalidManifest("Unsupported field type: {} in data file partition.", + static_cast(view_of_partition->storage_type)); + } + return {}; 
+} + +Status ParseDataFile(const std::shared_ptr& data_file_schema, + ArrowArrayView* view_of_column, + std::vector& manifest_entries) { + if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifest("DataFile field should be a struct."); + } + if (view_of_column->n_children != data_file_schema->fields().size()) { + return InvalidManifest("DataFile schema size:{} not match with ArrayArray columns:{}", + data_file_schema->fields().size(), view_of_column->n_children); + } + for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) { + auto field_name = data_file_schema->GetFieldByIndex(col_idx).value().get().name(); + auto required = !data_file_schema->GetFieldByIndex(col_idx).value().get().optional(); + auto view_of_file_field = view_of_column->children[col_idx]; + auto manifest_entry_count = view_of_file_field->length; + + switch (col_idx) { + case 0: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content, + view_of_file_field, DataFile::Content); + break; + case 1: + PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path, + view_of_file_field); + break; + case 2: + for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx); + std::string_view path_str(value.data, value.size_bytes); + ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format, + FileFormatTypeFromString(path_str)); + } + } + break; + case 3: { + if (view_of_file_field->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifest("Field:{} should be a list.", field_name); + } + auto view_of_partition = view_of_file_field->children[0]; + for (size_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) { + if (ArrowArrayViewIsNull(view_of_partition, row_idx)) { + break; } + ICEBERG_RETURN_UNEXPECTED( + ParseLiteral(view_of_partition, 
row_idx, manifest_entries)); } + } break; + case 4: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count, + view_of_file_field, int64_t); + break; + case 5: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes, + view_of_file_field, int64_t); + break; + case 6: + // key&value should have the same offset + // HACK(xiao.dong) workaround for arrow bug: + // ArrowArrayViewListChildOffset can not get the correct offset for map + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes, + manifest_entry_count, view_of_file_field); + break; + case 7: + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts, + manifest_entry_count, view_of_file_field); + break; + case 8: + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts, + manifest_entry_count, view_of_file_field); + break; + case 9: + PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts, + manifest_entry_count, view_of_file_field); + break; + case 10: + PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds, + manifest_entry_count, view_of_file_field); + break; + case 11: + PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds, + manifest_entry_count, view_of_file_field); + break; + case 12: + PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata, + view_of_file_field); + break; + case 13: + PARSE_INTEGER_VECTOR_FIELD( + manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count, + view_of_file_field, int64_t); + break; + case 14: + PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids, + manifest_entry_count, view_of_file_field, int32_t); break; case 15: - PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id, + view_of_file_field, int32_t); + break; + case 16: + 
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id, + view_of_file_field, int64_t); + break; + case 17: + PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file, + view_of_file_field); + break; + case 18: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset, + view_of_file_field, int64_t); + break; + case 19: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes, + view_of_file_field, int64_t); break; default: - return InvalidManifestList("Unsupported type: {}", field_name); + return InvalidManifest("Unsupported field: {} in data file.", field_name); } } - return manifest_files; + return {}; } -Result> ManifestReaderImpl::Entries() const { return {}; } +Result> ParseManifestEntry(ArrowSchema* schema, + ArrowArray* array_in, + const Schema& iceberg_schema) { + if (schema->n_children != array_in->n_children) { + return InvalidManifest("Columns size not match between schema:{} and array:{}", + schema->n_children, array_in->n_children); + } + if (iceberg_schema.fields().size() != array_in->n_children) { + return InvalidManifest("Columns size not match between schema:{} and array:{}", + iceberg_schema.fields().size(), array_in->n_children); + } + + ArrowError error; + ArrowArrayView array_view; + auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + internal::ArrowArrayViewGuard view_guard(&array_view); + status = ArrowArrayViewSetArray(&array_view, array_in, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + + std::vector manifest_entries; + manifest_entries.resize(array_in->length); + for (size_t i = 0; i < array_in->length; i++) { + manifest_entries[i].data_file = std::make_shared(); + } + + for (int64_t idx = 0; idx < array_in->n_children; idx++) { + const auto& field = 
iceberg_schema.GetFieldByIndex(idx); + if (!field.has_value()) { + return InvalidManifest("Field not found in schema: {}", idx); + } + auto field_name = field.value().get().name(); + bool required = !field.value().get().optional(); + auto view_of_column = array_view.children[idx]; + + switch (idx) { + case 0: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column, + ManifestStatus); + break; + case 1: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, view_of_column, + int64_t); + break; + case 2: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, view_of_column, + int64_t); + break; + case 3: + PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number, + view_of_column, int64_t); + break; + case 4: { + auto data_file_schema = + dynamic_pointer_cast(field.value().get().type()); + ICEBERG_RETURN_UNEXPECTED( + ParseDataFile(data_file_schema, view_of_column, manifest_entries)); + break; + } + default: + return InvalidManifest("Unsupported field: {} in manifest entry.", field_name); + } + } + return manifest_entries; +} + +Result> ManifestReaderImpl::Entries() const { + std::vector manifest_entries; + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); + while (true) { + ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next()); + if (result.has_value()) { + internal::ArrowArrayGuard array_guard(&result.value()); + ICEBERG_ASSIGN_OR_RAISE( + auto parse_result, + ParseManifestEntry(&arrow_schema, &result.value(), *schema_)); + manifest_entries.insert(manifest_entries.end(), + std::make_move_iterator(parse_result.begin()), + std::make_move_iterator(parse_result.end())); + } else { + // eof + break; + } + } + return manifest_entries; +} Result> ManifestListReaderImpl::Files() const { std::vector manifest_files; ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); internal::ArrowSchemaGuard schema_guard(&arrow_schema); while (true) { - auto 
result = reader_->Next(); - if (!result.has_value()) { - return InvalidManifestList("Failed to read manifest list entry:{}", - result.error().message); - } - if (result.value().has_value()) { - internal::ArrowArrayGuard array_guard(&result.value().value()); + ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next()); + if (result.has_value()) { + internal::ArrowArrayGuard array_guard(&result.value()); ICEBERG_ASSIGN_OR_RAISE( - auto entries, - ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_)); + auto parse_result, ParseManifestList(&arrow_schema, &result.value(), *schema_)); manifest_files.insert(manifest_files.end(), - std::make_move_iterator(entries.begin()), - std::make_move_iterator(entries.end())); + std::make_move_iterator(parse_result.begin()), + std::make_move_iterator(parse_result.end())); } else { // eof break; diff --git a/src/iceberg/util/string_utils.h b/src/iceberg/util/string_utils.h new file mode 100644 index 000000000..9ff250b66 --- /dev/null +++ b/src/iceberg/util/string_utils.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/// \brief Stateless helpers for byte-wise string case conversion.
///
/// Both functions convert each char independently through the C library's
/// std::tolower / std::toupper, so multi-byte encodings are not interpreted.
class ICEBERG_EXPORT StringUtils {
 public:
  /// \brief Return a copy of `str` with each character mapped by std::tolower.
  /// \param str Text to convert; it is not modified.
  static std::string ToLower(std::string_view str) {
    std::string input(str);
    // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to
    // The <cctype> functions require a value representable as unsigned char
    // (or EOF); taking the parameter as unsigned char keeps bytes >= 0x80 from
    // being passed as negative ints, which is undefined behaviour.
    std::transform(input.begin(), input.end(), input.begin(),  // NOLINT
                   [](unsigned char c) {
                     return static_cast<char>(std::tolower(c));
                   });  // NOLINT
    return input;
  }

  /// \brief Return a copy of `str` with each character mapped by std::toupper.
  /// \param str Text to convert; it is not modified.
  static std::string ToUpper(std::string_view str) {
    std::string input(str);
    // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to
    // See ToLower for why the parameter is unsigned char.
    std::transform(input.begin(), input.end(), input.begin(),  // NOLINT
                   [](unsigned char c) {
                     return static_cast<char>(std::toupper(c));
                   });  // NOLINT
    return input;
  }
};
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_reader.h" + +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/schema.h" +#include "temp_file_test_base.h" +#include "test_common.h" + +namespace iceberg { + +class ManifestReaderTest : public TempFileTestBase { /* Test fixture for ManifestReader: SetUpTestSuite registers the Avro reader, and SetUp creates a local-filesystem FileIO and registers the Avro logical types. NOTE(review): template arguments and system #include targets look stripped in this copy of the patch (e.g. bare std::vector / std::make_shared) - confirm against the original change. */ + protected: + static void SetUpTestSuite() { avro::AvroReader::Register(); } + + void SetUp() override { /* Calls the base fixture's SetUp first, then rebuilds local_fs_ and file_io_. */ + TempFileTestBase::SetUp(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + file_io_ = std::make_shared(local_fs_); + + avro::RegisterLogicalTypes(); + } + + std::vector prepare_manifest_entries() { /* Builds the entries BasicTest compares against what it reads from the manifest. */ + std::vector manifest_entries; + std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/"; + std::vector paths = { + "order_ts_hour=2021-01-27-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00001.parquet", + "order_ts_hour=2024-01-27-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00002.parquet", + "order_ts_hour=2023-01-26-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00003.parquet", + "order_ts_hour=2021-01-26-00/" + "00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00004.parquet"}; +
std::vector partitions = {447696, 473976, 465192, 447672}; + std::vector>> bounds = { + {{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x12, 0xe2}}, + {4, {0xc0, 'y', 0xe7, 0x98, 0xd6, 0xb9, 0x05, 0x00}}}, + {{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x12, 0xe3}}, + {4, {0xc0, 0x19, '#', '=', 0xe2, 0x0f, 0x06, 0x00}}}, + {{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x0e, '"'}}, + {4, {0xc0, 0xd9, '7', 0x93, 0x1f, 0xf3, 0x05, 0x00}}}, + {{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, + {3, {0x0e, '!'}}, + {4, {0xc0, 0x19, 0x10, '{', 0xc2, 0xb9, 0x05, 0x00}}}, + }; + for (int i = 0; i < 4; ++i) { /* One entry per path: only file_path, the partition value and the bounds differ. */ + ManifestEntry entry; + entry.status = ManifestStatus::kAdded; + entry.snapshot_id = 6387266376565973956; + entry.data_file = std::make_shared(); + entry.data_file->file_path = test_dir_prefix + paths[i]; + entry.data_file->file_format = FileFormatType::kParquet; + entry.data_file->partition.emplace_back(Literal::Int(partitions[i])); + entry.data_file->record_count = 1; + entry.data_file->file_size_in_bytes = 1375; + entry.data_file->column_sizes = {{1, 49}, {2, 49}, {3, 49}, {4, 49}}; + entry.data_file->value_counts = {{1, 1}, {2, 1}, {3, 1}, {4, 1}}; + entry.data_file->null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}}; + entry.data_file->split_offsets = {4}; + entry.data_file->sort_order_id = 0; + entry.data_file->upper_bounds = bounds[i]; + entry.data_file->lower_bounds = bounds[i]; /* Same values as upper_bounds for every entry. */ + manifest_entries.emplace_back(entry); + } + return manifest_entries; + } + + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; +}; + +TEST_F(ManifestReaderTest, BasicTest) { /* Reads the manifest resource with a single int32 partition field and expects exactly the hand-built entries. */ + iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true); + auto
partition_schema = + std::make_shared(std::vector({partition_field})); + std::string path = GetResourcePath("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro"); + auto manifest_reader_result = + ManifestReader::MakeReader(path, file_io_, partition_schema); + ASSERT_EQ(manifest_reader_result.has_value(), true) + << manifest_reader_result.error().message; + auto manifest_reader = std::move(manifest_reader_result.value()); + auto read_result = manifest_reader->Entries(); + ASSERT_EQ(read_result.has_value(), true) << read_result.error().message; + + auto expected_entries = prepare_manifest_entries(); + ASSERT_EQ(read_result.value(), expected_entries); +} + +} // namespace iceberg diff --git a/test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro b/test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro new file mode 100644 index 000000000..c671dfdf0 Binary files /dev/null and b/test/resources/56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro differ diff --git a/test/string_utils_test.cc b/test/string_utils_test.cc new file mode 100644 index 000000000..2347debf7 --- /dev/null +++ b/test/string_utils_test.cc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "iceberg/util/string_utils.h" + +#include + +namespace iceberg { + +TEST(StringUtilsTest, ToLower) { + // Each row is {input, expected}: mixed case, punctuation, empty, blank, digits. + const char* const rows[][2] = {{"AbC", "abc"}, {"A-bC", "a-bc"}, {"A_bC", "a_bc"}, + {"", ""}, {" ", " "}, {"123", "123"}}; + for (const auto& row : rows) { + ASSERT_EQ(StringUtils::ToLower(row[0]), row[1]); + } +} + +TEST(StringUtilsTest, ToUpper) { + // Each row is {input, expected}: mixed case, punctuation, empty, blank, digits. + const char* const rows[][2] = {{"abc", "ABC"}, {"A-bC", "A-BC"}, {"A_bC", "A_BC"}, + {"", ""}, {" ", " "}, {"123", "123"}}; + for (const auto& row : rows) { + ASSERT_EQ(StringUtils::ToUpper(row[0]), row[1]); + } +} + +} // namespace iceberg