Skip to content

Commit cf2a0ea

Browse files
committed
feat: Introduce ArrowArrayReader factory on FileScanTask
1 parent 9f13bac commit cf2a0ea

File tree

11 files changed

+298
-45
lines changed

11 files changed

+298
-45
lines changed

src/iceberg/arrow_array_reader.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <optional>
23+
24+
#include "iceberg/arrow_c_data.h"
25+
#include "iceberg/iceberg_export.h"
26+
#include "iceberg/result.h"
27+
28+
namespace iceberg {
29+
30+
/// \brief Abstract interface for a reader that yields data as a sequence of
/// `ArrowArray` batches described by an `ArrowSchema`.
///
/// Every operation is pure virtual; concrete readers provide the behavior.
class ICEBERG_EXPORT ArrowArrayReader {
31+
public:
32+
/// \brief Read next batch of data.
33+
///
34+
/// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`.
35+
virtual Result<std::optional<ArrowArray>> Next() = 0;
36+
37+
/// \brief Get schema of data returned by `Next`.
///
/// Declared `const`, so implementations must be callable on a const reader.
38+
virtual Result<ArrowSchema> Schema() const = 0;
39+
40+
/// \brief Close this reader and release all resources.
41+
virtual Status Close() = 0;
42+
43+
/// \brief Virtual destructor so implementations can be destroyed through a
/// pointer to this interface.
virtual ~ArrowArrayReader() = default;
44+
};
45+
46+
} // namespace iceberg

src/iceberg/avro/avro_reader.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class AvroReader::Impl {
160160
return {};
161161
}
162162

