Skip to content

Commit 001a86d

Browse files
committed
feat: add avro reader to registry
1 parent c00ee8f commit 001a86d

File tree

9 files changed

+185
-126
lines changed

9 files changed

+185
-126
lines changed

src/iceberg/avro/avro_data_util.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,14 +419,13 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
419419
const SchemaField& projected_field,
420420
::arrow::ArrayBuilder* array_builder) {
421421
if (avro_node->type() == ::avro::AVRO_UNION) {
422-
const auto& union_datum = avro_datum.value<::avro::GenericUnion>();
423-
size_t branch = union_datum.currentBranch();
422+
size_t branch = avro_datum.unionBranch();
424423
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
425424
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
426425
return {};
427426
} else {
428-
return AppendFieldToBuilder(avro_node->leafAt(branch), union_datum.datum(),
429-
projection, projected_field, array_builder);
427+
return AppendFieldToBuilder(avro_node->leafAt(branch), avro_datum, projection,
428+
projected_field, array_builder);
430429
}
431430
}
432431

src/iceberg/avro/avro_reader.cc

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,14 @@ struct ReadContext {
7676
// 1. prune the reader schema based on the projection
7777
// 2. read key-value metadata from the avro file
7878
// 3. collect basic reader metrics
79-
class AvroBatchReader::Impl {
79+
class AvroReader::Impl {
8080
public:
8181
Status Open(const ReaderOptions& options) {
82+
// TODO(gangwu): perhaps adding a ReaderOptions::Validate() method
83+
if (options.projection == nullptr) {
84+
return InvalidArgument("Projected schema is required by Avro reader");
85+
}
86+
8287
batch_size_ = options.batch_size;
8388
read_schema_ = options.projection;
8489

@@ -106,11 +111,10 @@ class AvroBatchReader::Impl {
106111

107112
// Project the read schema on top of the file schema.
108113
// TODO(gangwu): support pruning source fields
109-
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
114+
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
110115
/*prune_source=*/false));
111-
base_reader->init(file_schema);
112116
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
113-
std::move(base_reader));
117+
std::move(base_reader), file_schema);
114118

115119
if (options.split) {
116120
reader_->sync(options.split->offset);
@@ -148,6 +152,19 @@ class AvroBatchReader::Impl {
148152
return {};
149153
}
150154

155+
Result<ArrowSchema> Schema() {
156+
if (!context_) {
157+
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
158+
}
159+
ArrowSchema arrow_schema;
160+
auto export_result = ::arrow::ExportSchema(*context_->arrow_schema_, &arrow_schema);
161+
if (!export_result.ok()) {
162+
return InvalidSchema("Failed to export the arrow schema: {}",
163+
export_result.message());
164+
}
165+
return arrow_schema;
166+
}
167+
151168
private:
152169
Status InitReadContext() {
153170
context_ = std::make_unique<ReadContext>();
@@ -201,7 +218,7 @@ class AvroBatchReader::Impl {
201218
// The end of the split to read and used to terminate the reading.
202219
std::optional<int64_t> split_end_;
203220
// The schema to read.
204-
std::shared_ptr<Schema> read_schema_;
221+
std::shared_ptr<::iceberg::Schema> read_schema_;
205222
// The projection result to apply to the read schema.
206223
SchemaProjection projection_;
207224
// The avro reader to read the data into a datum.
@@ -210,13 +227,21 @@ class AvroBatchReader::Impl {
210227
std::unique_ptr<ReadContext> context_;
211228
};
212229

213-
Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }
230+
Result<Reader::Data> AvroReader::Next() { return impl_->Next(); }
214231

215-
Status AvroBatchReader::Open(const ReaderOptions& options) {
232+
Result<ArrowSchema> AvroReader::Schema() { return impl_->Schema(); }
233+
234+
Status AvroReader::Open(const ReaderOptions& options) {
216235
impl_ = std::make_unique<Impl>();
217236
return impl_->Open(options);
218237
}
219238

220-
Status AvroBatchReader::Close() { return impl_->Close(); }
239+
Status AvroReader::Close() { return impl_->Close(); }
240+
241+
void AvroReader::Register() {
242+
static ReaderFactoryRegistry avro_reader_register(
243+
FileFormatType::kAvro,
244+
[]() -> Result<std::unique_ptr<Reader>> { return std::make_unique<AvroReader>(); });
245+
}
221246

222247
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,22 @@
2525
namespace iceberg::avro {
2626

2727
/// \brief A reader that reads ArrowArray from Avro files.
28-
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
28+
class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader {
2929
public:
30-
AvroBatchReader() = default;
30+
AvroReader() = default;
3131

32-
~AvroBatchReader() override = default;
32+
~AvroReader() override = default;
3333

3434
Status Open(const ReaderOptions& options) final;
3535

3636
Status Close() final;
3737

3838
Result<Data> Next() final;
3939

40-
DataLayout data_layout() const final { return DataLayout::kArrowArray; }
40+
Result<ArrowSchema> Schema() final;
41+
42+
/// \brief Register this Avro reader implementation.
43+
static void Register();
4144

4245
private:
4346
class Impl;

src/iceberg/avro/demo_avro.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,4 @@ std::string DemoAvro::print() const {
4949
return actual.str();
5050
}
5151

52-
Result<Reader::Data> DemoAvroReader::Next() { return std::monostate(); }
53-
54-
Reader::DataLayout DemoAvroReader::data_layout() const {
55-
return Reader::DataLayout::kStructLike;
56-
}
57-
58-
Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; }
59-
60-
Status DemoAvroReader::Close() { return {}; }
61-
62-
ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result<std::unique_ptr<Reader>> {
63-
return std::make_unique<DemoAvroReader>();
64-
});
65-
6652
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,4 @@ class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro {
3434
std::string print() const override;
3535
};
3636

37-
class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
38-
public:
39-
DemoAvroReader() = default;
40-
~DemoAvroReader() override = default;
41-
Status Open(const ReaderOptions& options) override;
42-
Status Close() override;
43-
Result<Data> Next() override;
44-
DataLayout data_layout() const override;
45-
};
46-
4737
} // namespace iceberg::avro

src/iceberg/file_reader.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,4 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
5959
return reader;
6060
}
6161

62-
StructLikeReader::StructLikeReader(std::unique_ptr<Reader> reader)
63-
: reader_(std::move(reader)) {}
64-
65-
Result<Reader::Data> StructLikeReader::Next() { return NotImplemented(""); }
66-
67-
BatchReader::BatchReader(std::unique_ptr<Reader> reader) : reader_(std::move(reader)) {}
68-
69-
Result<Reader::Data> BatchReader::Next() { return NotImplemented(""); }
70-
7162
} // namespace iceberg

