Skip to content

Commit 9b69e45

Browse files
authored
feat: read data from FileScanTask as Arrow C Stream (#214)
Introduces a new method, `FileScanTask::ToArrow()`, which returns a standard `ArrowArrayStream` struct, making iceberg-cpp a first-class citizen in the Arrow ecosystem. `FileScanTask::ToArrow()` takes the projected schema, filter, and FileIO as arguments to create and configure a file-format-specific reader (e.g. a Parquet reader). A new factory function, `MakeArrowArrayStream`, wraps an internal C++ `Reader` instance in an `ArrowArrayStream`, correctly managing state and resource lifecycle via the C interface callbacks (`get_schema`, `get_next`, `release`).
1 parent b0b5be7 commit 9b69e45

File tree

4 files changed

+321
-2
lines changed

4 files changed

+321
-2
lines changed

src/iceberg/table_scan.cc

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
#include "iceberg/table_scan.h"
2121

22-
#include <algorithm>
23-
#include <ranges>
22+
#include <cstring>
23+
#include <vector>
2424

25+
#include "iceberg/arrow_c_data.h"
26+
#include "iceberg/file_reader.h"
2527
#include "iceberg/manifest_entry.h"
2628
#include "iceberg/manifest_list.h"
2729
#include "iceberg/manifest_reader.h"
@@ -33,6 +35,106 @@
3335

3436
namespace iceberg {
3537

38+
namespace {
39+
/// \brief Private data structure to hold the Reader and error state
40+
struct ReaderStreamPrivateData {
41+
std::unique_ptr<Reader> reader;
42+
std::string last_error;
43+
44+
explicit ReaderStreamPrivateData(std::unique_ptr<Reader> reader_ptr)
45+
: reader(std::move(reader_ptr)) {}
46+
47+
~ReaderStreamPrivateData() {
48+
if (reader) {
49+
std::ignore = reader->Close();
50+
}
51+
}
52+
};
53+
54+
/// \brief Callback to get the stream schema
55+
static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
56+
if (!stream || !stream->private_data) {
57+
return EINVAL;
58+
}
59+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
60+
// Get schema from reader
61+
auto schema_result = private_data->reader->Schema();
62+
if (!schema_result.has_value()) {
63+
private_data->last_error = schema_result.error().message;
64+
std::memset(out, 0, sizeof(ArrowSchema));
65+
return EIO;
66+
}
67+
68+
*out = std::move(schema_result.value());
69+
return 0;
70+
}
71+
72+
/// \brief Callback to get the next array from the stream
73+
static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
74+
if (!stream || !stream->private_data) {
75+
return EINVAL;
76+
}
77+
78+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
79+
80+
auto next_result = private_data->reader->Next();
81+
if (!next_result.has_value()) {
82+
private_data->last_error = next_result.error().message;
83+
std::memset(out, 0, sizeof(ArrowArray));
84+
return EIO;
85+
}
86+
87+
auto& optional_array = next_result.value();
88+
if (optional_array.has_value()) {
89+
*out = std::move(optional_array.value());
90+
} else {
91+
// End of stream - set release to nullptr to signal end
92+
std::memset(out, 0, sizeof(ArrowArray));
93+
out->release = nullptr;
94+
}
95+
96+
return 0;
97+
}
98+
99+
/// \brief Callback to get the last error message
100+
static const char* GetLastError(struct ArrowArrayStream* stream) {
101+
if (!stream || !stream->private_data) {
102+
return nullptr;
103+
}
104+
105+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
106+
return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str();
107+
}
108+
109+
/// \brief Callback to release the stream resources
110+
static void Release(struct ArrowArrayStream* stream) {
111+
if (!stream || !stream->private_data) {
112+
return;
113+
}
114+
115+
delete static_cast<ReaderStreamPrivateData*>(stream->private_data);
116+
stream->private_data = nullptr;
117+
stream->release = nullptr;
118+
}
119+
120+
/// \brief Wrap a Reader in a C-ABI ArrowArrayStream.
///
/// Ownership of the reader transfers to the returned stream; the Release
/// callback deletes it. Returns InvalidArgument when `reader` is null.
Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
  if (reader == nullptr) {
    return InvalidArgument("Reader cannot be null");
  }

  auto data = std::make_unique<ReaderStreamPrivateData>(std::move(reader));

  ArrowArrayStream stream{};
  stream.get_schema = GetSchema;
  stream.get_next = GetNext;
  stream.get_last_error = GetLastError;
  stream.release = Release;
  stream.private_data = data.release();

  return stream;
}
135+
136+
} // namespace
137+
36138
// implement FileScanTask
37139
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
38140
: data_file_(std::move(data_file)) {}
@@ -45,6 +147,21 @@ int32_t FileScanTask::files_count() const { return 1; }
45147