163-
Result<ArrowSchema> Schema() {
163+
Result<ArrowSchema> Schema() const {
164164
if (!context_) {
165165
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
166166
}
@@ -174,7 +174,7 @@ class AvroReader::Impl {
174174
}
175175

176176
private:
177-
Status InitReadContext() {
177+
Status InitReadContext() const {
178178
context_ = std::make_unique<ReadContext>();
179179
context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
180180

@@ -232,14 +232,14 @@ class AvroReader::Impl {
232232
// The avro reader to read the data into a datum.
233233
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
234234
// The context to keep track of the reading progress.
235-
std::unique_ptr<ReadContext> context_;
235+
mutable std::unique_ptr<ReadContext> context_;
236236
};
237237

238238
AvroReader::~AvroReader() = default;
239239

240240
Result<std::optional<ArrowArray>> AvroReader::Next() { return impl_->Next(); }
241241

242-
Result<ArrowSchema> AvroReader::Schema() { return impl_->Schema(); }
242+
Result<ArrowSchema> AvroReader::Schema() const { return impl_->Schema(); }
243243

244244
Status AvroReader::Open(const ReaderOptions& options) {
245245
impl_ = std::make_unique<Impl>();

src/iceberg/avro/avro_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader {
3737

3838
Result<std::optional<ArrowArray>> Next() final;
3939

40-
Result<ArrowSchema> Schema() final;
40+
Result<ArrowSchema> Schema() const final;
4141

4242
private:
4343
class Impl;

src/iceberg/file_reader.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <memory>
2727
#include <optional>
2828

29+
#include "iceberg/arrow_array_reader.h"
2930
#include "iceberg/arrow_c_data.h"
3031
#include "iceberg/file_format.h"
3132
#include "iceberg/result.h"
@@ -34,7 +35,7 @@
3435
namespace iceberg {
3536

3637
/// \brief Base reader class to read data from different file formats.
37-
class ICEBERG_EXPORT Reader {
38+
class ICEBERG_EXPORT Reader : public ArrowArrayReader {
3839
public:
3940
virtual ~Reader() = default;
4041
Reader() = default;
@@ -45,15 +46,15 @@ class ICEBERG_EXPORT Reader {
4546
virtual Status Open(const struct ReaderOptions& options) = 0;
4647

4748
/// \brief Close the reader.
48-
virtual Status Close() = 0;
49+
Status Close() override = 0;
4950

5051
/// \brief Read next data from file.
5152
///
5253
/// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`.
53-
virtual Result<std::optional<ArrowArray>> Next() = 0;
54+
Result<std::optional<ArrowArray>> Next() override = 0;
5455

5556
/// \brief Get the schema of the data.
56-
virtual Result<ArrowSchema> Schema() = 0;
57+
Result<ArrowSchema> Schema() const override = 0;
5758
};
5859

5960
/// \brief A split of the file to read.

src/iceberg/parquet/parquet_reader.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class ParquetReader::Impl {
173173
}
174174

175175
// Get the schema of the data
176-
Result<ArrowSchema> Schema() {
176+
Result<ArrowSchema> Schema() const {
177177
if (!context_) {
178178
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
179179
}
@@ -185,7 +185,7 @@ class ParquetReader::Impl {
185185
}
186186

187187
private:
188-
Status InitReadContext() {
188+
Status InitReadContext() const {
189189
context_ = std::make_unique<ReadContext>();
190190

191191
// Build the output Arrow schema
@@ -239,14 +239,14 @@ class ParquetReader::Impl {
239239
// Parquet file reader to create RecordBatchReader.
240240
std::unique_ptr<::parquet::arrow::FileReader> reader_;
241241
// The context to keep track of the reading progress.
242-
std::unique_ptr<ReadContext> context_;
242+
mutable std::unique_ptr<ReadContext> context_;
243243
};
244244

245245
ParquetReader::~ParquetReader() = default;
246246

247247
Result<std::optional<ArrowArray>> ParquetReader::Next() { return impl_->Next(); }
248248

249-
Result<ArrowSchema> ParquetReader::Schema() { return impl_->Schema(); }
249+
Result<ArrowSchema> ParquetReader::Schema() const { return impl_->Schema(); }
250250

251251
Status ParquetReader::Open(const ReaderOptions& options) {
252252
impl_ = std::make_unique<Impl>();

src/iceberg/parquet/parquet_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetReader : public Reader {
3737

3838
Result<std::optional<ArrowArray>> Next() final;
3939

40-
Result<ArrowSchema> Schema() final;
40+
Result<ArrowSchema> Schema() const final;
4141

4242
private:
4343
class Impl;

src/iceberg/table_scan.cc

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121

2222
#include <algorithm>
2323
#include <ranges>
24+
#include <utility>
2425

26+
#include <iceberg/file_format.h>
27+
28+
#include "iceberg/file_reader.h"
2529
#include "iceberg/manifest_entry.h"
2630
#include "iceberg/manifest_list.h"
2731
#include "iceberg/manifest_reader.h"
@@ -33,18 +37,6 @@
3337

3438
namespace iceberg {
3539

36-
// implement FileScanTask
37-
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
38-
: data_file_(std::move(data_file)) {}
39-
40-
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
41-
42-
int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; }
43-
44-
int32_t FileScanTask::files_count() const { return 1; }
45-
46-
int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; }
47-
4840
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
4941
std::shared_ptr<FileIO> file_io)
5042
: file_io_(std::move(file_io)) {
@@ -178,4 +170,29 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
178170
return tasks;
179171
}
180172

173+
// Takes the DataFile pointer by value and moves it into the member.
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
174+
: data_file_(std::move(data_file)) {}
175+
176+
// Returns the stored pointer by const reference, so the shared_ptr is not copied.
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
177+
178+
// Reports the data file's file_size_in_bytes.
// NOTE(review): data_file_ is dereferenced without a null check.
int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; }
179+
180+
// A FileScanTask always reports exactly one file.
int32_t FileScanTask::files_count() const { return 1; }
181+
182+
// Uses the data file's record_count as the row estimate.
// NOTE(review): data_file_ is dereferenced without a null check.
int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; }
183+
184+
// Opens a reader for this task's data file.
//
// The reader options are assembled from the data file (path, size in bytes), the
// caller-supplied FileIO, and the scan context (projected schema, filter). The
// data file's file_format is passed to ReaderFactoryRegistry::Open to pick the reader.
//
// NOTE(review): data_file_ is dereferenced without a null check.
Result<std::unique_ptr<ArrowArrayReader>> FileScanTask::ToArrowArrayReader(
185+
const TableScanContext& context, const std::shared_ptr<FileIO>& io) const {
186+
const ReaderOptions options{.path = data_file_->file_path,
187+
.length = data_file_->file_size_in_bytes,
188+
.io = io,
189+
.projection = context.projected_schema,
190+
.filter = context.filter};
191+
192+
// Presumably returns early with the error if opening fails — confirm against
// the ICEBERG_ASSIGN_OR_RAISE macro definition.
ICEBERG_ASSIGN_OR_RAISE(auto reader,
193+
ReaderFactoryRegistry::Open(data_file_->file_format, options));
194+
195+
// NOTE(review): `reader` is presumably a pointer to the Reader subclass; the
// explicit move hands it to the ArrowArrayReader-based return type — confirm
// whether a plain `return reader;` would also compile on all supported toolchains.
return std::move(reader);
196+
}
197+
181198
} // namespace iceberg

src/iceberg/table_scan.h

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <string>
2323
#include <vector>
2424

25+
#include "iceberg/arrow_array_reader.h"
2526
#include "iceberg/manifest_entry.h"
2627
#include "iceberg/type_fwd.h"
2728

@@ -42,23 +43,6 @@ class ICEBERG_EXPORT ScanTask {
4243
virtual int64_t estimated_row_count() const = 0;
4344
};
4445

45-
/// \brief Task representing a data file and its corresponding delete files.
46-
class ICEBERG_EXPORT FileScanTask : public ScanTask {
47-
public:
48-
explicit FileScanTask(std::shared_ptr<DataFile> data_file);
49-
50-
/// \brief The data file that should be read by this scan task.
51-
const std::shared_ptr<DataFile>& data_file() const;
52-
53-
int64_t size_bytes() const override;
54-
int32_t files_count() const override;
55-
int64_t estimated_row_count() const override;
56-
57-
private:
58-
/// \brief Data file metadata.
59-
std::shared_ptr<DataFile> data_file_;
60-
};
61-
6246
/// \brief Scan context holding snapshot and scan-specific metadata.
6347
struct TableScanContext {
6448
/// \brief Table metadata.
@@ -185,4 +169,40 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
185169
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const override;
186170
};
187171

172+
/// \brief Task representing a data file and its corresponding delete files.
///
/// NOTE(review): as declared here the task stores only the data file; no delete
/// files are held by this class.
173+
class ICEBERG_EXPORT FileScanTask : public ScanTask {
174+
public:
175+
/// \brief Construct a task for the given data file.
explicit FileScanTask(std::shared_ptr<DataFile> data_file);
176+
177+
/// \brief The data file that should be read by this scan task.
178+
const std::shared_ptr<DataFile>& data_file() const;
179+
180+
/// \brief The total size in bytes of the file split to be read.
///
/// Implemented as the data file's `file_size_in_bytes`.
181+
int64_t size_bytes() const override;
182+
183+
/// \brief The number of files that should be read by this scan task.
///
/// Always 1 for a FileScanTask.
184+
int32_t files_count() const override;
185+
186+
/// \brief The number of rows that should be read by this scan task.
///
/// Implemented as the data file's `record_count`.
187+
int64_t estimated_row_count() const override;
188+
189+
/**
190+
* \brief Creates and returns an ArrowArrayReader to read the data for this task.
191+
*
192+
* This acts as a factory to instantiate a file-format-specific reader (e.g., Parquet)
193+
* based on the metadata in this task and the provided context.
194+
*
195+
* \param context The table scan context, used to configure the reader (e.g., with the
196+
* projected schema and the filter).
197+
* \param io The FileIO instance for accessing the file data.
198+
* \return A Result containing a unique pointer to the reader, or an error on failure.
199+
*/
200+
Result<std::unique_ptr<ArrowArrayReader>> ToArrowArrayReader(
201+
const TableScanContext& context, const std::shared_ptr<FileIO>& io) const;
202+
203+
private:
204+
/// \brief Data file metadata.
205+
std::shared_ptr<DataFile> data_file_;
206+
};
207+
188208
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,6 @@ if(ICEBERG_BUILD_BUNDLE)
122122
SOURCES
123123
parquet_data_test.cc
124124
parquet_schema_test.cc
125-
parquet_test.cc)
125+
parquet_test.cc
126+
file_scan_task_test.cc)
126127
endif()

0 commit comments

Comments
 (0)