diff --git a/.gitignore b/.gitignore index 2b7154b0d..74df5bd65 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ cmake-build-release/ # intellij files .idea + +# vscode files +.vscode diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index bccd4d9d7..68cbd35c6 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -115,7 +115,8 @@ if(ICEBERG_BUILD_BUNDLE) parquet/parquet_data_util.cc parquet/parquet_reader.cc parquet/parquet_register.cc - parquet/parquet_schema_util.cc) + parquet/parquet_schema_util.cc + parquet/parquet_writer.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index cf64892f5..a19b2f992 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -30,6 +30,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { switch (status.code()) { case ::arrow::StatusCode::IOError: return ErrorKind::kIOError; + case ::arrow::StatusCode::NotImplemented: + return ErrorKind::kNotImplemented; default: return ErrorKind::kUnknownError; } diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h new file mode 100644 index 000000000..61d10e2d6 --- /dev/null +++ b/src/iceberg/constants.h @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +namespace iceberg { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +} // namespace iceberg diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 405b09f03..4c86802b3 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -125,9 +125,9 @@ class ParquetReader::Impl { arrow_reader_properties.set_arrow_extensions_enabled(true); // Open the Parquet file reader - ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options)); + ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options)); auto file_reader = - ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties); + ::parquet::ParquetFileReader::Open(input_stream_, reader_properties); ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make( pool_, std::move(file_reader), arrow_reader_properties, &reader_)); @@ -169,6 +169,7 @@ class ParquetReader::Impl { } reader_.reset(); + ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close()); return {}; } @@ -236,6 +237,8 @@ class ParquetReader::Impl { std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. SchemaProjection projection_; + // The input stream to read Parquet file. + std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_; // Parquet file reader to create RecordBatchReader. std::unique_ptr<::parquet::arrow::FileReader> reader_; // The context to keep track of the reading progress. 
diff --git a/src/iceberg/parquet/parquet_register.cc b/src/iceberg/parquet/parquet_register.cc index 19988cd29..d79e158c1 100644 --- a/src/iceberg/parquet/parquet_register.cc +++ b/src/iceberg/parquet/parquet_register.cc @@ -21,8 +21,6 @@ namespace iceberg::parquet { -void RegisterWriter() {} - void RegisterAll() { RegisterReader(); RegisterWriter(); diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 361489973..088c15c04 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -26,6 +26,7 @@ #include #include +#include "iceberg/constants.h" #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" #include "iceberg/result.h" @@ -38,8 +39,6 @@ namespace iceberg::parquet { namespace { -constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; - std::optional FieldIdFromMetadata( const std::shared_ptr& metadata) { if (!metadata) { diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc new file mode 100644 index 000000000..2d2cd5406 --- /dev/null +++ b/src/iceberg/parquet/parquet_writer.cc @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/parquet/parquet_writer.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_error_transform_internal.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::parquet { + +namespace { + +Result> OpenOutputStream( + const WriterOptions& options) { + auto io = internal::checked_pointer_cast(options.io); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + return output; +} + +} // namespace + +class ParquetWriter::Impl { + public: + Status Open(const WriterOptions& options) { + auto writer_properties = + ::parquet::WriterProperties::Builder().memory_pool(pool_)->build(); + auto arrow_writer_properties = ::parquet::default_arrow_writer_properties(); + + ArrowSchema c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema)); + + std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor; + ICEBERG_ARROW_RETURN_NOT_OK( + ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties, + *arrow_writer_properties, &schema_descriptor)); + auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>( + schema_descriptor->schema_root()); + + ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options)); + auto file_writer = ::parquet::ParquetFileWriter::Open( + output_stream_, std::move(schema_node), std::move(writer_properties)); + ICEBERG_ARROW_RETURN_NOT_OK( + ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_, + std::move(arrow_writer_properties), &writer_)); + + return {}; + } + + Status Write(ArrowArray array) { + 
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, + ::arrow::ImportRecordBatch(&array, arrow_schema_)); + + ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch)); + + return {}; + } + + // Finalize the file, record split offsets and total length, then close the stream. + Status Close() { + if (writer_ == nullptr) { + return {}; // Already closed, or never opened + } + + ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close()); + auto& metadata = writer_->metadata(); + split_offsets_.reserve(metadata->num_row_groups()); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + split_offsets_.push_back(metadata->RowGroup(i)->file_offset()); + } + writer_.reset(); + + ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell()); + ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close()); + return {}; + } + + bool Closed() const { return writer_ == nullptr; } + + int64_t length() const { return total_bytes_; } + + std::vector split_offsets() const { return split_offsets_; } + + private: + // TODO(gangwu): make memory pool configurable + ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool(); + // Arrow schema used to import the ArrowArray batches passed to Write(). + std::shared_ptr<::arrow::Schema> arrow_schema_; + // The output stream that the Parquet file is written to. + std::shared_ptr<::arrow::io::OutputStream> output_stream_; + // Parquet file writer that writes the imported record batches. + std::unique_ptr<::parquet::arrow::FileWriter> writer_; + // Total length in bytes of the written Parquet file, captured in Close(). + int64_t total_bytes_{0}; + // Row group start offsets in the Parquet file. 
+ std::vector split_offsets_; +}; + +ParquetWriter::~ParquetWriter() = default; + +Status ParquetWriter::Open(const WriterOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); } + +Status ParquetWriter::Close() { return impl_->Close(); } + +std::optional ParquetWriter::metrics() { + if (!impl_->Closed()) { + return std::nullopt; + } + return {}; +} + +std::optional ParquetWriter::length() { + if (!impl_->Closed()) { + return std::nullopt; + } + return impl_->length(); +} + +std::vector ParquetWriter::split_offsets() { + if (!impl_->Closed()) { + return {}; + } + return impl_->split_offsets(); +} + +void RegisterWriter() { + static WriterFactoryRegistry parquet_writer_register( + FileFormatType::kParquet, []() -> Result> { + return std::make_unique(); + }); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_writer.h b/src/iceberg/parquet/parquet_writer.h new file mode 100644 index 000000000..5371f3810 --- /dev/null +++ b/src/iceberg/parquet/parquet_writer.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include "iceberg/file_writer.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::parquet { + +/// \brief A writer that writes ArrowArray to Parquet files. +class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer { + public: + ParquetWriter() = default; + + ~ParquetWriter() override; + + Status Open(const WriterOptions& options) final; + + Status Close() final; + + Status Write(ArrowArray array) final; + + std::optional metrics() final; + + std::optional length() final; + + std::vector split_offsets() final; + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::parquet diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index 124c595d5..beb973b28 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -24,6 +24,7 @@ #include #include +#include "iceberg/constants.h" #include "iceberg/schema.h" #include "iceberg/type.h" #include "iceberg/util/macros.h" @@ -45,7 +46,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr)); if (field_id.has_value()) { NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend( - &metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()), + &metadata_buffer, ArrowCharView(std::string(kParquetFieldIdKey).c_str()), ArrowCharView(std::to_string(field_id.value()).c_str()))); } @@ -185,8 +186,8 @@ int32_t GetFieldId(const ArrowSchema& schema) { return kUnknownFieldId; } - ArrowStringView field_id_key{.data = kFieldIdKey.data(), - .size_bytes = kFieldIdKey.size()}; + ArrowStringView field_id_key{.data = kParquetFieldIdKey.data(), + .size_bytes = kParquetFieldIdKey.size()}; ArrowStringView field_id_value; if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) != NANOARROW_OK) { diff --git a/src/iceberg/schema_internal.h b/src/iceberg/schema_internal.h index 1a31e68f2..8b290852a 100644 --- 
a/src/iceberg/schema_internal.h +++ b/src/iceberg/schema_internal.h @@ -30,11 +30,6 @@ namespace iceberg { -// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet. -// Here we follow a similar convention for Iceberg but we might also add -// "PARQUET:field_id" in the future once we implement a Parquet writer. -constexpr std::string_view kFieldIdKey = "ICEBERG:field_id"; - /// \brief Convert an Iceberg schema to an Arrow schema. /// /// \param[in] schema The Iceberg schema to convert. diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index e66f96daf..f77b76173 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -319,4 +319,16 @@ std::shared_ptr fixed(int32_t length) { return std::make_shared(length); } +std::shared_ptr map(SchemaField key, SchemaField value) { + return std::make_shared(std::move(key), std::move(value)); +} + +std::shared_ptr list(SchemaField element) { + return std::make_shared(std::move(element)); +} + +std::shared_ptr struct_(std::vector fields) { + return std::make_shared(std::move(fields)); +} + } // namespace iceberg diff --git a/src/iceberg/type.h b/src/iceberg/type.h index 78c0141b1..d49f19fcd 100644 --- a/src/iceberg/type.h +++ b/src/iceberg/type.h @@ -487,6 +487,22 @@ ICEBERG_EXPORT std::shared_ptr decimal(int32_t precision, int32_t s /// \return A shared pointer to the FixedType instance. ICEBERG_EXPORT std::shared_ptr fixed(int32_t length); +/// \brief Create a StructType with the given fields. +/// \param fields The fields of the struct. +/// \return A shared pointer to the StructType instance. +ICEBERG_EXPORT std::shared_ptr struct_(std::vector fields); + +/// \brief Create a ListType with the given element field. +/// \param element The element field of the list. +/// \return A shared pointer to the ListType instance. +ICEBERG_EXPORT std::shared_ptr list(SchemaField element); + +/// \brief Create a MapType with the given key and value fields. +/// \param key The key field of the map. 
+/// \param value The value field of the map. +/// \return A shared pointer to the MapType instance. +ICEBERG_EXPORT std::shared_ptr map(SchemaField key, SchemaField value); + /// @} } // namespace iceberg diff --git a/test/arrow_test.cc b/test/arrow_test.cc index 202463df0..d76020845 100644 --- a/test/arrow_test.cc +++ b/test/arrow_test.cc @@ -28,6 +28,7 @@ #include #include +#include "iceberg/constants.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "matchers.h" @@ -64,8 +65,8 @@ TEST_P(ToArrowSchemaTest, PrimitiveType) { ASSERT_TRUE(field->type()->Equals(param.arrow_type)); auto metadata = field->metadata(); - ASSERT_TRUE(metadata->Contains(kFieldIdKey)); - ASSERT_EQ(metadata->Get(kFieldIdKey), std::to_string(kFieldId)); + ASSERT_TRUE(metadata->Contains(kParquetFieldIdKey)); + ASSERT_EQ(metadata->Get(kParquetFieldIdKey), std::to_string(kFieldId)); } INSTANTIATE_TEST_SUITE_P( @@ -112,8 +113,8 @@ void CheckArrowField(const ::arrow::Field& field, ::arrow::Type::type type_id, auto metadata = field.metadata(); ASSERT_TRUE(metadata != nullptr); - ASSERT_TRUE(metadata->Contains(kFieldIdKey)); - ASSERT_EQ(metadata->Get(kFieldIdKey), std::to_string(field_id)); + ASSERT_TRUE(metadata->Contains(kParquetFieldIdKey)); + ASSERT_EQ(metadata->Get(kParquetFieldIdKey), std::to_string(field_id)); } } // namespace @@ -241,7 +242,7 @@ TEST_P(FromArrowSchemaTest, PrimitiveType) { auto metadata = ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kFieldId)}}); + {std::string(kParquetFieldIdKey), std::to_string(kFieldId)}}); auto arrow_schema = ::arrow::schema({::arrow::field( std::string(kFieldName), param.arrow_type, param.optional, std::move(metadata))}); ArrowSchema exported_schema; @@ -309,16 +310,16 @@ TEST(FromArrowSchemaTest, StructType) { auto int_field = ::arrow::field( std::string(kIntFieldName), ::arrow::int32(), /*nullable=*/false, ::arrow::key_value_metadata(std::unordered_map{ - 
{std::string(kFieldIdKey), std::to_string(kIntFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kIntFieldId)}})); auto str_field = ::arrow::field( std::string(kStrFieldName), ::arrow::utf8(), /*nullable=*/true, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kStrFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kStrFieldId)}})); auto struct_type = ::arrow::struct_({int_field, str_field}); auto struct_field = ::arrow::field( std::string(kStructFieldName), struct_type, /*nullable=*/false, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kStructFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kStructFieldId)}})); auto arrow_schema = ::arrow::schema({struct_field}); ArrowSchema exported_schema; ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); @@ -363,12 +364,12 @@ TEST(FromArrowSchemaTest, ListType) { auto element_field = ::arrow::field( std::string(kElemFieldName), ::arrow::int64(), /*nullable=*/true, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kElemFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kElemFieldId)}})); auto list_type = ::arrow::list(element_field); auto list_field = ::arrow::field( std::string(kListFieldName), list_type, /*nullable=*/false, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kListFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kListFieldId)}})); auto arrow_schema = ::arrow::schema({list_field}); ArrowSchema exported_schema; @@ -410,16 +411,16 @@ TEST(FromArrowSchemaTest, MapType) { auto key_field = ::arrow::field( std::string(kKeyFieldName), ::arrow::utf8(), /*nullable=*/false, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kKeyFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kKeyFieldId)}})); auto 
value_field = ::arrow::field( std::string(kValueFieldName), ::arrow::int32(), /*nullable=*/true, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kValueFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kValueFieldId)}})); auto map_type = std::make_shared<::arrow::MapType>(key_field, value_field); auto map_field = ::arrow::field( std::string(kMapFieldName), map_type, /*nullable=*/true, ::arrow::key_value_metadata(std::unordered_map{ - {std::string(kFieldIdKey), std::to_string(kFieldId)}})); + {std::string(kParquetFieldIdKey), std::to_string(kFieldId)}})); auto arrow_schema = ::arrow::schema({map_field}); ArrowSchema exported_schema; diff --git a/test/parquet_test.cc b/test/parquet_test.cc index 122256bb3..0c42b8463 100644 --- a/test/parquet_test.cc +++ b/test/parquet_test.cc @@ -17,38 +17,120 @@ * under the License. */ +#include + #include #include #include #include #include +#include #include #include #include #include +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" -#include "iceberg/parquet/parquet_reader.h" +#include "iceberg/file_reader.h" +#include "iceberg/file_writer.h" #include "iceberg/parquet/parquet_register.h" +#include "iceberg/result.h" #include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" #include "matchers.h" -#include "temp_file_test_base.h" namespace iceberg::parquet { -class ParquetReaderTest : public TempFileTestBase { +namespace { + +Status WriteArray(std::shared_ptr<::arrow::Array> data, Writer& writer) { + ArrowArray arr; + ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr)); + ICEBERG_RETURN_UNEXPECTED(writer.Write(arr)); + return writer.Close(); +} + +Status WriteArray(std::shared_ptr<::arrow::Array> data, + const WriterOptions& writer_options) { + 
ICEBERG_ASSIGN_OR_RAISE( + auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options)); + return WriteArray(data, *writer); +} + +Status ReadArray(std::shared_ptr<::arrow::Array>& out, + const ReaderOptions& reader_options) { + ICEBERG_ASSIGN_OR_RAISE( + auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options)); + ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next()); + + if (!read_data.has_value()) { + out = nullptr; + return {}; + } + auto arrow_c_array = read_data.value(); + + ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema()); + ICEBERG_ARROW_ASSIGN_OR_RETURN(out, + ::arrow::ImportArray(&arrow_c_array, &arrow_schema)); + return {}; +} + +void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr schema, + std::shared_ptr<::arrow::Array>& out) { + std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "base.parquet"; + + auto writer_data = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = file_io}); + ASSERT_THAT(writer_data, IsOk()) + << "Failed to create writer: " << writer_data.error().message; + auto writer = std::move(writer_data.value()); + ASSERT_THAT(WriteArray(data, *writer), IsOk()); + + ASSERT_THAT(ReadArray(out, {.path = basePath, + .length = writer->length(), + .io = file_io, + .projection = schema}), + IsOk()); + + ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data"; +} + +} // namespace + +class ParquetReaderTest : public ::testing::Test { protected: static void SetUpTestSuite() { parquet::RegisterAll(); } void SetUp() override { - TempFileTestBase::SetUp(); - file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); - temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet"); + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + temp_parquet_file_ = "parquet_reader_test.parquet"; } void CreateSimpleParquetFile() { + auto schema = 
std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"],[2, "Bar"],[3, "Baz"]])") + .ValueOrDie(); + + ASSERT_THAT(WriteArray( + array, {.path = temp_parquet_file_, .schema = schema, .io = file_io_}), IsOk()); + } + + void CreateSplitParquetFile() { const std::string kParquetFieldIdKey = "PARQUET:field_id"; auto arrow_schema = ::arrow::schema( {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, @@ -70,6 +152,7 @@ class ParquetReaderTest : public TempFileTestBase { ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile, /*chunk_size=*/2) .ok()); + ASSERT_TRUE(outfile->Close().ok()); } void VerifyNextBatch(Reader& reader, std::string_view expected_json) { @@ -162,7 +245,7 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) { } TEST_F(ParquetReaderTest, ReadSplit) { - CreateSimpleParquetFile(); + CreateSplitParquetFile(); // Read split offsets auto io = internal::checked_cast(*file_io_); @@ -204,4 +287,82 @@ TEST_F(ParquetReaderTest, ReadSplit) { } } +class ParquetReadWrite : public ::testing::Test { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } +}; + +TEST_F(ParquetReadWrite, EmptyStruct) { + auto schema = + std::make_shared(std::vector{SchemaField::MakeRequired( + 1, "empty_struct", std::make_shared(std::vector{}))}); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), R"([null, {}])") + .ValueOrDie(); + + std::shared_ptr file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "base.parquet"; + + ASSERT_THAT(WriteArray(array, {.path = basePath, .schema = schema, .io = file_io}), + IsError(ErrorKind::kNotImplemented)); +} + +TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "a", + struct_({ + SchemaField::MakeOptional(2, "a1", int64()), + SchemaField::MakeOptional(3, "a2", string()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[{"a1": 1, "a2": "abc"}], [{"a1": 0}], [{"a2": "edf"}], [{}]])") + .ValueOrDie(); + + std::shared_ptr<::arrow::Array> out; + DoRoundtrip(array, schema, out); + + ASSERT_TRUE(out->Equals(*array)); +} + +TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "a", boolean()), + SchemaField::MakeOptional(2, "b", int32()), + SchemaField::MakeOptional(3, "c", int64()), + SchemaField::MakeOptional(4, "d", float32()), + SchemaField::MakeOptional(5, "e", float64()), + SchemaField::MakeOptional(6, "f", string()), + SchemaField::MakeOptional(7, "g", time()), + SchemaField::MakeOptional(8, "h", timestamp()), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000], + [false, 0, 0, 0, 0, "", 0, 0], + [null, null, null, null, null, null, null, null]])") + .ValueOrDie(); + + std::shared_ptr<::arrow::Array> out; + DoRoundtrip(array, schema, out); + + ASSERT_TRUE(out->Equals(*array)); +} + } // 
namespace iceberg::parquet