Skip to content

Commit c06c261

Browse files
authored
feat: add avro reader to registry (#133)
- Added a function to register the Avro reader impl to the registery. - Removed reader adapters for ArrowArray and StructLike. - Added simple test cases for Avro reader. - Removed demo code.
1 parent c00ee8f commit c06c261

File tree

16 files changed

+212
-386
lines changed

16 files changed

+212
-386
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,11 +299,26 @@ function(resolve_zlib_dependency)
299299

300300
endfunction()
301301

302+
# ----------------------------------------------------------------------
303+
# Zstd
304+
305+
function(resolve_zstd_dependency)
306+
find_package(zstd CONFIG)
307+
if(zstd_FOUND)
308+
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES zstd)
309+
message(STATUS "Found zstd, version: ${zstd_VERSION}")
310+
set(ICEBERG_SYSTEM_DEPENDENCIES
311+
${ICEBERG_SYSTEM_DEPENDENCIES}
312+
PARENT_SCOPE)
313+
endif()
314+
endfunction()
315+
302316
resolve_zlib_dependency()
303317
resolve_nanoarrow_dependency()
304318
resolve_nlohmann_json_dependency()
305319

306320
if(ICEBERG_BUILD_BUNDLE)
307321
resolve_arrow_dependency()
308322
resolve_avro_dependency()
323+
resolve_zstd_dependency()
309324
endif()

example/demo_example.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
#include <iostream>
2121

22-
#include "iceberg/arrow/demo_arrow.h"
23-
#include "iceberg/avro/demo_avro.h"
24-
#include "iceberg/demo.h"
22+
#include "iceberg/avro/avro_reader.h"
23+
#include "iceberg/file_reader.h"
2524

2625
int main() {
27-
std::cout << iceberg::Demo().print() << std::endl;
28-
std::cout << iceberg::arrow::DemoArrow().print() << std::endl;
29-
std::cout << iceberg::avro::DemoAvro().print() << std::endl;
26+
iceberg::avro::AvroReader::Register();
27+
auto open_result = iceberg::ReaderFactoryRegistry::Open(
28+
iceberg::FileFormatType::kAvro, {.path = "non-existing-file.avro"});
29+
if (!open_result.has_value()) {
30+
std::cerr << "Failed to open avro file" << std::endl;
31+
return 1;
32+
}
3033
return 0;
3134
}

src/iceberg/CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_internal.cc
2222
catalog/in_memory_catalog.cc
23-
demo.cc
2423
expression/expression.cc
2524
expression/literal.cc
2625
file_reader.cc
@@ -94,9 +93,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
9493

9594
if(ICEBERG_BUILD_BUNDLE)
9695
set(ICEBERG_BUNDLE_SOURCES
97-
arrow/demo_arrow.cc
9896
arrow/arrow_fs_file_io.cc
99-
avro/demo_avro.cc
10097
avro/avro_data_util.cc
10198
avro/avro_reader.cc
10299
avro/avro_schema_util.cc

src/iceberg/arrow/demo_arrow.cc

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/iceberg/arrow/demo_arrow.h

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/iceberg/avro/avro_data_util.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,14 +419,13 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
419419
const SchemaField& projected_field,
420420
::arrow::ArrayBuilder* array_builder) {
421421
if (avro_node->type() == ::avro::AVRO_UNION) {
422-
const auto& union_datum = avro_datum.value<::avro::GenericUnion>();
423-
size_t branch = union_datum.currentBranch();
422+
size_t branch = avro_datum.unionBranch();
424423
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
425424
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
426425
return {};
427426
} else {
428-
return AppendFieldToBuilder(avro_node->leafAt(branch), union_datum.datum(),
429-
projection, projected_field, array_builder);
427+
return AppendFieldToBuilder(avro_node->leafAt(branch), avro_datum, projection,
428+
projected_field, array_builder);
430429
}
431430
}
432431

