diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc
index 6237b00be..c822daa5b 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -17,15 +17,365 @@
  * under the License.
  */
 
+#include <memory>
+#include <span>
+#include <variant>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/builder.h>
+#include <arrow/compute/api.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+// Forward declaration: struct, list and map projection recurse through this function.
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    const NestedType& nested_type, std::span<const FieldProjection> projections,
+    ::arrow::MemoryPool* pool);
+
+/// \brief Create a null array of the given type and length.
+///
+/// \param type The Arrow type of the array to create.
+/// \param length The number of null values to append.
+/// \param pool The memory pool used by the array builder.
+/// \return An array holding `length` nulls, or the converted Arrow error.
+Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
+    const std::shared_ptr<::arrow::DataType>& type, int64_t length,
+    ::arrow::MemoryPool* pool) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, pool));
+  ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
+  return array;
+}
+
+/// \brief Project a primitive array to the requested output type.
+///
+/// The input array is returned unchanged when its type already equals the output
+/// type; otherwise it is converted with the Arrow compute cast function.
+///
+/// \param array The source array.
+/// \param output_arrow_type The Arrow type of the result.
+/// \param pool The memory pool used for the cast output.
+/// \return The projected array, or the converted Arrow error if the cast fails.
+Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    ::arrow::MemoryPool* pool) {
+  if (array->type()->Equals(output_arrow_type)) {
+    return array;
+  }
+
+  // Use Arrow compute cast function for type conversions. The execution context is
+  // bound to the caller-provided pool so that the cast output is allocated from it
+  // instead of from the default memory pool.
+  // Note: The schema evolution rules are not checked again because they have already
+  // been checked when the schemas were projected.
+  ::arrow::compute::ExecContext exec_context(pool);
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto cast_result,
+      ::arrow::compute::Cast(array, output_arrow_type,
+                             ::arrow::compute::CastOptions::Safe(), &exec_context));
+  return cast_result.make_array();
+}
+
+/// \brief Project a struct array to the requested output struct type.
+///
+/// Each output field is produced according to its projection: a projected field is
+/// read from the referenced source child (projected recursively when nested, cast
+/// when primitive), and a null field is filled with nulls.
+///
+/// \param struct_array The source struct array.
+/// \param output_struct_type The Arrow struct type of the result.
+/// \param struct_type The projected Iceberg struct type.
+/// \param projections The projections, one per field of `struct_type`.
+/// \param pool The memory pool used for allocations.
+/// \return The projected struct array, or an error for inconsistent inputs or an
+/// unsupported projection kind.
+Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
+    const std::shared_ptr<::arrow::StructArray>& struct_array,
+    const std::shared_ptr<::arrow::StructType>& output_struct_type,
+    const StructType& struct_type, std::span<const FieldProjection> projections,
+    ::arrow::MemoryPool* pool) {
+  if (struct_type.fields().size() != projections.size()) {
+    return InvalidSchema(
+        "Inconsistent number of fields ({}) and number of projections ({})",
+        struct_type.fields().size(), projections.size());
+  }
+  if (struct_type.fields().size() !=
+      static_cast<size_t>(output_struct_type->num_fields())) {
+    return InvalidSchema(
+        "Inconsistent number of fields ({}) and number of output fields ({})",
+        struct_type.fields().size(), output_struct_type->num_fields());
+  }
+
+  // Read the children from the underlying array data, i.e. without the struct offset
+  // applied. StructArray::field() returns children that are already sliced by the
+  // offset, so passing the offset to the output array again would skip rows twice.
+  // With unsliced children, the offset given to the output array applies equally to
+  // the validity bitmap and to every child.
+  const auto& source_children = struct_array->data()->child_data;
+  const int64_t unsliced_length = struct_array->offset() + struct_array->length();
+
+  std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
+  projected_arrays.reserve(projections.size());
+
+  for (size_t i = 0; i < projections.size(); ++i) {
+    const auto& projected_field = struct_type.fields()[i];
+    const auto& field_projection = projections[i];
+    const auto& output_arrow_type = output_struct_type->fields()[i]->type();
+
+    std::shared_ptr<::arrow::Array> projected_array;
+
+    if (field_projection.kind == FieldProjection::Kind::kProjected) {
+      // Compare as size_t before any narrowing so that an oversized index cannot wrap
+      // around and pass the bound check.
+      const auto parquet_field_index = std::get<size_t>(field_projection.from);
+      if (parquet_field_index >= source_children.size()) {
+        return InvalidArgument("Parquet field index {} out of bound {}",
+                               parquet_field_index, source_children.size());
+      }
+      const auto parquet_array =
+          ::arrow::MakeArray(source_children[parquet_field_index]);
+      if (projected_field.type()->is_nested()) {
+        const auto& nested_type =
+            internal::checked_cast<const NestedType&>(*projected_field.type());
+        ICEBERG_ASSIGN_OR_RAISE(
+            projected_array,
+            ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
+                               field_projection.children, pool));
+      } else {
+        ICEBERG_ASSIGN_OR_RAISE(
+            projected_array,
+            ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
+      }
+    } else if (field_projection.kind == FieldProjection::Kind::kNull) {
+      // The null child must span the unsliced length because the struct offset is
+      // applied to it as well.
+      ICEBERG_ASSIGN_OR_RAISE(
+          projected_array, MakeNullArray(output_arrow_type, unsliced_length, pool));
+    } else {
+      return NotImplemented("Unsupported field projection kind: {}",
+                            ToString(field_projection.kind));
+    }
+
+    projected_arrays.emplace_back(std::move(projected_array));
+  }
+
+  // Construct the array directly instead of via StructArray::Make, which infers the
+  // length from the first child and therefore rejects a struct without fields.
+  return std::make_shared<::arrow::StructArray>(
+      output_struct_type, struct_array->length(), projected_arrays,
+      struct_array->null_bitmap(), struct_array->null_count(), struct_array->offset());
+}
+
+/// \brief Project a list array to the requested output list type.
+///
+/// The offsets, validity bitmap and offset of the source array are reused; only the
+/// values are projected.
+///
+/// FIXME: Support ::arrow::LargeListArray.
+///
+/// \param list_array The source list array.
+/// \param output_list_type The Arrow list type of the result.
+/// \param list_type The projected Iceberg list type.
+/// \param projections The projections; exactly one (for the element) is expected.
+/// \param pool The memory pool used for allocations.
+/// \return The projected list array, or an error if the projection count is not 1.
+Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
+    const std::shared_ptr<::arrow::ListArray>& list_array,
+    const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
+    std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+  if (projections.size() != 1) {
+    return InvalidArgument("Expected 1 projection for list, got: {}", projections.size());
+  }
+
+  const auto& element_field = list_type.fields().back();
+  const auto& element_projection = projections[0];
+  const auto& output_element_type = output_list_type->value_type();
+
+  std::shared_ptr<::arrow::Array> projected_values;
+  if (element_field.type()->is_nested()) {
+    const auto& nested_type =
+        internal::checked_cast<const NestedType&>(*element_field.type());
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_values,
+        ProjectNestedArray(list_array->values(), output_element_type, nested_type,
+                           element_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_values,
+        ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
+  }
+
+  return std::make_shared<::arrow::ListArray>(
+      output_list_type, list_array->length(), list_array->value_offsets(),
+      std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
+      list_array->offset());
+}
+
+/// \brief Project a map array to the requested output map type.
+///
+/// The offsets, validity bitmap and offset of the source array are reused; only the
+/// keys and items are projected.
+///
+/// \param map_array The source map array.
+/// \param output_map_type The Arrow map type of the result.
+/// \param map_type The projected Iceberg map type.
+/// \param projections The projections; exactly two (key, then value) are expected.
+/// \param pool The memory pool used for allocations.
+/// \return The projected map array, or an error if the projection count is not 2.
+Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
+    const std::shared_ptr<::arrow::MapArray>& map_array,
+    const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
+    std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+  if (projections.size() != 2) {
+    return InvalidArgument("Expected 2 projections for map, got: {}", projections.size());
+  }
+
+  const auto& key_projection = projections[0];
+  const auto& value_projection = projections[1];
+  const auto& key_type = map_type.key().type();
+  const auto& value_type = map_type.value().type();
+
+  // Project keys
+  std::shared_ptr<::arrow::Array> projected_keys;
+  if (key_type->is_nested()) {
+    const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_keys, ProjectNestedArray(map_array->keys(), output_map_type->key_type(),
+                                           nested_type, key_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_keys,
+        ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), pool));
+  }
+
+  // Project values
+  std::shared_ptr<::arrow::Array> projected_items;
+  if (value_type->is_nested()) {
+    const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_items,
+        ProjectNestedArray(map_array->items(), output_map_type->item_type(), nested_type,
+                           value_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_items,
+        ProjectPrimitiveArray(map_array->items(), output_map_type->item_type(), pool));
+  }
+
+  return std::make_shared<::arrow::MapArray>(
+      output_map_type, map_array->length(), map_array->value_offsets(),
+      std::move(projected_keys), std::move(projected_items), map_array->null_bitmap(),
+      map_array->null_count(), map_array->offset());
+}
+
+/// \brief Project an array of a nested type by dispatching on the Iceberg type id.
+///
+/// Both the output Arrow type and the source array are checked against the expected
+/// Arrow type before downcasting, so a mismatch is reported as an error.
+///
+/// \param array The source array.
+/// \param output_arrow_type The Arrow type of the result.
+/// \param nested_type The projected Iceberg nested type (struct, list or map).
+/// \param projections The projections of the children of `nested_type`.
+/// \param pool The memory pool used for allocations.
+/// \return The projected array, or an error for mismatching or unsupported types.
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    const NestedType& nested_type, std::span<const FieldProjection> projections,
+    ::arrow::MemoryPool* pool) {
+  switch (nested_type.type_id()) {
+    case TypeId::kStruct: {
+      if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
+        return InvalidSchema("Expected struct type, got: {}",
+                             output_arrow_type->ToString());
+      }
+      if (array->type_id() != ::arrow::Type::STRUCT) {
+        return InvalidSchema("Expected struct array, got: {}",
+                             array->type()->ToString());
+      }
+
+      auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(array);
+      auto output_struct_type =
+          internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
+      const auto& struct_type = internal::checked_cast<const StructType&>(nested_type);
+      return ProjectStructArray(struct_array, output_struct_type, struct_type,
+                                projections, pool);
+    }
+    case TypeId::kList: {
+      if (output_arrow_type->id() != ::arrow::Type::LIST) {
+        return InvalidSchema("Expected list type, got: {}",
+                             output_arrow_type->ToString());
+      }
+      if (array->type_id() != ::arrow::Type::LIST) {
+        return InvalidSchema("Expected list array, got: {}", array->type()->ToString());
+      }
+
+      auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
+      auto output_list_type =
+          internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
+      const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
+      return ProjectListArray(list_array, output_list_type, list_type, projections, pool);
+    }
+    case TypeId::kMap: {
+      if (output_arrow_type->id() != ::arrow::Type::MAP) {
+        return InvalidSchema("Expected map type, got: {}", output_arrow_type->ToString());
+      }
+      if (array->type_id() != ::arrow::Type::MAP) {
+        return InvalidSchema("Expected map array, got: {}", array->type()->ToString());
+      }
+
+      auto map_array = internal::checked_pointer_cast<::arrow::MapArray>(array);
+      auto output_map_type =
+          internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
+      const auto& map_type = internal::checked_cast<const MapType&>(nested_type);
+      return ProjectMapArray(map_array, output_map_type, map_type, projections, pool);
+    }
+    default:
+      return InvalidSchema("Cannot project array of unsupported nested type: {}",
+                           nested_type.ToString());
+  }
+}
+
+}  // namespace
+
 Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
     std::shared_ptr<::arrow::RecordBatch> record_batch,
     const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
