Skip to content

Commit b8435b0

Browse files
committed
A basic implementation for exporting scan tasks as Arrow C ABI streams
1 parent 88f5520 commit b8435b0

File tree

2 files changed

+159
-2
lines changed

2 files changed

+159
-2
lines changed

src/iceberg/table_scan.cc

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
#include "iceberg/table_scan.h"
2121

22-
#include <algorithm>
23-
#include <ranges>
22+
#include <cstring>
23+
#include <vector>
2424

25+
#include <iceberg/result.h>
26+
27+
#include "iceberg/arrow_c_data.h"
28+
#include "iceberg/file_reader.h"
2529
#include "iceberg/manifest_entry.h"
2630
#include "iceberg/manifest_list.h"
2731
#include "iceberg/manifest_reader.h"
@@ -33,6 +37,112 @@
3337

3438
namespace iceberg {
3539

40+
namespace {
41+
/// \brief Private data structure to hold the Reader and error state
42+
struct ReaderStreamPrivateData {
43+
std::unique_ptr<Reader> reader;
44+
std::string last_error;
45+
46+
explicit ReaderStreamPrivateData(std::unique_ptr<Reader> reader_ptr)
47+
: reader(std::move(reader_ptr)) {}
48+
49+
~ReaderStreamPrivateData() {
50+
if (reader) {
51+
reader->Close();
52+
}
53+
}
54+
};
55+
56+
/// \brief Callback to get the stream schema
57+
static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
58+
if (!stream || !stream->private_data) {
59+
return EINVAL;
60+
}
61+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
62+
// Get schema from reader
63+
auto schema_result = private_data->reader->Schema();
64+
if (!schema_result.has_value()) {
65+
private_data->last_error = schema_result.error().message;
66+
std::memset(out, 0, sizeof(ArrowSchema));
67+
return EIO;
68+
}
69+
70+
*out = std::move(schema_result.value());
71+
return 0;
72+
}
73+
74+
/// \brief Callback to get the next array from the stream
75+
static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
76+
if (!stream || !stream->private_data) {
77+
return EINVAL;
78+
}
79+
80+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
81+
82+
auto next_result = private_data->reader->Next();
83+
if (!next_result.has_value()) {
84+
private_data->last_error = next_result.error().message;
85+
std::memset(out, 0, sizeof(ArrowArray));
86+
return EIO;
87+
}
88+
89+
auto& optional_array = next_result.value();
90+
if (optional_array.has_value()) {
91+
*out = std::move(optional_array.value());
92+
} else {
93+
// End of stream - set release to nullptr to signal end
94+
std::memset(out, 0, sizeof(ArrowArray));
95+
out->release = nullptr;
96+
}
97+
98+
return 0;
99+
}
100+
101+
/// \brief Callback to get the last error message
102+
static const char* GetLastError(struct ArrowArrayStream* stream) {
103+
if (!stream || !stream->private_data) {
104+
return nullptr;
105+
}
106+
107+
auto* private_data = static_cast<ReaderStreamPrivateData*>(stream->private_data);
108+
return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str();
109+
}
110+
111+
/// \brief Callback to release the stream resources
112+
static void Release(struct ArrowArrayStream* stream) {
113+
if (!stream || !stream->private_data) {
114+
return;
115+
}
116+
117+
delete static_cast<ReaderStreamPrivateData*>(stream->private_data);
118+
stream->private_data = nullptr;
119+
stream->release = nullptr;
120+
}
121+
122+
/// \brief Wrap a Reader in an ArrowArrayStream whose callbacks pull from that reader.
///
/// The returned stream owns the reader through its private data; the reader is closed
/// and destroyed when the stream's release callback is invoked.
///
/// \param reader The reader to expose; must not be null.
/// \return The stream on success, InvalidArgument if `reader` is null, or
/// InvalidSchema if the reader cannot produce a schema.
Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
  if (!reader) {
    return InvalidArgument("Reader cannot be null");
  }

  // Probe the schema eagerly so a reader that cannot produce one is rejected here
  // rather than on the consumer's first get_schema call.
  auto schema_check = reader->Schema();
  if (!schema_check.has_value()) {
    return InvalidSchema("Failed to get schema from reader: {}",
                         schema_check.error().message);
  }
  // The probed schema is used for validation only. An ArrowSchema with a non-null
  // release callback holds resources until that callback runs, so release it here
  // instead of dropping it on the floor.
  if (auto& probed_schema = schema_check.value(); probed_schema.release != nullptr) {
    probed_schema.release(&probed_schema);
  }

  auto private_data = std::make_unique<ReaderStreamPrivateData>(std::move(reader));

  // Ownership of the private data moves into the stream; Release() deletes it.
  ArrowArrayStream stream{.get_schema = GetSchema,
                          .get_next = GetNext,
                          .get_last_error = GetLastError,
                          .release = Release,
                          .private_data = private_data.release()};

  return stream;
}
143+
144+
} // namespace
145+
36146
// implement FileScanTask
37147
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
38148
: data_file_(std::move(data_file)) {}
@@ -45,6 +155,21 @@ int32_t FileScanTask::files_count() const { return 1; }
45155

