diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index a7edd5d79..dc63fe5c3 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,9 +19,11 @@ #include "iceberg/table_scan.h" -#include -#include +#include +#include +#include "iceberg/arrow_c_data.h" +#include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" @@ -33,6 +35,106 @@ namespace iceberg { +namespace { +/// \brief Private data structure to hold the Reader and error state +struct ReaderStreamPrivateData { + std::unique_ptr reader; + std::string last_error; + + explicit ReaderStreamPrivateData(std::unique_ptr reader_ptr) + : reader(std::move(reader_ptr)) {} + + ~ReaderStreamPrivateData() { + if (reader) { + std::ignore = reader->Close(); + } + } +}; + +/// \brief Callback to get the stream schema +static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) { + if (!stream || !stream->private_data) { + return EINVAL; + } + auto* private_data = static_cast(stream->private_data); + // Get schema from reader + auto schema_result = private_data->reader->Schema(); + if (!schema_result.has_value()) { + private_data->last_error = schema_result.error().message; + std::memset(out, 0, sizeof(ArrowSchema)); + return EIO; + } + + *out = std::move(schema_result.value()); + return 0; +} + +/// \brief Callback to get the next array from the stream +static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) { + if (!stream || !stream->private_data) { + return EINVAL; + } + + auto* private_data = static_cast(stream->private_data); + + auto next_result = private_data->reader->Next(); + if (!next_result.has_value()) { + private_data->last_error = next_result.error().message; + std::memset(out, 0, sizeof(ArrowArray)); + return EIO; + } + + auto& optional_array = next_result.value(); + if (optional_array.has_value()) { + *out = std::move(optional_array.value()); + } else { + // End of stream - set release to nullptr to signal end + std::memset(out, 0, sizeof(ArrowArray)); + out->release = nullptr; + } + + return 0; +} + +/// \brief Callback to get the last error message +static const char* GetLastError(struct ArrowArrayStream* stream) { + if (!stream || !stream->private_data) { + return nullptr; + } + + auto* private_data = static_cast(stream->private_data); + return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str(); +} + +/// \brief Callback to release the stream resources +static void Release(struct ArrowArrayStream* stream) { + if (!stream || !stream->private_data) { + return; + } + + delete static_cast(stream->private_data); + stream->private_data = nullptr; + stream->release = nullptr; +} + +Result MakeArrowArrayStream(std::unique_ptr reader) { + if (!reader) { + return InvalidArgument("Reader cannot be null"); + } + + auto private_data = std::make_unique(std::move(reader)); + + ArrowArrayStream stream{.get_schema = GetSchema, + .get_next = GetNext, + .get_last_error = GetLastError, + .release = Release, + .private_data = private_data.release()}; + + return stream; +} + +} // namespace + // implement FileScanTask FileScanTask::FileScanTask(std::shared_ptr data_file) : data_file_(std::move(data_file)) {} @@ -45,6 +147,21 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } +Result FileScanTask::ToArrow( + const std::shared_ptr& io, const std::shared_ptr& projected_schema, + const std::shared_ptr& filter) const { + const ReaderOptions options{.path = data_file_->file_path, + .length = data_file_->file_size_in_bytes, + .io = io, + .projection = projected_schema, + .filter = filter}; + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file_->file_format, options)); + + return MakeArrowArrayStream(std::move(reader)); +} + TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) : file_io_(std::move(file_io)) { diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index dcfa72205..9e7f313cc 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -22,6 +22,7 @@ #include #include +#include "iceberg/arrow_c_data.h" #include "iceberg/manifest_entry.h" #include "iceberg/type_fwd.h" @@ -54,6 +55,18 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int32_t files_count() const override; int64_t estimated_row_count() const override; + /** + * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. + * + * \param io The FileIO instance for accessing the file data. + * \param projected_schema The projected schema for reading the data. + * \param filter Optional filter expression to apply during reading. + * \return A Result containing an ArrowArrayStream, or an error on failure. + */ + Result ToArrow(const std::shared_ptr& io, + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter) const; + private: /// \brief Data file metadata. std::shared_ptr data_file_; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 42ad13209..f8926eb70 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -124,4 +124,7 @@ if(ICEBERG_BUILD_BUNDLE) parquet_data_test.cc parquet_schema_test.cc parquet_test.cc) + + add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc) + endif() diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc new file mode 100644 index 000000000..b386d948c --- /dev/null +++ b/test/file_scan_task_test.cc @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "matchers.h" +#include "temp_file_test_base.h" + +namespace iceberg { + +class FileScanTaskTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); + temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet"); + CreateSimpleParquetFile(); + } + + // Helper method to create a Parquet file with sample data. + void CreateSimpleParquetFile(int64_t chunk_size = 1024) { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})), + ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))}); + auto table = ::arrow::Table::FromRecordBatches( + arrow_schema, {::arrow::RecordBatch::FromStructArray( + ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])") + .ValueOrDie()) + .ValueOrDie()}) + .ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + + ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), + outfile, chunk_size) + .ok()); + } + + // Helper to create a valid but empty Parquet file. + void CreateEmptyParquetFile() { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))}); + auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(), + outfile, 1024) + .ok()); + } + + // Helper method to verify the content of the next batch from an ArrowArrayStream. + void VerifyStreamNextBatch(struct ArrowArrayStream* stream, + std::string_view expected_json) { + auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); + + auto result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + auto actual_batch = result.ValueOrDie(); + ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more data."; + + auto arrow_schema = actual_batch->schema(); + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + auto expected_array = + ::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie(); + auto expected_batch = + ::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie(); + + ASSERT_TRUE(actual_batch->Equals(*expected_batch)) + << "Actual batch:\n" + << actual_batch->ToString() << "\nExpected batch:\n" + << expected_batch->ToString(); + } + + // Helper method to verify that an ArrowArrayStream is exhausted. + void VerifyStreamExhausted(struct ArrowArrayStream* stream) { + auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); + auto result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader was not exhausted as expected."; + } + + std::shared_ptr file_io_; + std::string temp_parquet_file_; +}; + +TEST_F(FileScanTaskTest, ReadFullSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "score", float64())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); +} + +TEST_F(FileScanTaskTest, ReadEmptyFile) { + CreateEmptyParquetFile(); + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + + FileScanTask task(data_file); + + auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + // The stream should be immediately exhausted + ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); +} + +} // namespace iceberg