-    const Schema& projected_schema, const SchemaProjection& projection) {
-  return NotImplemented("NYI");
+    const Schema& projected_schema, const SchemaProjection& projection,
+    ::arrow::MemoryPool* pool) {
+  if (record_batch == nullptr || output_arrow_schema == nullptr) {
+    return InvalidArgument("Record batch and output schema must not be null");
+  }
+
+  // Wrap the columns of the record batch in a struct array so that the top-level
+  // schema can be projected by the same code path as a nested struct.
+  auto array = std::make_shared<::arrow::StructArray>(
+      ::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
+      record_batch->columns());
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto output_array,
+      ProjectNestedArray(array, ::arrow::struct_(output_arrow_schema->fields()),
+                         projected_schema, projection.fields, pool));
+  auto* struct_array = internal::checked_cast<::arrow::StructArray*>(output_array.get());
+  return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
+                                    struct_array->fields());
 }
 
 }  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_data_util_internal.h b/src/iceberg/parquet/parquet_data_util_internal.h
index c222d7497..4a4f88138 100644
--- a/src/iceberg/parquet/parquet_data_util_internal.h
+++ b/src/iceberg/parquet/parquet_data_util_internal.h
@@ -19,12 +19,9 @@
 
 #pragma once
 
-#include "iceberg/schema_util.h"
+#include <arrow/type_fwd.h>
 