46148
int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; }
47149

150+
Result<ArrowArrayStream> FileScanTask::ToArrow(
    const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>& projected_schema,
    const std::shared_ptr<Expression>& filter) const {
  // Describe this task's data file for the file-format-specific reader.
  ReaderOptions options;
  options.path = data_file_->file_path;
  options.length = data_file_->file_size_in_bytes;
  options.io = io;
  options.projection = projected_schema;
  options.filter = filter;

  ICEBERG_ASSIGN_OR_RAISE(auto reader,
                          ReaderFactoryRegistry::Open(data_file_->file_format, options));

  // Hand the reader off to a C-ABI stream that owns it from here on.
  return MakeArrowArrayStream(std::move(reader));
}
164+
48165
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
49166
std::shared_ptr<FileIO> file_io)
50167
: file_io_(std::move(file_io)) {

src/iceberg/table_scan.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <string>
2323
#include <vector>
2424

25+
#include "iceberg/arrow_c_data.h"
2526
#include "iceberg/manifest_entry.h"
2627
#include "iceberg/type_fwd.h"
2728

@@ -54,6 +55,18 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
5455
int32_t files_count() const override;
5556
int64_t estimated_row_count() const override;
5657

58+
/**
59+
* \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task.
60+
*
61+
* \param io The FileIO instance for accessing the file data.
62+
* \param projected_schema The projected schema for reading the data.
63+
* \param filter Optional filter expression to apply during reading.
64+
* \return A Result containing an ArrowArrayStream, or an error on failure.
65+
*/
66+
Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
67+
const std::shared_ptr<Schema>& projected_schema,
68+
const std::shared_ptr<Expression>& filter) const;
69+
5770
private:
5871
/// \brief Data file metadata.
5972
std::shared_ptr<DataFile> data_file_;

test/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,7 @@ if(ICEBERG_BUILD_BUNDLE)
124124
parquet_data_test.cc
125125
parquet_schema_test.cc
126126
parquet_test.cc)
127+
128+
add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc)
129+
127130
endif()

