diff --git a/.github/workflows/cpp-linter.yml b/.github/workflows/cpp-linter.yml index 2085a4942..90f446cc2 100644 --- a/.github/workflows/cpp-linter.yml +++ b/.github/workflows/cpp-linter.yml @@ -47,7 +47,7 @@ jobs: with: style: file tidy-checks: '' - version: 19 + version: 22 files-changed-only: true lines-changed-only: true thread-comments: true diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 99426a1d6..ee33419c3 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -30,4 +30,4 @@ jobs: steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 - - uses: pre-commit/action@v3.0.1 + - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 282ce5ee8..6903310ad 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -218,7 +218,7 @@ class ParquetReader::Impl { } // Create the record batch reader - ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_)); + auto column_indices = SelectedColumnIndices(projection_); ICEBERG_ARROW_ASSIGN_OR_RETURN( context_->record_batch_reader_, reader_->GetRecordBatchReader(row_group_indices, column_indices)); diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 05c712273..361489973 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -17,20 +17,398 @@ * under the License. */ +#include + +#include +#include +#include +#include +#include #include +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional FieldIdFromMetadata( + const std::shared_ptr& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + auto [_, ec] = std::from_chars(field_id_str.data(), + field_id_str.data() + field_id_str.size(), field_id); + if (ec != std::errc() || field_id < 0) { + return std::nullopt; + } + return field_id; +} + +std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +// TODO(gangwu): support v3 unknown type +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + const auto& time_type = + internal::checked_cast(*arrow_type); + if (time_type.unit() == ::arrow::TimeUnit::MICRO) { + return {}; + } + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == ::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast(expected_type); + const auto& arrow_decimal = + internal::checked_cast(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::EXTENSION) { + const auto& extension_type = + internal::checked_cast(*arrow_type); + if (extension_type.extension_name() == "arrow.uuid") { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result ProjectNested( + const Type& nested_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map field_context_map; + field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (!field_context_map + .emplace(field_id.value(), + FieldContext{.local_index = i, .parquet_field = parquet_field}) + .second) [[unlikely]] { + return InvalidSchema("Duplicate field id {} found in Parquet schema", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& field : struct_type.fields()) { + int32_t field_id = field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*field.type(), parquet_field)); + if (field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectNested(*field.type(), parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with id: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + PruneFieldProjection(result); + return result; +} + +Result ProjectList( + const ListType& list_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 1) { + return InvalidSchema("List type must have exactly one field, got {}", + parquet_fields.size()); + } + + const auto& parquet_field = parquet_fields.back(); + auto element_field_id = GetFieldId(parquet_field); + if (!element_field_id) { + return InvalidSchema("List element field missing field id"); + } + + const auto& element_field = list_type.fields().back(); + if (element_field.field_id() != element_field_id.value()) { + return InvalidSchema("List element field id mismatch, expected {}, got {}", + element_field.field_id(), element_field_id.value()); + } + + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); + + FieldProjection element_projection; + if (element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(element_projection, + ProjectNested(*element_field.type(), parquet_field.children)); + } else { + element_projection.attributes = + std::make_shared(parquet_field.column_index); + } + + element_projection.kind = FieldProjection::Kind::kProjected; + element_projection.from = size_t{0}; + + FieldProjection result; + result.children.emplace_back(std::move(element_projection)); + return result; +} + +Result ProjectMap( + const MapType& map_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 2) { + return InvalidSchema("Map type must have exactly two fields, got {}", + parquet_fields.size()); + } + + auto key_field_id = GetFieldId(parquet_fields[0]); + if (!key_field_id) { + return InvalidSchema("Map key field missing field id"); + } + auto value_field_id = GetFieldId(parquet_fields[1]); + if (!value_field_id) { + return InvalidSchema("Map value field missing field id"); + } + + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + if (key_field.field_id() != key_field_id.value()) { + return InvalidSchema("Map key field id mismatch, expected {}, got {}", + key_field.field_id(), key_field_id.value()); + } + if (value_field.field_id() != value_field_id.value()) { + return InvalidSchema("Map value field id mismatch, expected {}, got {}", + value_field.field_id(), value_field_id.value()); + } + + FieldProjection result; + result.children.reserve(2); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + FieldProjection sub_projection; + const auto& sub_node = parquet_fields[i]; + const auto& sub_field = map_type.fields()[i]; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); + if (sub_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(sub_projection, + ProjectNested(*sub_field.type(), sub_node.children)); + } else { + sub_projection.attributes = + std::make_shared(sub_node.column_index); + } + sub_projection.kind = FieldProjection::Kind::kProjected; + sub_projection.from = i; + result.children.emplace_back(std::move(sub_projection)); + } + + return result; +} + +Result ProjectNested( + const Type& nested_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (!nested_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", nested_type); + } + + switch (nested_type.type_id()) { + case TypeId::kStruct: + return ProjectStruct(internal::checked_cast(nested_type), + parquet_fields); + case TypeId::kList: + return ProjectList(internal::checked_cast(nested_type), + parquet_fields); + case TypeId::kMap: + if (parquet_fields.size() != 1 || + parquet_fields[0].field->type()->id() != ::arrow::Type::STRUCT || + parquet_fields[0].children.size() != 2) { + return InvalidSchema( + "Map type must have exactly one struct field with two children"); + } + return ProjectMap(internal::checked_cast(nested_type), + parquet_fields[0].children); + default: + return InvalidSchema("Unsupported nested type: {}", nested_type); + } +} + +void CollectColumnIds(const FieldProjection& field_projection, + std::vector* column_ids) { + if (field_projection.attributes) { + const auto& attributes = internal::checked_cast( + *field_projection.attributes); + if (attributes.column_id) { + column_ids->push_back(attributes.column_id.value()); + } + } + for (const auto& child : field_projection.children) { + CollectColumnIds(child, column_ids); + } +} + +} // namespace + Result Project(const Schema& expected_schema, const ::parquet::arrow::SchemaManifest& parquet_schema) { - return NotImplemented("NYI"); + ICEBERG_ASSIGN_OR_RAISE(auto field_projection, + ProjectNested(static_cast(expected_schema), + parquet_schema.schema_fields)); + return SchemaProjection{std::move(field_projection.children)}; } -Result> SelectedColumnIndices(const SchemaProjection& projection) { - return NotImplemented("NYI"); +std::vector SelectedColumnIndices(const SchemaProjection& projection) { + std::vector column_ids; + for (const auto& field : projection.fields) { + CollectColumnIds(field, &column_ids); + } + std::ranges::sort(column_ids); + return column_ids; } bool HasFieldIds(const ::parquet::schema::NodePtr& node) { diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 84e71ab4f..8e06b0bcf 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -19,6 +19,8 @@ #pragma once +#include + #include #include "iceberg/schema.h" @@ -26,6 +28,15 @@ namespace iceberg::parquet { +/// \brief Parquet specific attributes for the field. +struct ParquetExtraAttributes : public FieldProjection::ExtraAttributes { + explicit ParquetExtraAttributes(int32_t column_id) : column_id(column_id) {} + ~ParquetExtraAttributes() override = default; + + /// \brief The column id of projected Parquet column. + std::optional column_id; +}; + /// \brief Project an Iceberg Schema onto a Parquet Schema. /// /// This function creates a projection from an Iceberg Schema to a Parquet schema. @@ -34,7 +45,8 @@ namespace iceberg::parquet { /// /// \param expected_schema The Iceberg Schema that defines the expected structure. /// \param parquet_schema The Parquet schema to read data from. -/// \return The schema projection result with column indices of projected Parquet columns. +/// \return The schema projection result with column indices of projected Parquet columns +/// specified via ParquetExtraAttributes. Result Project(const Schema& expected_schema, const ::parquet::arrow::SchemaManifest& parquet_schema); @@ -42,7 +54,7 @@ Result Project(const Schema& expected_schema, /// /// \param projection The schema projection result. /// \return The selected column indices. -Result> SelectedColumnIndices(const SchemaProjection& projection); +std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \brief Check whether the Parquet schema has field IDs. /// diff --git a/src/iceberg/schema_util.h b/src/iceberg/schema_util.h index 8ba21a7c9..9df402336 100644 --- a/src/iceberg/schema_util.h +++ b/src/iceberg/schema_util.h @@ -70,7 +70,7 @@ struct ICEBERG_EXPORT FieldProjection { /// \brief The children of the field if it is a nested field. std::vector children; /// \brief Format-specific attributes for the field. - std::shared_ptr extra; + std::shared_ptr attributes; }; /// \brief A schema partner to carry projection information. diff --git a/test/parquet_schema_test.cc b/test/parquet_schema_test.cc index 1a280ff4d..99f0b690b 100644 --- a/test/parquet_schema_test.cc +++ b/test/parquet_schema_test.cc @@ -17,50 +17,622 @@ * under the License. */ -#include +#include +#include +#include #include #include +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/type.h" +#include "matchers.h" namespace iceberg::parquet { namespace { -::parquet::schema::NodePtr MakeInt32Node(const std::string& name, int field_id = -1) { +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +::parquet::schema::NodePtr MakeInt32Node(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::INT32, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeInt64Node(const std::string& name, int field_id = -1, + bool optional = true) { return ::parquet::schema::PrimitiveNode::Make( - name, ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::None(), - ::parquet::Type::INT32, /*primitive_length=*/-1, field_id); + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::INT64, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeStringNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::String(), ::parquet::Type::BYTE_ARRAY, + /*primitive_length=*/-1, field_id); +} + +::parquet::schema::NodePtr MakeDoubleNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::DOUBLE, /*primitive_length=*/-1, + field_id); +} + +::parquet::schema::NodePtr MakeFloatNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::FLOAT, /*primitive_length=*/-1, + field_id); } ::parquet::schema::NodePtr MakeGroupNode(const std::string& name, const ::parquet::schema::NodeVector& fields, - int field_id = -1) { - return ::parquet::schema::GroupNode::Make(name, ::parquet::Repetition::REQUIRED, fields, - /*logical_type=*/nullptr, field_id); + int field_id = -1, bool optional = true) { + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + fields, /*logical_type=*/nullptr, field_id); +} + +::parquet::schema::NodePtr MakeListNode(const std::string& name, + const ::parquet::schema::NodePtr& element_node, + int field_id = -1, bool optional = true) { + auto list_group = ::parquet::schema::GroupNode::Make( + "element", ::parquet::Repetition::REPEATED, {element_node}); + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + {list_group}, ::parquet::LogicalType::List(), field_id); } +::parquet::schema::NodePtr MakeMapNode(const std::string& name, + const ::parquet::schema::NodePtr& key_node, + const ::parquet::schema::NodePtr& value_node, + int field_id = -1, bool optional = true) { + auto key_value_group = ::parquet::schema::GroupNode::Make( + "key_value", ::parquet::Repetition::REPEATED, {key_node, value_node}); + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + {key_value_group}, ::parquet::LogicalType::Map(), field_id); +} + +// Helper to create SchemaManifest from Parquet schema +::parquet::arrow::SchemaManifest MakeSchemaManifest( + const ::parquet::schema::NodePtr& parquet_schema) { + auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>(); + parquet_schema_descriptor->Init(parquet_schema); + + auto properties = ::parquet::default_arrow_reader_properties(); + properties.set_arrow_extensions_enabled(true); + + ::parquet::arrow::SchemaManifest manifest; + auto status = ::parquet::arrow::SchemaManifest::Make(parquet_schema_descriptor.get(), + /*key_value_metadata=*/nullptr, + properties, &manifest); + if (!status.ok()) { + throw std::runtime_error("Failed to create SchemaManifest: " + status.ToString()); + } + return manifest; +} + +#define ASSERT_PROJECTED_FIELD(field_projection, index) \ + ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kProjected); \ + ASSERT_EQ(std::get<1>(field_projection.from), index); + +#define ASSERT_PROJECTED_NULL_FIELD(field_projection) \ + ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kNull); + } // namespace -TEST(HasFieldIds, PrimitiveNode) { +TEST(HasFieldIdsTest, PrimitiveNode) { EXPECT_FALSE(HasFieldIds(MakeInt32Node("test_field"))); EXPECT_TRUE(HasFieldIds(MakeInt32Node("test_field", /*field_id=*/1))); + EXPECT_FALSE(HasFieldIds(MakeInt32Node("test_field", /*field_id=*/-1))); +} + +// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) +TEST(HasFieldIdsTest, GroupNode) { + EXPECT_FALSE( + HasFieldIds(MakeGroupNode("group_without_field_id", { + MakeInt32Node("c1"), + MakeInt32Node("c2"), + }))); + EXPECT_TRUE(HasFieldIds( + MakeGroupNode("group_with_full_field_id", { + MakeInt32Node("c1", /*field_id=*/1), + MakeInt32Node("c2", /*field_id=*/2), + }))); + EXPECT_TRUE(HasFieldIds(MakeGroupNode("group_with_partial_field_id", + { + MakeInt32Node("c1", /*field_id=*/1), + MakeInt32Node("c2"), + }))); +} +// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) + +TEST(ParquetSchemaProjectionTest, ProjectIdenticalSchemas) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()), + SchemaField::MakeRequired(/*field_id=*/4, "data", iceberg::float64()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("age", /*field_id=*/3), MakeDoubleNode("data", /*field_id=*/4)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (size_t i = 0; i < projection.fields.size(); ++i) { + ASSERT_PROJECTED_FIELD(projection.fields[i], i); + } + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1, 2, 3})); +} + +TEST(ParquetSchemaProjectionTest, ProjectSubsetSchema) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("age", /*field_id=*/3), MakeDoubleNode("data", /*field_id=*/4)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMissingOptionalField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/10, "extra", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2]); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMissingRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeRequired(/*field_id=*/10, "extra", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {MakeInt64Node("id", /*field_id=*/1), MakeStringNode("name", /*field_id=*/2)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Missing required field")); +} + +TEST(ParquetSchemaProjectionTest, ProjectMetadataColumn) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + MetadataColumns::kFilePath, + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt64Node("id", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0})); +} + +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt32Node("id", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); } -TEST(HasFieldIds, GroupNode) { - auto group_node_without_field_id = - MakeGroupNode("test_group", {MakeInt32Node("c1"), MakeInt32Node("c2")}); - EXPECT_FALSE(HasFieldIds(group_node_without_field_id)); +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::float64()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeFloatNode("value", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); +} + +TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeStringNode("value", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read Iceberg type")); +} + +TEST(ParquetSchemaProjectionTest, ProjectNestedStructures) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/101, "street", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/102, "city", iceberg::string()), + })), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeInt64Node("id", /*field_id=*/1), + MakeListNode("address", MakeStringNode("street", /*field_id=*/100), + /*field_id=*/2), + MakeGroupNode("address", + {MakeStringNode("street", /*field_id=*/101), + MakeStringNode("city", /*field_id=*/102)}, + /*field_id=*/3), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2, 3})); +} + +TEST(ParquetSchemaProjectionTest, ProjectListType) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional( + /*field_id=*/2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeInt64Node("id", /*field_id=*/1), + MakeListNode("numbers", MakeInt32Node("element", /*field_id=*/101), + /*field_id=*/2), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[0], 0); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMapType) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/101, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/102, "value", iceberg::int32()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeMapNode("counts", + MakeStringNode("key", /*field_id=*/101, /*optional=*/false), + MakeInt32Node("value", /*field_id=*/102), /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectListOfStruct) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/104, "z", iceberg::int32()), + SchemaField::MakeOptional(/*field_id=*/102, "x", iceberg::int32()), + })))), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", + { + MakeListNode("items", + MakeGroupNode("element", + {MakeInt32Node("x", /*field_id=*/102), + MakeInt32Node("y", /*field_id=*/103), + MakeInt32Node("z", /*field_id=*/104), + MakeInt32Node("m", /*field_id=*/105)}, + /*field_id=*/101), + /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + // Verify list element struct is properly projected + ASSERT_EQ(projection.fields[0].children.size(), 1); + const auto& element_proj = projection.fields[0].children[0]; + ASSERT_EQ(element_proj.children.size(), 2); + ASSERT_PROJECTED_FIELD(element_proj.children[0], 1); + ASSERT_PROJECTED_FIELD(element_proj.children[1], 0); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectDecimalType) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::decimal(18, 2)), + }); + + auto decimal_node = ::parquet::schema::PrimitiveNode::Make( + "value", ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::Decimal(9, 2), + ::parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/4, /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {decimal_node}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); +} + +TEST(ParquetSchemaProjectionTest, ProjectDecimalIncompatible) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::decimal(18, 3)), + }); + + auto decimal_node = ::parquet::schema::PrimitiveNode::Make( + "value", ::parquet::Repetition::REQUIRED, ::parquet::LogicalType::Decimal(9, 2), + ::parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/4, /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {decimal_node}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + +TEST(ParquetSchemaProjectionTest, ProjectDuplicateFieldIds) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", { + MakeInt64Node("id", /*field_id=*/1), + MakeStringNode("name", /*field_id=*/1) // Duplicate field ID + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Duplicate field id")); +} + +TEST(ParquetSchemaProjectionTest, ProjectPrimitiveType) { + struct TestCase { + std::shared_ptr iceberg_type; + ::parquet::Type::type parquet_type; + std::shared_ptr parquet_logical_type; + int32_t primitive_length = -1; + }; + + std::vector test_cases = { + TestCase{.iceberg_type = float64(), .parquet_type = ::parquet::Type::DOUBLE}, + TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::FLOAT}, + TestCase{.iceberg_type = int64(), .parquet_type = ::parquet::Type::INT64}, + TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT32}, + TestCase{.iceberg_type = string(), + .parquet_type = ::parquet::Type::BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::String()}, + TestCase{.iceberg_type = binary(), .parquet_type = ::parquet::Type::BYTE_ARRAY}, + TestCase{.iceberg_type = boolean(), .parquet_type = ::parquet::Type::BOOLEAN}, + TestCase{.iceberg_type = date(), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Date()}, + TestCase{ + .iceberg_type = time(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Time( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{ + .iceberg_type = timestamp(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{ + .iceberg_type = timestamp_tz(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)}, + TestCase{.iceberg_type = decimal(4, 2), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Decimal(4, 2)}, + TestCase{.iceberg_type = decimal(38, 18), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::Decimal(38, 18), + .primitive_length = 16}, + TestCase{.iceberg_type = uuid(), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .parquet_logical_type = ::parquet::LogicalType::UUID(), + .primitive_length = 16}, + TestCase{.iceberg_type = fixed(8), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .primitive_length = 8}}; + + for (const auto& test_case : test_cases) { + Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field", + test_case.iceberg_type)}); + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {::parquet::schema::PrimitiveNode::Make( + "test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type, + test_case.parquet_type, test_case.primitive_length, + /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + } +} + +TEST(ParquetSchemaProjectionTest, UnsuportedProjection) { + struct TestCase { + std::shared_ptr iceberg_type; + ::parquet::Type::type parquet_type; + std::shared_ptr parquet_logical_type; + int32_t primitive_length = -1; + }; + + std::vector test_cases = { + TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::DOUBLE}, + TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT64}, + TestCase{.iceberg_type = date(), .parquet_type = ::parquet::Type::INT32}, + TestCase{.iceberg_type = time(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Time( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{ + .iceberg_type = timestamp(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{.iceberg_type = timestamp_tz(), + .parquet_type = ::parquet::Type::INT64, + .parquet_logical_type = ::parquet::LogicalType::Timestamp( + /*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)}, + TestCase{.iceberg_type = decimal(4, 2), + .parquet_type = ::parquet::Type::INT32, + .parquet_logical_type = ::parquet::LogicalType::Decimal(4, 1)}, + TestCase{.iceberg_type = fixed(8), + .parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY, + .primitive_length = 4}}; - auto group_node_with_full_field_id = MakeGroupNode( - "test_group", - {MakeInt32Node("c1", /*field_id=*/2), MakeInt32Node("c2", /*field_id=*/3)}, - /*field_id=*/1); - EXPECT_TRUE(HasFieldIds(group_node_with_full_field_id)); + for (const auto& test_case : test_cases) { + Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field", + test_case.iceberg_type)}); + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + {::parquet::schema::PrimitiveNode::Make( + "test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type, + test_case.parquet_type, test_case.primitive_length, + /*field_id=*/1)}); - auto group_node_with_partial_field_id = MakeGroupNode( - "test_group", {MakeInt32Node("c1", /*field_id=*/1), MakeInt32Node("c2")}); - EXPECT_TRUE(HasFieldIds(group_node_with_partial_field_id)); + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); + } } } // namespace iceberg::parquet