diff --git a/src/iceberg/arrow_array_reader.h b/src/iceberg/arrow_array_reader.h new file mode 100644 index 000000000..a86f8162c --- /dev/null +++ b/src/iceberg/arrow_array_reader.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A reader interface that returns ArrowArray in a streaming fashion. +class ICEBERG_EXPORT ArrowArrayReader { + public: + /// \brief Read next batch of data. + /// + /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. + virtual Result> Next() = 0; + + /// \brief Get schema of data returned by `Next`. + virtual Result Schema() const = 0; + + /// \brief Close this reader and release all resources. + virtual Status Close() = 0; + + virtual ~ArrowArrayReader() = default; +}; + +} // namespace iceberg diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 048cd4997..ddb8d1df3 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -160,7 +160,7 @@ class AvroReader::Impl { return {}; } - Result Schema() { + Result Schema() const { if (!context_) { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } @@ -174,7 +174,7 @@ class AvroReader::Impl { } private: - Status InitReadContext() { + Status InitReadContext() const { context_ = std::make_unique(); context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema()); @@ -232,14 +232,14 @@ class AvroReader::Impl { // The avro reader to read the data into a datum. std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; // The context to keep track of the reading progress. - std::unique_ptr context_; + mutable std::unique_ptr context_; }; AvroReader::~AvroReader() = default; Result> AvroReader::Next() { return impl_->Next(); } -Result AvroReader::Schema() { return impl_->Schema(); } +Result AvroReader::Schema() const { return impl_->Schema(); } Status AvroReader::Open(const ReaderOptions& options) { impl_ = std::make_unique(); diff --git a/src/iceberg/avro/avro_reader.h b/src/iceberg/avro/avro_reader.h index 07737bb7b..6481fe276 100644 --- a/src/iceberg/avro/avro_reader.h +++ b/src/iceberg/avro/avro_reader.h @@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader { Result> Next() final; - Result Schema() final; + Result Schema() const final; private: class Impl; diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index 8a59e33fe..4bafefda1 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -26,6 +26,7 @@ #include #include +#include "iceberg/arrow_array_reader.h" #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" #include "iceberg/result.h" @@ -34,26 +35,14 @@ namespace iceberg { /// \brief Base reader class to read data from different file formats. -class ICEBERG_EXPORT Reader { +class ICEBERG_EXPORT Reader : public ArrowArrayReader { public: - virtual ~Reader() = default; Reader() = default; Reader(const Reader&) = delete; Reader& operator=(const Reader&) = delete; /// \brief Open the reader. virtual Status Open(const struct ReaderOptions& options) = 0; - - /// \brief Close the reader. - virtual Status Close() = 0; - - /// \brief Read next data from file. - /// - /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. - virtual Result> Next() = 0; - - /// \brief Get the schema of the data. - virtual Result Schema() = 0; }; /// \brief A split of the file to read. diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 405b09f03..b6490f65e 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -173,7 +173,7 @@ class ParquetReader::Impl { } // Get the schema of the data - Result Schema() { + Result Schema() const { if (!context_) { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } @@ -185,7 +185,7 @@ class ParquetReader::Impl { } private: - Status InitReadContext() { + Status InitReadContext() const { context_ = std::make_unique(); // Build the output Arrow schema @@ -239,14 +239,14 @@ class ParquetReader::Impl { // Parquet file reader to create RecordBatchReader. std::unique_ptr<::parquet::arrow::FileReader> reader_; // The context to keep track of the reading progress. - std::unique_ptr context_; + mutable std::unique_ptr context_; }; ParquetReader::~ParquetReader() = default; Result> ParquetReader::Next() { return impl_->Next(); } -Result ParquetReader::Schema() { return impl_->Schema(); } +Result ParquetReader::Schema() const { return impl_->Schema(); } Status ParquetReader::Open(const ReaderOptions& options) { impl_ = std::make_unique(); diff --git a/src/iceberg/parquet/parquet_reader.h b/src/iceberg/parquet/parquet_reader.h index 23d34dfa9..47ecd44c2 100644 --- a/src/iceberg/parquet/parquet_reader.h +++ b/src/iceberg/parquet/parquet_reader.h @@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetReader : public Reader { Result> Next() final; - Result Schema() final; + Result Schema() const final; private: class Impl; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index a7edd5d79..7ba5cdb8d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,9 +19,7 @@ #include "iceberg/table_scan.h" -#include -#include - +#include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" @@ -45,6 +43,21 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } +Result> FileScanTask::ToArrowArrayReader( + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter, const std::shared_ptr& io) const { + const ReaderOptions options{.path = data_file_->file_path, + .length = data_file_->file_size_in_bytes, + .io = io, + .projection = projected_schema, + .filter = filter}; + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file_->file_format, options)); + + return reader; +} + TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) : file_io_(std::move(file_io)) { diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index dcfa72205..99f8cae5f 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -22,6 +22,7 @@ #include #include +#include "iceberg/arrow_array_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/type_fwd.h" @@ -51,9 +52,23 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { const std::shared_ptr& data_file() const; int64_t size_bytes() const override; + int32_t files_count() const override; + int64_t estimated_row_count() const override; + /** + * \brief Returns an ArrowArrayReader to read the data for this task. + * + * \param projected_schema The projected schema for reading the data. + * \param filter Optional filter expression to apply during reading. + * \param io The FileIO instance for accessing the file data. + * \return A Result containing a unique pointer to the reader, or an error on failure. + */ + Result> ToArrowArrayReader( + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter, const std::shared_ptr& io) const; + private: /// \brief Data file metadata. std::shared_ptr data_file_; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8dbb3df86..bbce0d44c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -123,4 +123,7 @@ if(ICEBERG_BUILD_BUNDLE) parquet_data_test.cc parquet_schema_test.cc parquet_test.cc) + + add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc) + endif() diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc new file mode 100644 index 000000000..7643ee3e1 --- /dev/null +++ b/test/file_scan_task_test.cc @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow_array_reader.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "matchers.h" +#include "temp_file_test_base.h" + +namespace iceberg { + +class FileScanTaskTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); + temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet"); + CreateSimpleParquetFile(); + } + + // Helper method to create a Parquet file with sample data. + // This is identical to the one in ParquetReaderTest. + void CreateSimpleParquetFile() { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})), + ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))}); + auto table = ::arrow::Table::FromRecordBatches( + arrow_schema, {::arrow::RecordBatch::FromStructArray( + ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])") + .ValueOrDie()) + .ValueOrDie()}) + .ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + + ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), + outfile, /*chunk_size=*/1024) + .ok()); + } + + // Helper method to verify the content of the next batch from the reader. + // This is identical to the one in ParquetReaderTest and AvroReaderTest. + void VerifyNextBatch(ArrowArrayReader& reader, std::string_view expected_json) { + // Boilerplate to get Arrow schema + auto schema_result = reader.Schema(); + ASSERT_THAT(schema_result, IsOk()); + auto arrow_c_schema = std::move(schema_result.value()); + auto import_schema_result = ::arrow::ImportType(&arrow_c_schema); + auto arrow_schema = import_schema_result.ValueOrDie(); + + // Boilerplate to get Arrow array + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()) << "Reader.Next() failed: " << data.error().message; + ASSERT_TRUE(data.value().has_value()) << "Reader.Next() returned no data"; + auto arrow_c_array = data.value().value(); + auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema); + auto arrow_array = data_result.ValueOrDie(); + + // Verify data + auto expected_array = + ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie(); + ASSERT_TRUE(arrow_array->Equals(*expected_array)) + << "Expected: " << expected_array->ToString() + << "\nGot: " << arrow_array->ToString(); + } + + // Helper method to verify that the reader is exhausted. + void VerifyExhausted(ArrowArrayReader& reader) { + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_FALSE(data.value().has_value()); + } + + std::shared_ptr file_io_; + std::string temp_parquet_file_; +}; + +TEST_F(FileScanTaskTest, ReadFullSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto io = internal::checked_cast(*file_io_); + data_file->file_size_in_bytes = + io.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + + FileScanTask task(data_file); + + auto reader_result = task.ToArrowArrayReader(projected_schema, nullptr, file_io_); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto io = internal::checked_cast(*file_io_); + data_file->file_size_in_bytes = + io.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "score", float64())}); + + FileScanTask task(data_file); + + auto reader_result = task.ToArrowArrayReader(projected_schema, nullptr, file_io_); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +} // namespace iceberg diff --git a/test/parquet_test.cc b/test/parquet_test.cc index 122256bb3..825517857 100644 --- a/test/parquet_test.cc +++ b/test/parquet_test.cc @@ -28,7 +28,7 @@ #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" -#include "iceberg/parquet/parquet_reader.h" +#include "iceberg/file_reader.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/schema.h" #include "iceberg/type.h"