src/iceberg/avro/avro_reader.cc

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,14 @@ struct ReadContext {
7676
// 1. prune the reader schema based on the projection
7777
// 2. read key-value metadata from the avro file
7878
// 3. collect basic reader metrics
79-
class AvroBatchReader::Impl {
79+
class AvroReader::Impl {
8080
public:
8181
Status Open(const ReaderOptions& options) {
82+
// TODO(gangwu): perhaps adding a ReaderOptions::Validate() method
83+
if (options.projection == nullptr) {
84+
return InvalidArgument("Projected schema is required by Avro reader");
85+
}
86+
8287
batch_size_ = options.batch_size;
8388
read_schema_ = options.projection;
8489

@@ -106,11 +111,10 @@ class AvroBatchReader::Impl {
106111

107112
// Project the read schema on top of the file schema.
108113
// TODO(gangwu): support pruning source fields
109-
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
114+
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
110115
/*prune_source=*/false));
111-
base_reader->init(file_schema);
112116
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
113-
std::move(base_reader));
117+
std::move(base_reader), file_schema);
114118

115119
if (options.split) {
116120
reader_->sync(options.split->offset);
@@ -119,7 +123,7 @@ class AvroBatchReader::Impl {
119123
return {};
120124
}
121125

122-
Result<Data> Next() {
126+
Result<std::optional<ArrowArray>> Next() {
123127
if (!context_) {
124128
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
125129
}
@@ -148,6 +152,19 @@ class AvroBatchReader::Impl {
148152
return {};
149153
}
150154

155+
Result<ArrowSchema> Schema() {
156+
if (!context_) {
157+
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
158+
}
159+
ArrowSchema arrow_schema;
160+
auto export_result = ::arrow::ExportSchema(*context_->arrow_schema_, &arrow_schema);
161+
if (!export_result.ok()) {
162+
return InvalidSchema("Failed to export the arrow schema: {}",
163+
export_result.message());
164+
}
165+
return arrow_schema;
166+
}
167+
151168
private:
152169
Status InitReadContext() {
153170
context_ = std::make_unique<ReadContext>();
@@ -174,9 +191,9 @@ class AvroBatchReader::Impl {
174191
return {};
175192
}
176193

177-
Result<Data> ConvertBuilderToArrowArray() {
194+
Result<std::optional<ArrowArray>> ConvertBuilderToArrowArray() {
178195
if (context_->builder_->length() == 0) {
179-
return {};
196+
return std::nullopt;
180197
}
181198

182199
auto builder_result = context_->builder_->Finish();
@@ -201,7 +218,7 @@ class AvroBatchReader::Impl {
201218
// The end of the split to read and used to terminate the reading.
202219
std::optional<int64_t> split_end_;
203220
// The schema to read.
204-
std::shared_ptr<Schema> read_schema_;
221+
std::shared_ptr<::iceberg::Schema> read_schema_;
205222
// The projection result to apply to the read schema.
206223
SchemaProjection projection_;
207224
// The avro reader to read the data into a datum.
@@ -210,13 +227,21 @@ class AvroBatchReader::Impl {
210227
std::unique_ptr<ReadContext> context_;
211228
};
212229

213-
Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }
230+
Result<std::optional<ArrowArray>> AvroReader::Next() { return impl_->Next(); }
214231

215-
Status AvroBatchReader::Open(const ReaderOptions& options) {
232+
Result<ArrowSchema> AvroReader::Schema() { return impl_->Schema(); }
233+
234+
Status AvroReader::Open(const ReaderOptions& options) {
216235
impl_ = std::make_unique<Impl>();
217236
return impl_->Open(options);
218237
}
219238

220-
Status AvroBatchReader::Close() { return impl_->Close(); }
239+
Status AvroReader::Close() { return impl_->Close(); }
240+
241+
void AvroReader::Register() {
242+
static ReaderFactoryRegistry avro_reader_register(
243+
FileFormatType::kAvro,
244+
[]() -> Result<std::unique_ptr<Reader>> { return std::make_unique<AvroReader>(); });
245+
}
221246

222247
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,22 @@
2525
namespace iceberg::avro {
2626

2727
/// \brief A reader that reads ArrowArray from Avro files.
28-
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
28+
class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader {
2929
public:
30-
AvroBatchReader() = default;
30+
AvroReader() = default;
3131

32-
~AvroBatchReader() override = default;
32+
~AvroReader() override = default;
3333

3434
Status Open(const ReaderOptions& options) final;
3535

3636
Status Close() final;
3737

38-
Result<Data> Next() final;
38+
Result<std::optional<ArrowArray>> Next() final;
3939

40-
DataLayout data_layout() const final { return DataLayout::kArrowArray; }
40+
Result<ArrowSchema> Schema() final;
41+
42+
/// \brief Register this Avro reader implementation.
43+
static void Register();
4144

4245
private:
4346
class Impl;

src/iceberg/avro/demo_avro.cc

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)