-namespace arrow {
-class RecordBatch;
-class Schema;
-}  // namespace arrow
+#include "iceberg/schema_util.h"
 
 namespace iceberg::parquet {
 
@@ -34,10 +31,12 @@ namespace iceberg::parquet {
 /// \param output_arrow_schema The Arrow schema to convert to.
 /// \param projected_schema The projected Iceberg schema.
 /// \param projection The projection from projected Iceberg schema to the record batch.
+/// \param pool The arrow memory pool.
 /// \return The converted record batch.
Result> ProjectRecordBatch( std::shared_ptr<::arrow::RecordBatch> record_batch, const std::shared_ptr<::arrow::Schema>& output_arrow_schema, - const Schema& projected_schema, const SchemaProjection& projection); + const Schema& projected_schema, const SchemaProjection& projection, + ::arrow::MemoryPool* pool); } // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 6903310ad..3f1598658 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -117,11 +117,8 @@ class ParquetReader::Impl { split_ = options.split; read_schema_ = options.projection; - // TODO(gangwu): make memory pool configurable - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(); - // Prepare reader properties - ::parquet::ReaderProperties reader_properties(pool); + ::parquet::ReaderProperties reader_properties(pool_); ::parquet::ArrowReaderProperties arrow_reader_properties; arrow_reader_properties.set_batch_size(options.batch_size); arrow_reader_properties.set_arrow_extensions_enabled(true); @@ -131,7 +128,7 @@ class ParquetReader::Impl { auto file_reader = ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties); ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make( - pool, std::move(file_reader), arrow_reader_properties, &reader_)); + pool_, std::move(file_reader), arrow_reader_properties, &reader_)); // Project read schema onto the Parquet file schema ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_)); @@ -152,7 +149,7 @@ class ParquetReader::Impl { ICEBERG_ASSIGN_OR_RAISE( batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_, - *read_schema_, projection_)); + *read_schema_, projection_, pool_)); ArrowArray arrow_array; ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array)); @@ -227,6 +224,8 @@ class ParquetReader::Impl { } private: + // TODO(gangwu): make memory 
pool configurable
+  ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
   // The split to read from the Parquet file.
   std::optional<Split> split_;
   // Schema to read from the Parquet file.
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 091fa292d..be4afcb1c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
                    test_common.cc
                    in_memory_catalog_test.cc)
 