src/iceberg/file_reader.h

Lines changed: 6 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -50,66 +50,12 @@ class ICEBERG_EXPORT Reader {
5050

5151
/// \brief Read next data from file.
5252
///
53-
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or
54-
/// `StructLike` depending on the data layout by the reader implementation.
55-
using Data =
56-
std::variant<std::monostate, ArrowArray, std::reference_wrapper<const StructLike>>;
53+
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray`.
54+
using Data = std::variant<std::monostate, ArrowArray>;
5755
virtual Result<Data> Next() = 0;
5856

59-
enum class DataLayout { kArrowArray, kStructLike };
60-
61-
/// \brief Get the data layout returned by `Next()` of the reader.
62-
virtual DataLayout data_layout() const = 0;
63-
};
64-
65-
/// \brief Wrapper of `Reader` to always return `StructLike`.
66-
///
67-
/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted
68-
/// to `StructLike`; otherwise, the data will be returned as is without any cost.
69-
class ICEBERG_EXPORT StructLikeReader : public Reader {
70-
public:
71-
explicit StructLikeReader(std::unique_ptr<Reader> reader);
72-
73-
~StructLikeReader() override = default;
74-
75-
/// \brief Always read data into `StructLike` or monostate if no more data.
76-
Result<Data> Next() final;
77-
78-
DataLayout data_layout() const final { return DataLayout::kStructLike; }
79-
80-
Status Open(const struct ReaderOptions& options) final {
81-
return reader_->Open(options);
82-
}
83-
84-
Status Close() final { return reader_->Close(); }
85-
86-
private:
87-
std::unique_ptr<Reader> reader_;
88-
};
89-
90-
/// \brief Wrapper of `Reader` to always return `ArrowArray`.
91-
///
92-
/// If the data layout of the wrapped reader is `StructLike`, the data will be converted
93-
/// to `ArrowArray`; otherwise, the data will be returned as is without any cost.
94-
class ICEBERG_EXPORT BatchReader : public Reader {
95-
public:
96-
explicit BatchReader(std::unique_ptr<Reader> reader);
97-
98-
~BatchReader() override = default;
99-
100-
/// \brief Always read data into `ArrowArray` or monostate if no more data.
101-
Result<Data> Next() final;
102-
103-
DataLayout data_layout() const final { return DataLayout::kArrowArray; }
104-
105-
Status Open(const struct ReaderOptions& options) final {
106-
return reader_->Open(options);
107-
}
108-
109-
Status Close() final { return reader_->Close(); }
110-
111-
private:
112-
std::unique_ptr<Reader> reader_;
57+
/// \brief Get the schema of the data.
58+
virtual Result<ArrowSchema> Schema() = 0;
11359
};
11460

11561
/// \brief A split of the file to read.
@@ -130,12 +76,12 @@ struct ICEBERG_EXPORT ReaderOptions {
13076
std::optional<Split> split;
13177
/// \brief The batch size to read. Only applies to implementations that support
13278
/// batching.
133-
int64_t batch_size;
79+
int64_t batch_size = 4096;
13480
/// \brief FileIO instance to open the file. Reader implementations should down cast it
13581
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
13682
/// `ArrowFileSystemFileIO` as the default implementation.
13783
std::shared_ptr<class FileIO> io;
138-
/// \brief The projection schema to read from the file.
84+
/// \brief The projection schema to read from the file. This field is required.
13985
std::shared_ptr<class Schema> projection;
14086
/// \brief The filter to apply to the data. Reader implementations may ignore this if
14187
/// the file format does not support filtering.
@@ -160,9 +106,4 @@ struct ICEBERG_EXPORT ReaderFactoryRegistry {
160106
const ReaderOptions& options);
161107
};
162108

163-
/// \brief Macro to register a reader factory for a specific file format.
164-
#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory) \
165-
static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \
166-
::iceberg::FileFormatType::k##format_type, reader_factory);
167-
168109
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ICEBERG_EXPORT ManifestReader {
3737
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
3838

3939
private:
40-
std::unique_ptr<StructLikeReader> reader_;
40+
std::unique_ptr<Reader> reader_;
4141
};
4242

4343
/// \brief Read manifest files from a manifest list file.
@@ -46,7 +46,7 @@ class ICEBERG_EXPORT ManifestListReader {
4646
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;
4747

4848
private:
49-
std::unique_ptr<StructLikeReader> reader_;
49+
std::unique_ptr<Reader> reader_;
5050
};
5151

5252
} // namespace iceberg

0 commit comments

Comments
 (0)