From 316f42a954313a67821e66baa9bc3c9bed07c0d0 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 4 Aug 2025 23:34:40 +0800 Subject: [PATCH 1/4] feat(parquet): add schema projection for parquet --- src/iceberg/parquet/parquet_reader.cc | 2 +- src/iceberg/parquet/parquet_schema_util.cc | 378 ++++++++++++++- .../parquet/parquet_schema_util_internal.h | 16 +- src/iceberg/schema_util.h | 2 +- test/parquet_schema_test.cc | 458 +++++++++++++++++- 5 files changed, 842 insertions(+), 14 deletions(-) diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 282ce5ee8..6903310ad 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -218,7 +218,7 @@ class ParquetReader::Impl { } // Create the record batch reader - ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_)); + auto column_indices = SelectedColumnIndices(projection_); ICEBERG_ARROW_ASSIGN_OR_RETURN( context_->record_batch_reader_, reader_->GetRecordBatchReader(row_group_indices, column_indices)); diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 05c712273..d0a88b773 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -17,20 +17,392 @@ * under the License. */ +#include +#include +#include +#include #include +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional FieldIdFromMetadata( + const std::shared_ptr& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? std::nullopt : std::make_optional(field_id); +} + +std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == ::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast(expected_type); + const auto& arrow_decimal = + internal::checked_cast(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map field_context_map; + field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& expected_field : struct_type.fields()) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), + parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with id: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + PruneFieldProjection(result); + return result; +} + +Result ProjectList( + const ListType& list_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 1) { + return InvalidSchema("List type must have exactly one field, got {}", + parquet_fields.size()); + } + + const auto& parquet_field = parquet_fields[0]; + auto element_field_id = GetFieldId(parquet_field); + if (!element_field_id) { + return InvalidSchema("List element field missing field id"); + } + + const auto& expected_element_field = list_type.fields().back(); + if (expected_element_field.field_id() != element_field_id.value()) { + return InvalidSchema("List element field id mismatch, expected {}, got {}", + expected_element_field.field_id(), element_field_id.value()); + } + + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_element_field.type(), parquet_field)); + + FieldProjection element_projection; + if (expected_element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + element_projection, + ProjectNested(*expected_element_field.type(), parquet_field.children)); + } else { + element_projection.attributes = + std::make_shared(parquet_field.column_index); + } + + element_projection.kind = FieldProjection::Kind::kProjected; + element_projection.from = size_t{0}; + + FieldProjection result; + result.children.emplace_back(std::move(element_projection)); + return result; +} + +Result ProjectMap( + const MapType& map_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 2) { + return InvalidSchema("Map type must have exactly two fields, got {}", + parquet_fields.size()); + } + + auto key_field_id = GetFieldId(parquet_fields[0]); + if (!key_field_id) { + return InvalidSchema("Map key field missing field id"); + } + auto value_field_id = GetFieldId(parquet_fields[1]); + if (!value_field_id) { + return InvalidSchema("Map value field missing field id"); + } + + const auto& expected_key_field = map_type.key(); + const auto& expected_value_field = map_type.value(); + if (expected_key_field.field_id() != key_field_id.value()) { + return InvalidSchema("Map key field id mismatch, expected {}, got {}", + expected_key_field.field_id(), key_field_id.value()); + } + if (expected_value_field.field_id() != value_field_id.value()) { + return InvalidSchema("Map value field id mismatch, expected {}, got {}", + expected_value_field.field_id(), value_field_id.value()); + } + + FieldProjection result; + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + FieldProjection sub_projection; + const auto& sub_node = parquet_fields[i]; + const auto& expected_sub_field = map_type.fields()[i]; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_sub_field.type(), sub_node)); + if (expected_sub_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + sub_projection, ProjectNested(*expected_sub_field.type(), sub_node.children)); + } else { + sub_projection.attributes = + std::make_shared(sub_node.column_index); + } + sub_projection.kind = FieldProjection::Kind::kProjected; + sub_projection.from = i; + result.children.emplace_back(std::move(sub_projection)); + } + + return result; +} + +Result ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (!expected_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", expected_type); + } + + switch (expected_type.type_id()) { + case TypeId::kStruct: + return ProjectStruct(internal::checked_cast(expected_type), + parquet_fields); + case TypeId::kList: + return ProjectList(internal::checked_cast(expected_type), + parquet_fields); + case TypeId::kMap: + if (parquet_fields.size() != 1 || + parquet_fields[0].field->type()->id() != ::arrow::Type::STRUCT || + parquet_fields[0].children.size() != 2) { + return InvalidSchema( + "Map type must have exactly one struct field with two children"); + } + return ProjectMap(internal::checked_cast(expected_type), + parquet_fields[0].children); + default: + return InvalidSchema("Unsupported nested type: {}", expected_type); + } +} + +void CollectColumnIds(const FieldProjection& field_projection, + std::vector* column_ids) { + if (field_projection.attributes) { + auto parquet_attributes = internal::checked_cast( + *field_projection.attributes); + if (parquet_attributes.column_id) { + column_ids->push_back(parquet_attributes.column_id.value()); + } + } + for (const auto& child : field_projection.children) { + CollectColumnIds(child, column_ids); + } +} + +} // namespace + Result Project(const Schema& expected_schema, const ::parquet::arrow::SchemaManifest& parquet_schema) { - return NotImplemented("NYI"); + ICEBERG_ASSIGN_OR_RAISE(auto field_projection, + ProjectNested(static_cast(expected_schema), + parquet_schema.schema_fields)); + return SchemaProjection{std::move(field_projection.children)}; } -Result> SelectedColumnIndices(const SchemaProjection& projection) { - return NotImplemented("NYI"); +std::vector SelectedColumnIndices(const SchemaProjection& projection) { + std::vector column_ids; + for (const auto& field : projection.fields) { + CollectColumnIds(field, &column_ids); + } + std::ranges::sort(column_ids); + return column_ids; } bool HasFieldIds(const ::parquet::schema::NodePtr& node) { diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 84e71ab4f..8e06b0bcf 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -19,6 +19,8 @@ #pragma once +#include + #include #include "iceberg/schema.h" @@ -26,6 +28,15 @@ namespace iceberg::parquet { +/// \brief Parquet specific attributes for the field. +struct ParquetExtraAttributes : public FieldProjection::ExtraAttributes { + explicit ParquetExtraAttributes(int32_t column_id) : column_id(column_id) {} + ~ParquetExtraAttributes() override = default; + + /// \brief The column id of projected Parquet column. + std::optional column_id; +}; + /// \brief Project an Iceberg Schema onto a Parquet Schema. /// /// This function creates a projection from an Iceberg Schema to a Parquet schema. @@ -34,7 +45,8 @@ namespace iceberg::parquet { /// /// \param expected_schema The Iceberg Schema that defines the expected structure. /// \param parquet_schema The Parquet schema to read data from. -/// \return The schema projection result with column indices of projected Parquet columns. +/// \return The schema projection result with column indices of projected Parquet columns +/// specified via ParquetExtraAttributes. Result Project(const Schema& expected_schema, const ::parquet::arrow::SchemaManifest& parquet_schema); @@ -42,7 +54,7 @@ Result Project(const Schema& expected_schema, /// /// \param projection The schema projection result. /// \return The selected column indices. -Result> SelectedColumnIndices(const SchemaProjection& projection); +std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \brief Check whether the Parquet schema has field IDs. /// diff --git a/src/iceberg/schema_util.h b/src/iceberg/schema_util.h index 8ba21a7c9..9df402336 100644 --- a/src/iceberg/schema_util.h +++ b/src/iceberg/schema_util.h @@ -70,7 +70,7 @@ struct ICEBERG_EXPORT FieldProjection { /// \brief The children of the field if it is a nested field. std::vector children; /// \brief Format-specific attributes for the field. - std::shared_ptr extra; + std::shared_ptr attributes; }; /// \brief A schema partner to carry projection information. diff --git a/test/parquet_schema_test.cc b/test/parquet_schema_test.cc index 1a280ff4d..53059952a 100644 --- a/test/parquet_schema_test.cc +++ b/test/parquet_schema_test.cc @@ -17,29 +17,115 @@ * under the License. */ +#include #include +#include +#include #include -#include +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/schema.h" +#include "matchers.h" namespace iceberg::parquet { namespace { -::parquet::schema::NodePtr MakeInt32Node(const std::string& name, int field_id = -1) { +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +::parquet::schema::NodePtr MakeInt32Node(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::INT32, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeInt64Node(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::INT64, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeStringNode(const std::string& name, int field_id = -1, + bool optional = true) { return ::parquet::schema::PrimitiveNode::Make( - name, ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::None(), - ::parquet::Type::INT32, /*primitive_length=*/-1, field_id); + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::String(), ::parquet::Type::BYTE_ARRAY, + /*primitive_length=*/-1, field_id); +} + +::parquet::schema::NodePtr MakeDoubleNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::DOUBLE, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeFloatNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::FLOAT, /*primitive_length=*/-1, + field_id); } ::parquet::schema::NodePtr MakeGroupNode(const std::string& name, const ::parquet::schema::NodeVector& fields, - int field_id = -1) { - return ::parquet::schema::GroupNode::Make(name, ::parquet::Repetition::REQUIRED, fields, - /*logical_type=*/nullptr, field_id); + int field_id = -1, bool optional = true) { + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + fields, /*logical_type=*/nullptr, field_id); } +::parquet::schema::NodePtr MakeListNode(const std::string& name, + const ::parquet::schema::NodePtr& element_node, + int field_id = -1, bool optional = true) { + auto list_group = ::parquet::schema::GroupNode::Make( + "element", ::parquet::Repetition::REPEATED, {element_node}); + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + {list_group}, ::parquet::LogicalType::List(), field_id); +} + +::parquet::schema::NodePtr MakeMapNode(const std::string& name, + const ::parquet::schema::NodePtr& key_node, + const ::parquet::schema::NodePtr& value_node, + int field_id = -1, bool optional = true) { + auto key_value_group = ::parquet::schema::GroupNode::Make( + "key_value", ::parquet::Repetition::REPEATED, {key_node, value_node}); + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + {key_value_group}, ::parquet::LogicalType::Map(), field_id); +} + +// Helper to create SchemaManifest from Parquet schema +::parquet::arrow::SchemaManifest MakeSchemaManifest( + const ::parquet::schema::NodePtr& parquet_schema) { + auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>(); + parquet_schema_descriptor->Init(parquet_schema); + + ::parquet::arrow::SchemaManifest manifest; + auto status = ::parquet::arrow::SchemaManifest::Make( + parquet_schema_descriptor.get(), /*key_value_metadata=*/nullptr, + ::parquet::default_arrow_reader_properties(), &manifest); + if (!status.ok()) { + throw std::runtime_error("Failed to create SchemaManifest: " + status.ToString()); + } + return manifest; +} + +#define ASSERT_PROJECTED_FIELD(field_projection, index) \ + ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kProjected); \ + ASSERT_EQ(std::get<1>(field_projection.from), index); + +#define ASSERT_PROJECTED_NULL_FIELD(field_projection) \ + ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kNull); + } // namespace TEST(HasFieldIds, PrimitiveNode) { @@ -63,4 +149,362 @@ TEST(HasFieldIds, GroupNode) { EXPECT_TRUE(HasFieldIds(group_node_with_partial_field_id)); } +TEST(ParquetSchemaProjectionTest, ProjectIdenticalSchemas) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()), + SchemaField::MakeRequired(/*field_id=*/4, "data", iceberg::float64()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("age", /*field_id=*/3), MakeDoubleNode("data", /*field_id=*/4)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (size_t i = 0; i < projection.fields.size(); ++i) { + ASSERT_PROJECTED_FIELD(projection.fields[i], i); + } + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1, 2, 3})); +} + +TEST(ParquetSchemaProjectionTest, ProjectSubsetSchema) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("age", /*field_id=*/3), MakeDoubleNode("data", /*field_id=*/4)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMissingOptionalField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/10, "extra", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2]); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMissingRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeRequired(/*field_id=*/10, "extra", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Missing required field")); +} + +TEST(ParquetSchemaProjectionTest, ProjectMetadataColumn) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + MetadataColumns::kFilePath, + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt64Node("id", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0})); +} + +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt32Node("id", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); +} + +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::float64()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeFloatNode("value", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); +} + +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeStringNode("value", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read Iceberg type")); +} + +TEST(ParquetSchemaProjectionTest, ProjectNestedStructures) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/101, "street", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/102, "city", iceberg::string()), + })), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeInt64Node("id", /*field_id=*/1), + MakeListNode("address", MakeStringNode("street", /*field_id=*/100), + /*field_id=*/2), + MakeGroupNode("address", + {MakeStringNode("street", /*field_id=*/101), + MakeStringNode("city", /*field_id=*/102)}, + /*field_id=*/3), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2, 3})); +} + +TEST(ParquetSchemaProjectionTest, ProjectListType) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional( + /*field_id=*/2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeInt64Node("id", /*field_id=*/1), + MakeListNode("numbers", MakeInt32Node("element", /*field_id=*/101), + /*field_id=*/2), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[0], 0); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMapType) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/101, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/102, "value", iceberg::int32()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeMapNode("counts", + MakeStringNode("key", /*field_id=*/101, /*optional=*/false), + MakeInt32Node("value", /*field_id=*/102), /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectListOfStruct) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/104, "z", iceberg::int32()), + SchemaField::MakeOptional(/*field_id=*/102, "x", iceberg::int32()), + })))), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", + { + MakeListNode("items", + MakeGroupNode("element", + {MakeInt32Node("x", /*field_id=*/102), + MakeInt32Node("y", /*field_id=*/103), + MakeInt32Node("z", /*field_id=*/104), + MakeInt32Node("m", /*field_id=*/105)}, + /*field_id=*/101), + /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + // Verify list element struct is properly projected + ASSERT_EQ(projection.fields[0].children.size(), 1); + const auto& element_proj = projection.fields[0].children[0]; + ASSERT_EQ(element_proj.children.size(), 2); + ASSERT_PROJECTED_FIELD(element_proj.children[0], 1); + ASSERT_PROJECTED_FIELD(element_proj.children[1], 0); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectDecimalType) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::decimal(18, 2)), + }); + + auto decimal_node = ::parquet::schema::PrimitiveNode::Make( + "value", ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::Decimal(9, 2), + ::parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/4, /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {decimal_node}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); +} + +TEST(ParquetSchemaProjectionTest, ProjectDecimalIncompatible) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::decimal(18, 3)), + }); + + auto decimal_node = ::parquet::schema::PrimitiveNode::Make( + "value", ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::Decimal(9, 2), + ::parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/4, /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {decimal_node}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + +TEST(ParquetSchemaProjectionTest, ProjectDuplicateFieldIds) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", { + MakeInt64Node("id", /*field_id=*/1), + MakeStringNode("name", /*field_id=*/1) // Duplicate field ID + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Duplicate field id")); +} + } // namespace iceberg::parquet From b42bda53da1523807c51adf4595595c7e504d19c Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 5 Aug 2025 10:22:58 +0800 Subject: [PATCH 2/4] bump clang-tidy version for cpp-linter --- .github/workflows/cpp-linter.yml | 2 +- .github/workflows/pre-commit.yml | 2 +- src/iceberg/parquet/parquet_schema_util.cc | 95 +++++++++++----------- test/parquet_schema_test.cc | 36 ++++---- 4 files changed, 70 insertions(+), 65 deletions(-) diff --git a/.github/workflows/cpp-linter.yml b/.github/workflows/cpp-linter.yml index 2085a4942..90f446cc2 100644 --- a/.github/workflows/cpp-linter.yml +++ b/.github/workflows/cpp-linter.yml @@ -47,7 +47,7 @@ jobs: with: style: file tidy-checks: '' - version: 19 + version: 22 files-changed-only: true lines-changed-only: true thread-comments: true diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 99426a1d6..ee33419c3 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -30,4 +30,4 @@ jobs: steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 - - uses: pre-commit/action@v3.0.1 + - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index d0a88b773..a7db28846 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -17,6 +17,8 @@ * under the License. */ +#include + #include #include #include @@ -42,26 +44,25 @@ std::optional FieldIdFromMetadata( if (!metadata) { return std::nullopt; } - int key = metadata->FindKey(kParquetFieldIdKey.data()); + int key = metadata->FindKey(kParquetFieldIdKey); if (key < 0) { return std::nullopt; } std::string field_id_str = metadata->value(key); int32_t field_id = -1; - try { - field_id = std::stoi(field_id_str); - } catch (const std::invalid_argument& e) { - return std::nullopt; - } catch (const std::out_of_range& e) { + auto [_, ec] = std::from_chars(field_id_str.data(), + field_id_str.data() + field_id_str.size(), field_id); + if (ec != std::errc() || field_id < 0) { return std::nullopt; } - return field_id < 0 ? std::nullopt : std::make_optional(field_id); + return field_id; } std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { return FieldIdFromMetadata(parquet_field.field->metadata()); } +// TODO(gangwu): support v3 unknown type Status ValidateParquetSchemaEvolution( const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { const auto& arrow_type = parquet_field.field->type(); @@ -189,7 +190,7 @@ Status ValidateParquetSchemaEvolution( // Forward declaration Result ProjectNested( - const Type& expected_type, + const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields); Result ProjectStruct( @@ -212,7 +213,7 @@ Result ProjectStruct( std::piecewise_construct, std::forward_as_tuple(field_id.value()), std::forward_as_tuple(i, parquet_field)); !inserted) [[unlikely]] { - return InvalidSchema("Duplicate field id found in Parquet schema: {}", + return InvalidSchema("Duplicate field id {} found in Parquet schema", field_id.value()); } } @@ -220,17 +221,17 @@ Result ProjectStruct( FieldProjection result; result.children.reserve(struct_type.fields().size()); - for (const auto& expected_field : struct_type.fields()) { - int32_t field_id = expected_field.field_id(); + for (const auto& field : struct_type.fields()) { + int32_t field_id = field.field_id(); FieldProjection child_projection; if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { const auto& parquet_field = iter->second.parquet_field; ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), - parquet_field.children)); + ValidateParquetSchemaEvolution(*field.type(), parquet_field)); + if (field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectNested(*field.type(), parquet_field.children)); } else { child_projection.attributes = std::make_shared(parquet_field.column_index); @@ -239,7 +240,7 @@ Result ProjectStruct( child_projection.kind = FieldProjection::Kind::kProjected; } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; - } else if (expected_field.optional()) { + } else if (field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { return InvalidSchema("Missing required field with id: {}", field_id); @@ -260,26 +261,25 @@ Result ProjectList( parquet_fields.size()); } - const auto& parquet_field = parquet_fields[0]; + const auto& parquet_field = parquet_fields.back(); auto element_field_id = GetFieldId(parquet_field); if (!element_field_id) { return InvalidSchema("List element field missing field id"); } - const auto& expected_element_field = list_type.fields().back(); - if (expected_element_field.field_id() != element_field_id.value()) { + const auto& element_field = list_type.fields().back(); + if (element_field.field_id() != element_field_id.value()) { return InvalidSchema("List element field id mismatch, expected {}, got {}", - expected_element_field.field_id(), element_field_id.value()); + element_field.field_id(), element_field_id.value()); } ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*expected_element_field.type(), parquet_field)); + ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); FieldProjection element_projection; - if (expected_element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - element_projection, - ProjectNested(*expected_element_field.type(), parquet_field.children)); + if (element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(element_projection, + ProjectNested(*element_field.type(), parquet_field.children)); } else { element_projection.attributes = std::make_shared(parquet_field.column_index); @@ -310,28 +310,29 @@ Result ProjectMap( return InvalidSchema("Map value field missing field id"); } - const auto& expected_key_field = map_type.key(); - const auto& expected_value_field = map_type.value(); - if (expected_key_field.field_id() != key_field_id.value()) { + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + if (key_field.field_id() != key_field_id.value()) { return InvalidSchema("Map key field id mismatch, expected {}, got {}", - expected_key_field.field_id(), key_field_id.value()); + key_field.field_id(), key_field_id.value()); } - if (expected_value_field.field_id() != value_field_id.value()) { + if (value_field.field_id() != value_field_id.value()) { return InvalidSchema("Map value field id mismatch, expected {}, got {}", - expected_value_field.field_id(), value_field_id.value()); + value_field.field_id(), value_field_id.value()); } FieldProjection result; + result.children.reserve(2); for (size_t i = 0; i < parquet_fields.size(); ++i) { FieldProjection sub_projection; const auto& sub_node = parquet_fields[i]; - const auto& expected_sub_field = map_type.fields()[i]; + const auto& sub_field = map_type.fields()[i]; ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*expected_sub_field.type(), sub_node)); - if (expected_sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - sub_projection, ProjectNested(*expected_sub_field.type(), sub_node.children)); + ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); + if (sub_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(sub_projection, + ProjectNested(*sub_field.type(), sub_node.children)); } else { sub_projection.attributes = std::make_shared(sub_node.column_index); @@ -345,18 +346,18 @@ Result ProjectMap( } Result ProjectNested( - const Type& expected_type, + const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { - if (!expected_type.is_nested()) { - return InvalidSchema("Expected a nested type, but got {}", expected_type); + if (!nested_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", nested_type); } - switch (expected_type.type_id()) { + switch (nested_type.type_id()) { case TypeId::kStruct: - return ProjectStruct(internal::checked_cast(expected_type), + return ProjectStruct(internal::checked_cast(nested_type), parquet_fields); case TypeId::kList: - return ProjectList(internal::checked_cast(expected_type), + return ProjectList(internal::checked_cast(nested_type), parquet_fields); case TypeId::kMap: if (parquet_fields.size() != 1 || @@ -365,20 +366,20 @@ Result ProjectNested( return InvalidSchema( "Map type must have exactly one struct field with two children"); } - return ProjectMap(internal::checked_cast(expected_type), + return ProjectMap(internal::checked_cast(nested_type), parquet_fields[0].children); default: - return InvalidSchema("Unsupported nested type: {}", expected_type); + return InvalidSchema("Unsupported nested type: {}", nested_type); } } void CollectColumnIds(const FieldProjection& field_projection, std::vector* column_ids) { if (field_projection.attributes) { - auto parquet_attributes = internal::checked_cast( + const auto& attributes = internal::checked_cast( *field_projection.attributes); - if (parquet_attributes.column_id) { - column_ids->push_back(parquet_attributes.column_id.value()); + if (attributes.column_id) { + column_ids->push_back(attributes.column_id.value()); } } for (const auto& child : field_projection.children) { diff --git a/test/parquet_schema_test.cc b/test/parquet_schema_test.cc index 53059952a..4a465ca97 100644 --- a/test/parquet_schema_test.cc +++ b/test/parquet_schema_test.cc @@ -18,7 +18,6 @@ */ #include -#include #include #include #include @@ -128,26 +127,31 @@ ::parquet::arrow::SchemaManifest MakeSchemaManifest( } // namespace -TEST(HasFieldIds, PrimitiveNode) { +TEST(HasFieldIdsTest, PrimitiveNode) { EXPECT_FALSE(HasFieldIds(MakeInt32Node("test_field"))); EXPECT_TRUE(HasFieldIds(MakeInt32Node("test_field", /*field_id=*/1))); + EXPECT_FALSE(HasFieldIds(MakeInt32Node("test_field", /*field_id=*/-1))); } -TEST(HasFieldIds, GroupNode) { - auto group_node_without_field_id = - MakeGroupNode("test_group", {MakeInt32Node("c1"), MakeInt32Node("c2")}); - EXPECT_FALSE(HasFieldIds(group_node_without_field_id)); - - auto group_node_with_full_field_id = MakeGroupNode( - "test_group", - {MakeInt32Node("c1", /*field_id=*/2), MakeInt32Node("c2", /*field_id=*/3)}, - /*field_id=*/1); - EXPECT_TRUE(HasFieldIds(group_node_with_full_field_id)); - - auto group_node_with_partial_field_id = MakeGroupNode( - "test_group", {MakeInt32Node("c1", /*field_id=*/1), MakeInt32Node("c2")}); - EXPECT_TRUE(HasFieldIds(group_node_with_partial_field_id)); +// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) +TEST(HasFieldIdsTest, GroupNode) { + EXPECT_FALSE( + HasFieldIds(MakeGroupNode("group_without_field_id", { + MakeInt32Node("c1"), + MakeInt32Node("c2"), + }))); + EXPECT_TRUE(HasFieldIds( + MakeGroupNode("group_with_full_field_id", { + MakeInt32Node("c1", /*field_id=*/1), + MakeInt32Node("c2", /*field_id=*/2), + }))); + EXPECT_TRUE(HasFieldIds(MakeGroupNode("group_with_partial_field_id", + { + MakeInt32Node("c1", /*field_id=*/1), + MakeInt32Node("c2"), + }))); } +// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) TEST(ParquetSchemaProjectionTest, ProjectIdenticalSchemas) { Schema expected_schema({ From 36b01c541310acdf6ec2cc3040ab2042c575d417 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 8 Aug 2025 22:20:28 +0800 Subject: [PATCH 3/4] fix time64 unit and add more test --- src/iceberg/parquet/parquet_schema_util.cc | 6 +- test/parquet_schema_test.cc | 121 +++++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index a7db28846..8af5bb27c 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -101,7 +101,11 @@ Status ValidateParquetSchemaEvolution( break; case TypeId::kTime: if (arrow_type->id() == ::arrow::Type::TIME64) { - return {}; + const auto& time_type = + internal::checked_cast(*arrow_type); + if (time_type.unit() == ::arrow::TimeUnit::MICRO) { + return {}; + } } break; case TypeId::kTimestamp: diff --git a/test/parquet_schema_test.cc b/test/parquet_schema_test.cc index 4a465ca97..a5b0ca222 100644 --- a/test/parquet_schema_test.cc +++ b/test/parquet_schema_test.cc @@ -21,10 +21,12 @@ #include #include #include +#include #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" #include "iceberg/schema.h" +#include "iceberg/type.h" #include "matchers.h" namespace iceberg::parquet { @@ -511,4 +513,123 @@ TEST(ParquetSchemaProjectionTest, ProjectDuplicateFieldIds) { ASSERT_THAT(projection_result, HasErrorMessage("Duplicate field id")); } +TEST(ParquetSchemaProjectionTest, ProjectPrimitiveType) { + struct TestCase { + std::shared_ptr iceberg_type; + ::parquet::Type::type parquet_type; + std::shared_ptr parquet_logical_type; + int32_t primitive_length = -1; + }; + + std::vector test_cases = { + TestCase{.iceberg_type = float64(), .parquet_type = ::parquet::Type::DOUBLE}, + TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::FLOAT}, + TestCase{.iceberg_type = int64(), .parquet_type = ::parquet::Type::INT64}, + TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT32}, + TestCase{.iceberg_type = string(), + .parquet_type = ::parquet::Type::BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::String()}, + TestCase{.iceberg_type = binary(), .parquet_type = ::parquet::Type::BYTE_ARRAY}, + TestCase{.iceberg_type = boolean(), .parquet_type = ::parquet::Type::BOOLEAN}, + TestCase{.iceberg_type = date(), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Date()}, + TestCase{ + .iceberg_type = time(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Time( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{ + .iceberg_type = timestamp(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{ + .iceberg_type = timestamp_tz(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{.iceberg_type = decimal(4, 2), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Decimal(4, 2)}, + TestCase{.iceberg_type = decimal(38, 18), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::Decimal(38, 18), + .primitive_length = 16}, + TestCase{.iceberg_type = uuid(), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::UUID(), + .primitive_length = 16}, + TestCase{.iceberg_type = fixed(8), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .primitive_length = 8}}; + + for (const auto& test_case : test_cases) { + Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field", + test_case.iceberg_type)}); + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {::parquet::schema::PrimitiveNode::Make( + "test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type, + test_case.parquet_type, test_case.primitive_length, + /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + } +} + +TEST(ParquetSchemaProjectionTest, UnsuportedProjection) { + struct TestCase { + std::shared_ptr iceberg_type; + ::parquet::Type::type parquet_type; + std::shared_ptr parquet_logical_type; + int32_t primitive_length = -1; + }; + + std::vector test_cases = { + TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::DOUBLE}, + TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT64}, + TestCase{.iceberg_type = date(), .parquet_type = ::parquet::Type::INT32}, + TestCase{.iceberg_type = time(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Time( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{ + .iceberg_type = timestamp(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{.iceberg_type = timestamp_tz(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{.iceberg_type = decimal(4, 2), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Decimal(4, 1)}, + TestCase{.iceberg_type = fixed(8), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .primitive_length = 4}}; + + for (const auto& test_case : test_cases) { + Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field", + test_case.iceberg_type)}); + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {::parquet::schema::PrimitiveNode::Make( + "test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type, + test_case.parquet_type, test_case.primitive_length, + /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); + } +} + } // namespace iceberg::parquet From bc9d8be0d1b108ebdb90f4729fd751c7556a4e6a Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 12 Aug 2025 13:57:34 +0800 Subject: [PATCH 4/4] fix arrow uuid extension type --- src/iceberg/parquet/parquet_schema_util.cc | 17 +++++++++-------- test/parquet_schema_test.cc | 9 ++++++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 8af5bb27c..361489973 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -151,10 +152,10 @@ Status ValidateParquetSchemaEvolution( } break; case TypeId::kUuid: - if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { - const auto& fixed_binary = - internal::checked_cast(*arrow_type); - if (fixed_binary.byte_width() == 16) { + if (arrow_type->id() == ::arrow::Type::EXTENSION) { + const auto& extension_type = + internal::checked_cast(*arrow_type); + if (extension_type.extension_name() == "arrow.uuid") { return {}; } } @@ -213,10 +214,10 @@ Result ProjectStruct( if (!field_id) { continue; } - if (const auto [iter, inserted] = field_context_map.emplace( - std::piecewise_construct, std::forward_as_tuple(field_id.value()), - std::forward_as_tuple(i, parquet_field)); - !inserted) [[unlikely]] { + if (!field_context_map + .emplace(field_id.value(), + FieldContext{.local_index = i, .parquet_field = parquet_field}) + .second) [[unlikely]] { return InvalidSchema("Duplicate field id {} found in Parquet schema", field_id.value()); } diff --git a/test/parquet_schema_test.cc b/test/parquet_schema_test.cc index a5b0ca222..99f0b690b 100644 --- a/test/parquet_schema_test.cc +++ b/test/parquet_schema_test.cc @@ -110,10 +110,13 @@ ::parquet::arrow::SchemaManifest MakeSchemaManifest( auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>(); parquet_schema_descriptor->Init(parquet_schema); + auto properties = ::parquet::default_arrow_reader_properties(); + properties.set_arrow_extensions_enabled(true); + ::parquet::arrow::SchemaManifest manifest; - auto status = ::parquet::arrow::SchemaManifest::Make( - parquet_schema_descriptor.get(), /*key_value_metadata=*/nullptr, - ::parquet::default_arrow_reader_properties(), &manifest); + auto status = ::parquet::arrow::SchemaManifest::Make(parquet_schema_descriptor.get(), + /*key_value_metadata=*/nullptr, + properties, &manifest); if (!status.ok()) { throw std::runtime_error("Failed to create SchemaManifest: " + status.ToString()); }