-  add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
+  add_iceberg_test(parquet_test
+                   USE_BUNDLE
+                   SOURCES
+                   parquet_data_test.cc
+                   parquet_schema_test.cc)
 endif()
diff --git a/test/parquet_data_test.cc b/test/parquet_data_test.cc
new file mode 100644
index 000000000..bc8f421c3
--- /dev/null
+++ b/test/parquet_data_test.cc
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "matchers.h"
+
+namespace iceberg::parquet {
+
+/// \brief One parameterized case: a single column "a" projected between two types.
+struct ProjectRecordBatchParam {
+  std::string name;
+  std::shared_ptr<Type> projected_type;
+  std::shared_ptr<Type> source_type;
+  std::string input_json;
+  std::string expected_json;
+};
+
+/// \brief Build a record batch with the given schema from a JSON array of rows.
+/// Dies (ValueOrDie) if the JSON cannot be parsed into the schema.
+std::shared_ptr<::arrow::RecordBatch> RecordBatchFromJson(
+    const std::shared_ptr<::arrow::Schema>& schema, const std::string& json_data) {
+  auto struct_type = ::arrow::struct_(schema->fields());
+  auto array = ::arrow::json::ArrayFromJSONString(struct_type, json_data).ValueOrDie();
+  auto struct_array = std::static_pointer_cast<::arrow::StructArray>(array);
+  return ::arrow::RecordBatch::Make(schema, struct_array->length(),
+                                    struct_array->fields());
+}
+
+/// \brief Project `input_json` (rows of `source_schema`) onto `projected_schema` with
+/// ProjectRecordBatch and assert that the result equals `expected_json` (rows of
+/// `projected_schema`).
+void VerifyProjectRecordBatch(const Schema& projected_schema, const Schema& source_schema,
+                              const std::string& input_json,
+                              const std::string& expected_json) {
+  auto schema_projection_result =
+      Project(projected_schema, source_schema, /*prune_source=*/false);
+  ASSERT_THAT(schema_projection_result, IsOk());
+  auto schema_projection = std::move(schema_projection_result.value());
+
+  ArrowSchema source_arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(source_schema, &source_arrow_c_schema), IsOk());
+  auto source_arrow_schema = ::arrow::ImportSchema(&source_arrow_c_schema).ValueOrDie();
+  auto input_record_batch = RecordBatchFromJson(source_arrow_schema, input_json);
+
+  ArrowSchema projected_arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(projected_schema, &projected_arrow_c_schema), IsOk());
+  auto projected_arrow_schema =
+      ::arrow::ImportSchema(&projected_arrow_c_schema).ValueOrDie();
+
+  auto project_result =
+      ProjectRecordBatch(input_record_batch, projected_arrow_schema, projected_schema,
+                         schema_projection, ::arrow::default_memory_pool());
+  ASSERT_THAT(project_result, IsOk());
+  auto projected_record_batch = std::move(project_result.value());
+
+  auto expected_record_batch = RecordBatchFromJson(projected_arrow_schema, expected_json);
+  ASSERT_TRUE(projected_record_batch->Equals(*expected_record_batch))
+      << "projected_record_batch: " << projected_record_batch->ToString()
+      << "\nexpected_record_batch: " << expected_record_batch->ToString();
+}
+
+/// \brief Fixture for the parameterized primitive-type projection cases.
+class ProjectRecordBatchTest : public ::testing::TestWithParam<ProjectRecordBatchParam> {
+};
+
+TEST_P(ProjectRecordBatchTest, PrimitiveType) {
+  const auto& test_case = GetParam();
+
+  Schema projected_schema({SchemaField::MakeRequired(
+      /*field_id=*/1, /*name=*/"a", test_case.projected_type)});
+  Schema source_schema({SchemaField::MakeRequired(
+      /*field_id=*/1, /*name=*/"a", test_case.source_type)});
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(
+      projected_schema, source_schema, test_case.input_json, test_case.expected_json));
+}
+
+// Identity projections for every primitive type, followed by the type promotions
+// (int -> long, float -> double, decimal precision widening).
+const std::vector<ProjectRecordBatchParam> kPrimitiveTestCases = {
+    {
+        .name = "Boolean",
+        .projected_type = boolean(),
+        .source_type = boolean(),
+        .input_json = R"([{"a": true}, {"a": false}, {"a": true}])",
+        .expected_json = R"([{"a": true}, {"a": false}, {"a": true}])",
+    },
+    {
+        .name = "Int",
+        .projected_type = int32(),
+        .source_type = int32(),
+        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+    },
+    {
+        .name = "Long",
+        .projected_type = int64(),
+        .source_type = int64(),
+        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+    },
+    {
+        .name = "Float",
+        .projected_type = float32(),
+        .source_type = float32(),
+        .input_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
+    },
+    {
+        .name = "Double",
+        .projected_type = float64(),
+        .source_type = float64(),
+        .input_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
+    },
+    {
+        .name = "String",
+        .projected_type = string(),
+        .source_type = string(),
+        .input_json =
+            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
+        .expected_json =
+            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
+    },
+    {
+        .name = "Binary",
+        .projected_type = binary(),
+        .source_type = binary(),
+        .input_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
+        .expected_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
+    },
+    {
+        .name = "Fixed",
+        .projected_type = fixed(4),
+        .source_type = fixed(4),
+        .input_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
+        .expected_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
+    },
+    {
+        .name = "Decimal",
+        .projected_type = decimal(10, 2),
+        .source_type = decimal(10, 2),
+        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+    },
+    {
+        .name = "Date",
+        .projected_type = date(),
+        .source_type = date(),
+        .input_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
+        .expected_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
+    },
+    {
+        .name = "Time",
+        .projected_type = time(),
+        .source_type = time(),
+        .input_json = R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])",
+        .expected_json =
+            R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])",
+    },
+    {
+        .name = "Timestamp",
+        .projected_type = timestamp(),
+        .source_type = timestamp(),
+        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+    },
+    {
+        .name = "TimestampTz",
+        .projected_type = timestamp_tz(),
+        .source_type = timestamp_tz(),
+        .input_json =
+            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])",
+        .expected_json =
+            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])",
+    },
+    {
+        .name = "IntToLongPromotion",
+        .projected_type = int64(),
+        .source_type = int32(),
+        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+    },
+    {
+        .name = "FloatToDoublePromotion",
+        .projected_type = float64(),
+        .source_type = float32(),
+        .input_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
+    },
+    {
+        .name = "DecimalPrecisionPromotion",
+        .projected_type = decimal(10, 2),
+        .source_type = decimal(6, 2),
+        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+    },
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    AllPrimitiveTypes, ProjectRecordBatchTest, ::testing::ValuesIn(kPrimitiveTestCases),
+    [](const ::testing::TestParamInfo<ProjectRecordBatchParam>& info) {
+      return info.param.name;
+    });
+
+TEST(ProjectRecordBatchTest, StructWithTwoFields) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([{"id": 42, "name": "test"}])";
+  const std::string expected_json = R"([{"id": 42, "name": "test"}])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(iceberg_schema, iceberg_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStruct) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "person",
+                                std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeRequired(3, "name", string()),
+                                    SchemaField::MakeRequired(4, "age", int32()),
+                                })),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "person": {"name": "Person0", "age": 25}},
+    {"id": 2, "person": {"name": "Person1", "age": 26}}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfIntegers) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "numbers",
+          std::make_shared<ListType>(SchemaField::MakeRequired(2, "element", int32()))),
+  });
+
+  const std::string input_json = R"([
+    {"numbers": [0, 1, 2]},
+    {"numbers": [10, 11, 12]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfStructs) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "people",
+                                std::make_shared<ListType>(SchemaField::MakeRequired(
+                                    2, "element",
+                                    std::make_shared<StructType>(std::vector<SchemaField>{
+                                        SchemaField::MakeRequired(3, "name", string()),
+                                        SchemaField::MakeRequired(4, "age", int32()),
+                                    })))),
+  });
+
+  const std::string input_json = R"([
+    {"people": [
+      {"name": "Person0_0", "age": 20},
+      {"name": "Person0_1", "age": 21}
+    ]},
+    {"people": [
+      {"name": "Person1_0", "age": 30},
+      {"name": "Person1_1", "age": 31}
+    ]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToInt) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "scores",
+          std::make_shared<MapType>(SchemaField::MakeRequired(2, "key", string()),
+                                    SchemaField::MakeRequired(3, "value", int32()))),
+  });
+
+  const std::string input_json = R"([
+    {"scores": [["score_0", 100], ["score_1", 105]]},
+    {"scores": [["score_2", 110], ["score_3", 115]]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToStruct) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "users",
+          std::make_shared<MapType>(
+              SchemaField::MakeRequired(2, "key", string()),
+              SchemaField::MakeRequired(
+                  3, "value",
+                  std::make_shared<StructType>(std::vector<SchemaField>{
+                      SchemaField::MakeRequired(4, "id", int32()),
+                      SchemaField::MakeRequired(5, "email", string()),
+                  })))),
+  });
+
+  const std::string input_json = R"([
+    {"users": [["user_0", {"id": 1000, "email": "user0@example.com"}]]},
+    {"users": [["user_1", {"id": 1001, "email": "user1@example.com"}]]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+TEST(ProjectRecordBatchTest, StructWithMissingOptionalField) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeOptional(3, "age", int32()),     // Missing in source
+      SchemaField::MakeOptional(4, "email", string()),  // Missing in source
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Person0"},
+    {"id": 2, "name": "Person1"}
+  ])";
+  const std::string expected_json = R"([
+    {"id": 1, "name": "Person0", "age": null, "email": null},
+    {"id": 2, "name": "Person1", "age": null, "email": null}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStructWithMissingOptionalFields) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(
+          2, "person",
+          std::make_shared<StructType>(std::vector<SchemaField>{
+              SchemaField::MakeRequired(3, "name", string()),
+              SchemaField::MakeOptional(4, "age", int32()),     // Missing
+              SchemaField::MakeOptional(5, "phone", string()),  // Missing
+          })),
+      SchemaField::MakeOptional(6, "department", string()),  // Missing
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "person",
+                                std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeRequired(3, "name", string()),
+                                })),
+  });
+
+  const std::string input_json = R"([
+    {"id": 100, "person": {"name": "Employee0"}},
+    {"id": 101, "person": {"name": "Employee1"}}
+  ])";
+  const std::string expected_json = R"([
+    {"id": 100, "person": {"name": "Employee0", "age": null, "phone": null}, "department": null},
+    {"id": 101, "person": {"name": "Employee1", "age": null, "phone": null}, "department": null}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, ListWithMissingOptionalElementFields) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(
+          1, "people",
+          std::make_shared<ListType>(SchemaField::MakeRequired(
+              2, "element",
+              std::make_shared<StructType>(std::vector<SchemaField>{
+                  SchemaField::MakeRequired(3, "name", string()),
+                  SchemaField::MakeOptional(4, "age", int32()),     // Missing in source
+                  SchemaField::MakeOptional(5, "email", string()),  // Missing in source
+              })))),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "people",
+                                std::make_shared<ListType>(SchemaField::MakeRequired(
+                                    2, "element",
+                                    std::make_shared<StructType>(std::vector<SchemaField>{
+                                        SchemaField::MakeRequired(3, "name", string()),
+                                    })))),
+  });
+
+  const std::string input_json = R"([
+    {"people": [
+      {"name": "Person0_0"},
+      {"name": "Person0_1"}
+    ]},
+    {"people": [
+      {"name": "Person1_0"},
+      {"name": "Person1_1"}
+    ]}
+  ])";
+  const std::string expected_json = R"([
+    {"people": [
+      {"name": "Person0_0", "age": null, "email": null},
+      {"name": "Person0_1", "age": null, "email": null}
+    ]},
+    {"people": [
+      {"name": "Person1_0", "age": null, "email": null},
+      {"name": "Person1_1", "age": null, "email": null}
+    ]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldReordering) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeRequired(1, "id", int32()),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Alice"},
+    {"id": 2, "name": "Bob"}
+  ])";
+  const std::string expected_json = R"([
+    {"name": "Alice", "id": 1},
+    {"name": "Bob", "id": 2}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldSubset) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeRequired(3, "age", int32()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Alice", "age": 25},
+    {"id": 2, "name": "Bob", "age": 30}
+  ])";
+  const std::string expected_json = R"([
+    {"name": "Alice"},
+    {"name": "Bob"}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
+}
+
+}  // namespace iceberg::parquet