46156
/// \brief Report the record count stored in the task's data file metadata.
int64_t FileScanTask::estimated_row_count() const {
  const auto& file = *data_file_;
  return file.record_count;
}
47157

158+
/// \brief Open this task's data file and expose it as an ArrowArrayStream.
///
/// \param projected_schema Passed to the reader as its projection.
/// \param filter Passed to the reader as its filter.
/// \param io File IO handed to the reader.
/// \return The stream wrapping the opened reader, or the error raised while opening
/// the reader or building the stream.
Result<ArrowArrayStream> FileScanTask::ToArrow(
    const std::shared_ptr<Schema>& projected_schema,
    const std::shared_ptr<Expression>& filter, const std::shared_ptr<FileIO>& io) const {
  const auto& file = *data_file_;

  // Reader configuration: file location and size from the task's data file, the rest
  // from the caller.
  const ReaderOptions reader_options{.path = file.file_path,
                                     .length = file.file_size_in_bytes,
                                     .io = io,
                                     .projection = projected_schema,
                                     .filter = filter};

  ICEBERG_ASSIGN_OR_RAISE(auto file_reader,
                          ReaderFactoryRegistry::Open(file.file_format, reader_options));

  return MakeArrowArrayStream(std::move(file_reader));
}
172+
48173
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
49174
std::shared_ptr<FileIO> file_io)
50175
: file_io_(std::move(file_io)) {
@@ -178,4 +303,28 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
178303
return tasks;
179304
}
180305

306+
/// \brief Plan the scan and export every planned file scan task as an
/// ArrowArrayStream.
///
/// Each returned stream must be released by the caller through its `release`
/// callback. If exporting any task fails, the streams already exported by this call
/// are released before the error is returned, so no reader is left open.
///
/// \return One stream per planned task; InvalidArgument if planning or exporting a
/// task fails; NotImplemented if the plan contains no files.
Result<std::vector<ArrowArrayStream>> DataTableScan::ToArrow() const {
  Result<std::vector<std::shared_ptr<FileScanTask>>> tasks_result = PlanFiles();
  if (!tasks_result.has_value()) {
    return InvalidArgument("Failed to plan files: {}", tasks_result.error().message);
  }
  // Bind by reference: the task list is only read, so copying it is unnecessary.
  const auto& tasks = tasks_result.value();
  if (tasks.empty()) {
    // TODO(Li Feiyang): return an empty arrow stream
    return NotImplemented("No files to scan");
  }

  std::vector<ArrowArrayStream> arrow_streams;
  arrow_streams.reserve(tasks.size());
  for (const auto& task : tasks) {
    Result<ArrowArrayStream> arrow_stream_result =
        task->ToArrow(context_.projected_schema, context_.filter, file_io_);
    if (!arrow_stream_result.has_value()) {
      // The streams collected so far own open readers; release them before bailing
      // out, otherwise nothing would ever invoke their release callbacks.
      for (auto& opened_stream : arrow_streams) {
        if (opened_stream.release != nullptr) {
          opened_stream.release(&opened_stream);
        }
      }
      return InvalidArgument("Failed to get arrow stream: {}",
                             arrow_stream_result.error().message);
    }
    arrow_streams.push_back(arrow_stream_result.value());
  }
  // The return type differs from the local's type, so move explicitly into the Result.
  return std::move(arrow_streams);
}
329+
181330
} // namespace iceberg

src/iceberg/table_scan.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <string>
2323
#include <vector>
2424

25+
#include "iceberg/arrow_c_data.h"
2526
#include "iceberg/manifest_entry.h"
2627
#include "iceberg/type_fwd.h"
2728

@@ -54,6 +55,11 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
5455
int32_t files_count() const override;
5556
int64_t estimated_row_count() const override;
5657

58+
Result<ArrowArrayStream> ToArrow(const std::shared_ptr<Schema>& projected_schema,
59+
60+
const std::shared_ptr<Expression>& filter,
61+
const std::shared_ptr<FileIO>& io) const;
62+
5763
private:
5864
/// \brief Data file metadata.
5965
std::shared_ptr<DataFile> data_file_;
@@ -183,6 +189,8 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
183189
/// \brief Plans the scan tasks by resolving manifests and data files.
184190
/// \return A Result containing scan tasks or an error.
185191
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const override;
192+
193+
Result<std::vector<ArrowArrayStream>> ToArrow() const;
186194
};
187195

188196
} // namespace iceberg

0 commit comments

Comments
 (0)