test/file_scan_task_test.cc

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/array.h>
21+
#include <arrow/c/bridge.h>
22+
#include <arrow/json/from_string.h>
23+
#include <arrow/record_batch.h>
24+
#include <arrow/table.h>
25+
#include <arrow/util/key_value_metadata.h>
26+
#include <parquet/arrow/reader.h>
27+
#include <parquet/arrow/writer.h>
28+
#include <parquet/metadata.h>
29+
30+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
31+
#include "iceberg/file_format.h"
32+
#include "iceberg/manifest_entry.h"
33+
#include "iceberg/parquet/parquet_register.h"
34+
#include "iceberg/schema.h"
35+
#include "iceberg/table_scan.h"
36+
#include "iceberg/type.h"
37+
#include "iceberg/util/checked_cast.h"
38+
#include "matchers.h"
39+
#include "temp_file_test_base.h"
40+
41+
namespace iceberg {
42+
43+
class FileScanTaskTest : public TempFileTestBase {
44+
protected:
45+
static void SetUpTestSuite() { parquet::RegisterAll(); }
46+
47+
void SetUp() override {
48+
TempFileTestBase::SetUp();
49+
file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
50+
temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
51+
CreateSimpleParquetFile();
52+
}
53+
54+
// Helper method to create a Parquet file with sample data.
55+
void CreateSimpleParquetFile(int64_t chunk_size = 1024) {
56+
const std::string kParquetFieldIdKey = "PARQUET:field_id";
57+
auto arrow_schema = ::arrow::schema(
58+
{::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
59+
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})),
60+
::arrow::field("name", ::arrow::utf8(), /*nullable=*/true,
61+
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))});
62+
auto table = ::arrow::Table::FromRecordBatches(
63+
arrow_schema, {::arrow::RecordBatch::FromStructArray(
64+
::arrow::json::ArrayFromJSONString(
65+
::arrow::struct_(arrow_schema->fields()),
66+
R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")
67+
.ValueOrDie())
68+
.ValueOrDie()})
69+
.ValueOrDie();
70+
71+
auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
72+
auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
73+
74+
ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
75+
outfile, chunk_size)
76+
.ok());
77+
}
78+
79+
// Helper to create a valid but empty Parquet file.
80+
void CreateEmptyParquetFile() {
81+
const std::string kParquetFieldIdKey = "PARQUET:field_id";
82+
auto arrow_schema = ::arrow::schema(
83+
{::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
84+
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))});
85+
auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie();
86+
87+
auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
88+
auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
89+
ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(),
90+
outfile, 1024)
91+
.ok());
92+
}
93+
94+
// Helper method to verify the content of the next batch from an ArrowArrayStream.
95+
void VerifyStreamNextBatch(struct ArrowArrayStream* stream,
96+
std::string_view expected_json) {
97+
auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie();
98+
99+
auto result = record_batch_reader->Next();
100+
ASSERT_TRUE(result.ok()) << result.status().message();
101+
auto actual_batch = result.ValueOrDie();
102+
ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more data.";
103+
104+
auto arrow_schema = actual_batch->schema();
105+
auto struct_type = ::arrow::struct_(arrow_schema->fields());
106+
auto expected_array =
107+
::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie();
108+
auto expected_batch =
109+
::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie();
110+
111+
ASSERT_TRUE(actual_batch->Equals(*expected_batch))
112+
<< "Actual batch:\n"
113+
<< actual_batch->ToString() << "\nExpected batch:\n"
114+
<< expected_batch->ToString();
115+
}
116+
117+
// Helper method to verify that an ArrowArrayStream is exhausted.
118+
void VerifyStreamExhausted(struct ArrowArrayStream* stream) {
119+
auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie();
120+
auto result = record_batch_reader->Next();
121+
ASSERT_TRUE(result.ok()) << result.status().message();
122+
ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader was not exhausted as expected.";
123+
}
124+
125+
std::shared_ptr<FileIO> file_io_;
126+
std::string temp_parquet_file_;
127+
};
128+
129+
TEST_F(FileScanTaskTest, ReadFullSchema) {
  // Project exactly the columns present in the file and read everything.
  auto data_file = std::make_shared<DataFile>();
  data_file->file_path = temp_parquet_file_;
  data_file->file_format = FileFormatType::kParquet;

  std::vector<SchemaField> fields{SchemaField::MakeRequired(1, "id", int32()),
                                  SchemaField::MakeOptional(2, "name", string())};
  auto projected_schema = std::make_shared<Schema>(std::move(fields));

  FileScanTask task(data_file);
  auto to_arrow_result = task.ToArrow(file_io_, projected_schema, /*filter=*/nullptr);
  ASSERT_THAT(to_arrow_result, IsOk());

  auto stream = std::move(to_arrow_result.value());
  ASSERT_NO_FATAL_FAILURE(
      VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
}
147+
148+
TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
  // Project a reordered subset plus a column (field id 3) absent from the
  // file; the missing column must come back as nulls.
  auto data_file = std::make_shared<DataFile>();
  data_file->file_path = temp_parquet_file_;
  data_file->file_format = FileFormatType::kParquet;

  std::vector<SchemaField> fields{SchemaField::MakeOptional(2, "name", string()),
                                  SchemaField::MakeOptional(3, "score", float64())};
  auto projected_schema = std::make_shared<Schema>(std::move(fields));

  FileScanTask task(data_file);
  auto to_arrow_result = task.ToArrow(file_io_, projected_schema, /*filter=*/nullptr);
  ASSERT_THAT(to_arrow_result, IsOk());

  auto stream = std::move(to_arrow_result.value());
  ASSERT_NO_FATAL_FAILURE(
      VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])"));
}
166+
167+
TEST_F(FileScanTaskTest, ReadEmptyFile) {
  // Overwrite the sample file with a valid but empty Parquet file.
  CreateEmptyParquetFile();

  auto data_file = std::make_shared<DataFile>();
  data_file->file_path = temp_parquet_file_;
  data_file->file_format = FileFormatType::kParquet;

  std::vector<SchemaField> fields{SchemaField::MakeRequired(1, "id", int32())};
  auto projected_schema = std::make_shared<Schema>(std::move(fields));

  FileScanTask task(data_file);
  auto to_arrow_result = task.ToArrow(file_io_, projected_schema, /*filter=*/nullptr);
  ASSERT_THAT(to_arrow_result, IsOk());

  auto stream = std::move(to_arrow_result.value());
  // The stream should be immediately exhausted.
  ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));
}
185+
186+
} // namespace iceberg

0 commit comments

Comments
 (0)