From b8435b0f2aa08ea62b17659ee590595709e5219e Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Fri, 5 Sep 2025 15:35:36 +0800 Subject: [PATCH 1/4] a basic implement for export scan tasks as Arrow C ABI streams --- src/iceberg/table_scan.cc | 153 +++++++++++++++++++++++++++++++++++++- src/iceberg/table_scan.h | 8 ++ 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index a7edd5d79..b48a8b86e 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,9 +19,13 @@ #include "iceberg/table_scan.h" -#include -#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" @@ -33,6 +37,112 @@ namespace iceberg { +namespace { +/// \brief Private data structure to hold the Reader and error state +struct ReaderStreamPrivateData { + std::unique_ptr reader; + std::string last_error; + + explicit ReaderStreamPrivateData(std::unique_ptr reader_ptr) + : reader(std::move(reader_ptr)) {} + + ~ReaderStreamPrivateData() { + if (reader) { + reader->Close(); + } + } +}; + +/// \brief Callback to get the stream schema +static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) { + if (!stream || !stream->private_data) { + return EINVAL; + } + auto* private_data = static_cast(stream->private_data); + // Get schema from reader + auto schema_result = private_data->reader->Schema(); + if (!schema_result.has_value()) { + private_data->last_error = schema_result.error().message; + std::memset(out, 0, sizeof(ArrowSchema)); + return EIO; + } + + *out = std::move(schema_result.value()); + return 0; +} + +/// \brief Callback to get the next array from the stream +static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) { + if (!stream || !stream->private_data) { + return EINVAL; + } + + auto* private_data = static_cast(stream->private_data); + + auto next_result = private_data->reader->Next(); + if (!next_result.has_value()) { + private_data->last_error = next_result.error().message; + std::memset(out, 0, sizeof(ArrowArray)); + return EIO; + } + + auto& optional_array = next_result.value(); + if (optional_array.has_value()) { + *out = std::move(optional_array.value()); + } else { + // End of stream - set release to nullptr to signal end + std::memset(out, 0, sizeof(ArrowArray)); + out->release = nullptr; + } + + return 0; +} + +/// \brief Callback to get the last error message +static const char* GetLastError(struct ArrowArrayStream* stream) { + if (!stream || !stream->private_data) { + return nullptr; + } + + auto* private_data = static_cast(stream->private_data); + return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str(); +} + +/// \brief Callback to release the stream resources +static void Release(struct ArrowArrayStream* stream) { + if (!stream || !stream->private_data) { + return; + } + + delete static_cast(stream->private_data); + stream->private_data = nullptr; + stream->release = nullptr; +} + +Result MakeArrowArrayStream(std::unique_ptr reader) { + if (!reader) { + return InvalidArgument("Reader cannot be null"); + } + + auto schema_check = reader->Schema(); + if (!schema_check.has_value()) { + return InvalidSchema("Failed to get schema from reader: {}", + schema_check.error().message); + } + + auto private_data = std::make_unique(std::move(reader)); + + ArrowArrayStream stream{.get_schema = GetSchema, + .get_next = GetNext, + .get_last_error = GetLastError, + .release = Release, + .private_data = private_data.release()}; + + return stream; +} + +} // namespace + // implement FileScanTask FileScanTask::FileScanTask(std::shared_ptr data_file) : data_file_(std::move(data_file)) {} @@ -45,6 +155,21 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } +Result FileScanTask::ToArrow( + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter, const std::shared_ptr& io) const { + const ReaderOptions options{.path = data_file_->file_path, + .length = data_file_->file_size_in_bytes, + .io = io, + .projection = projected_schema, + .filter = filter}; + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file_->file_format, options)); + + return MakeArrowArrayStream(std::move(reader)); +} + TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) : file_io_(std::move(file_io)) { @@ -178,4 +303,28 @@ Result>> DataTableScan::PlanFiles() co return tasks; } +Result> DataTableScan::ToArrow() const { + Result>> tasks_result = PlanFiles(); + if (!tasks_result.has_value()) { + return InvalidArgument("Failed to plan files: {}", tasks_result.error().message); + } + auto tasks = tasks_result.value(); + if (tasks.empty()) { + // TODO(Li Feiyang): return a empty arrow stream + return NotImplemented("No files to scan"); + } + + std::vector arrow_streams; + for (const auto& task : tasks_result.value()) { + Result arrow_stream_result = + task->ToArrow(context_.projected_schema, context_.filter, file_io_); + if (!arrow_stream_result.has_value()) { + return InvalidArgument("Failed to get arrow stream: {}", + arrow_stream_result.error().message); + } + arrow_streams.push_back(arrow_stream_result.value()); + } + return std::move(arrow_streams); +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index dcfa72205..b74ae6902 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -22,6 +22,7 @@ #include #include +#include "iceberg/arrow_c_data.h" #include "iceberg/manifest_entry.h" #include "iceberg/type_fwd.h" @@ -54,6 +55,11 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int32_t files_count() const override; int64_t estimated_row_count() const override; + Result ToArrow(const std::shared_ptr& projected_schema, + + const std::shared_ptr& filter, + const std::shared_ptr& io) const; + private: /// \brief Data file metadata. std::shared_ptr data_file_; @@ -183,6 +189,8 @@ class ICEBERG_EXPORT DataTableScan : public TableScan { /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. Result>> PlanFiles() const override; + + Result> ToArrow() const; }; } // namespace iceberg From a5ebaf916a6f18d8ef7e3808bba7a4a68381e8c7 Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Fri, 5 Sep 2025 18:19:55 +0800 Subject: [PATCH 2/4] completed with test --- src/iceberg/table_scan.cc | 28 +---- src/iceberg/table_scan.h | 11 +- test/CMakeLists.txt | 3 + test/file_scan_task_test.cc | 206 ++++++++++++++++++++++++++++++++++++ 4 files changed, 218 insertions(+), 30 deletions(-) create mode 100644 test/file_scan_task_test.cc diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index b48a8b86e..421f604df 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -22,8 +22,6 @@ #include #include -#include - #include "iceberg/arrow_c_data.h" #include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" @@ -48,7 +46,7 @@ struct ReaderStreamPrivateData { ~ReaderStreamPrivateData() { if (reader) { - reader->Close(); + std::ignore = reader->Close(); } } }; @@ -303,28 +301,4 @@ Result>> DataTableScan::PlanFiles() co return tasks; } -Result> DataTableScan::ToArrow() const { - Result>> tasks_result = PlanFiles(); - if (!tasks_result.has_value()) { - return InvalidArgument("Failed to plan files: {}", tasks_result.error().message); - } - auto tasks = tasks_result.value(); - if (tasks.empty()) { - // TODO(Li Feiyang): return a empty arrow stream - return NotImplemented("No files to scan"); - } - - std::vector arrow_streams; - for (const auto& task : tasks_result.value()) { - Result arrow_stream_result = - task->ToArrow(context_.projected_schema, context_.filter, file_io_); - if (!arrow_stream_result.has_value()) { - return InvalidArgument("Failed to get arrow stream: {}", - arrow_stream_result.error().message); - } - arrow_streams.push_back(arrow_stream_result.value()); - } - return std::move(arrow_streams); -} - } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index b74ae6902..2cd04cd8a 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -55,8 +55,15 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int32_t files_count() const override; int64_t estimated_row_count() const override; + /** + * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. + * + * \param projected_schema The projected schema for reading the data. + * \param filter Optional filter expression to apply during reading. + * \param io The FileIO instance for accessing the file data. + * \return A Result containing an ArrowArrayStream, or an error on failure. + */ Result ToArrow(const std::shared_ptr& projected_schema, - const std::shared_ptr& filter, const std::shared_ptr& io) const; @@ -189,8 +196,6 @@ class ICEBERG_EXPORT DataTableScan : public TableScan { /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. Result>> PlanFiles() const override; - - Result> ToArrow() const; }; } // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 42ad13209..f8926eb70 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -124,4 +124,7 @@ if(ICEBERG_BUILD_BUNDLE) parquet_data_test.cc parquet_schema_test.cc parquet_test.cc) + + add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc) + endif() diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc new file mode 100644 index 000000000..d276ed6cb --- /dev/null +++ b/test/file_scan_task_test.cc @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "matchers.h" +#include "temp_file_test_base.h" + +namespace iceberg { + +class FileScanTaskTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); + temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet"); + CreateSimpleParquetFile(); + } + + // Helper method to create a Parquet file with sample data. + void CreateSimpleParquetFile(int64_t chunk_size = 1024) { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})), + ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))}); + auto table = ::arrow::Table::FromRecordBatches( + arrow_schema, {::arrow::RecordBatch::FromStructArray( + ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])") + .ValueOrDie()) + .ValueOrDie()}) + .ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + + ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), + outfile, chunk_size) + .ok()); + } + + // Helper to create a valid but empty Parquet file. + void CreateEmptyParquetFile() { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))}); + auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(), + outfile, 1024) + .ok()); + } + + // Helper method to verify the content of the next batch from an ArrowArrayStream. + void VerifyStreamNextBatch(struct ArrowArrayStream* stream, + std::string_view expected_json) { + ASSERT_NE(stream->get_schema, nullptr) << "Stream has been released or is invalid."; + + ArrowSchema c_schema; + ASSERT_EQ(stream->get_schema(stream, &c_schema), 0); + auto import_schema_result = ::arrow::ImportSchema(&c_schema); + ASSERT_TRUE(import_schema_result.ok()) << import_schema_result.status().message(); + auto arrow_schema = import_schema_result.ValueOrDie(); + + ArrowArray c_array; + ASSERT_EQ(stream->get_next(stream, &c_array), 0) + << "get_next failed. Error: " << stream->get_last_error(stream); + ASSERT_NE(c_array.release, nullptr) << "Stream is exhausted but expected more data."; + + auto import_batch_result = ::arrow::ImportRecordBatch(&c_array, arrow_schema); + ASSERT_TRUE(import_batch_result.ok()) << import_batch_result.status().message(); + auto actual_batch = import_batch_result.ValueOrDie(); + + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + auto expected_array = + ::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie(); + auto expected_batch = + ::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie(); + + ASSERT_TRUE(actual_batch->Equals(*expected_batch)) + << "Actual batch:\n" + << actual_batch->ToString() << "\nExpected batch:\n" + << expected_batch->ToString(); + } + + // Helper method to verify that an ArrowArrayStream is exhausted. + void VerifyStreamExhausted(struct ArrowArrayStream* stream) { + ASSERT_NE(stream->get_next, nullptr) << "Stream has been released or is invalid."; + ArrowArray c_array; + ASSERT_EQ(stream->get_next(stream, &c_array), 0); + ASSERT_EQ(c_array.release, nullptr) << "Stream was not exhausted as expected."; + } + + std::shared_ptr file_io_; + std::string temp_parquet_file_; +}; + +TEST_F(FileScanTaskTest, ReadFullSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); + + ASSERT_NE(stream.release, nullptr); + stream.release(&stream); + ASSERT_EQ(stream.release, nullptr); + ASSERT_EQ(stream.private_data, nullptr); +} + +TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "score", float64())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); + ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); + + stream.release(&stream); +} + +TEST_F(FileScanTaskTest, ReadEmptyFile) { + CreateEmptyParquetFile(); + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + // The stream should be immediately exhausted + ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); + + stream.release(&stream); +} + +} // namespace iceberg From d64b0fa8240db404daf6a5ca56c4aea203256861 Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Mon, 8 Sep 2025 10:49:26 +0800 Subject: [PATCH 3/4] fix ci --- src/iceberg/table_scan.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 421f604df..63ddbcce8 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -122,12 +122,6 @@ Result MakeArrowArrayStream(std::unique_ptr reader) { return InvalidArgument("Reader cannot be null"); } - auto schema_check = reader->Schema(); - if (!schema_check.has_value()) { - return InvalidSchema("Failed to get schema from reader: {}", - schema_check.error().message); - } - auto private_data = std::make_unique(std::move(reader)); ArrowArrayStream stream{.get_schema = GetSchema, From a2d0914c9209148b08f293175c57f47bba8d9795 Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Tue, 9 Sep 2025 17:03:46 +0800 Subject: [PATCH 4/4] fix ci --- src/iceberg/table_scan.cc | 4 ++-- src/iceberg/table_scan.h | 8 +++---- test/file_scan_task_test.cc | 46 +++++++++++-------------------------- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 63ddbcce8..dc63fe5c3 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -148,8 +148,8 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } Result FileScanTask::ToArrow( - const std::shared_ptr& projected_schema, - const std::shared_ptr& filter, const std::shared_ptr& io) const { + const std::shared_ptr& io, const std::shared_ptr& projected_schema, + const std::shared_ptr& filter) const { const ReaderOptions options{.path = data_file_->file_path, .length = data_file_->file_size_in_bytes, .io = io, diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 2cd04cd8a..9e7f313cc 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -58,14 +58,14 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { /** * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. * + * \param io The FileIO instance for accessing the file data. * \param projected_schema The projected schema for reading the data. * \param filter Optional filter expression to apply during reading. - * \param io The FileIO instance for accessing the file data. * \return A Result containing an ArrowArrayStream, or an error on failure. */ - Result ToArrow(const std::shared_ptr& projected_schema, - const std::shared_ptr& filter, - const std::shared_ptr& io) const; + Result ToArrow(const std::shared_ptr& io, + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter) const; private: /// \brief Data file metadata. diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc index d276ed6cb..b386d948c 100644 --- a/test/file_scan_task_test.cc +++ b/test/file_scan_task_test.cc @@ -94,23 +94,14 @@ class FileScanTaskTest : public TempFileTestBase { // Helper method to verify the content of the next batch from an ArrowArrayStream. void VerifyStreamNextBatch(struct ArrowArrayStream* stream, std::string_view expected_json) { - ASSERT_NE(stream->get_schema, nullptr) << "Stream has been released or is invalid."; + auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); - ArrowSchema c_schema; - ASSERT_EQ(stream->get_schema(stream, &c_schema), 0); - auto import_schema_result = ::arrow::ImportSchema(&c_schema); - ASSERT_TRUE(import_schema_result.ok()) << import_schema_result.status().message(); - auto arrow_schema = import_schema_result.ValueOrDie(); - - ArrowArray c_array; - ASSERT_EQ(stream->get_next(stream, &c_array), 0) - << "get_next failed. Error: " << stream->get_last_error(stream); - ASSERT_NE(c_array.release, nullptr) << "Stream is exhausted but expected more data."; - - auto import_batch_result = ::arrow::ImportRecordBatch(&c_array, arrow_schema); - ASSERT_TRUE(import_batch_result.ok()) << import_batch_result.status().message(); - auto actual_batch = import_batch_result.ValueOrDie(); + auto result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + auto actual_batch = result.ValueOrDie(); + ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more data."; + auto arrow_schema = actual_batch->schema(); auto struct_type = ::arrow::struct_(arrow_schema->fields()); auto expected_array = ::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie(); @@ -125,10 +116,10 @@ class FileScanTaskTest : public TempFileTestBase { // Helper method to verify that an ArrowArrayStream is exhausted. void VerifyStreamExhausted(struct ArrowArrayStream* stream) { - ASSERT_NE(stream->get_next, nullptr) << "Stream has been released or is invalid."; - ArrowArray c_array; - ASSERT_EQ(stream->get_next(stream, &c_array), 0); - ASSERT_EQ(c_array.release, nullptr) << "Stream was not exhausted as expected."; + auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); + auto result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader was not exhausted as expected."; } std::shared_ptr file_io_; @@ -146,18 +137,12 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); ASSERT_NO_FATAL_FAILURE( VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); - ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); - - ASSERT_NE(stream.release, nullptr); - stream.release(&stream); - ASSERT_EQ(stream.release, nullptr); - ASSERT_EQ(stream.private_data, nullptr); } TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { @@ -171,15 +156,12 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); ASSERT_NO_FATAL_FAILURE( VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); - ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); - - stream.release(&stream); } TEST_F(FileScanTaskTest, ReadEmptyFile) { @@ -193,14 +175,12 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_); + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); // The stream should be immediately exhausted ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); - - stream.release(&stream); } } // namespace iceberg