Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,13 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
const SchemaField& projected_field,
::arrow::ArrayBuilder* array_builder) {
if (avro_node->type() == ::avro::AVRO_UNION) {
const auto& union_datum = avro_datum.value<::avro::GenericUnion>();
size_t branch = union_datum.currentBranch();
size_t branch = avro_datum.unionBranch();
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
return {};
} else {
return AppendFieldToBuilder(avro_node->leafAt(branch), union_datum.datum(),
projection, projected_field, array_builder);
return AppendFieldToBuilder(avro_node->leafAt(branch), avro_datum, projection,
projected_field, array_builder);
}
}

Expand Down
41 changes: 33 additions & 8 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ struct ReadContext {
// 1. prune the reader schema based on the projection
// 2. read key-value metadata from the avro file
// 3. collect basic reader metrics
class AvroBatchReader::Impl {
class AvroReader::Impl {
public:
Status Open(const ReaderOptions& options) {
// TODO(gangwu): consider adding a ReaderOptions::Validate() method
if (options.projection == nullptr) {
return InvalidArgument("Projected schema is required by Avro reader");
}

batch_size_ = options.batch_size;
read_schema_ = options.projection;

Expand Down Expand Up @@ -106,11 +111,10 @@ class AvroBatchReader::Impl {

// Project the read schema on top of the file schema.
// TODO(gangwu): support pruning source fields
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
/*prune_source=*/false));
base_reader->init(file_schema);
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
std::move(base_reader));
std::move(base_reader), file_schema);

if (options.split) {
reader_->sync(options.split->offset);
Expand Down Expand Up @@ -148,6 +152,19 @@ class AvroBatchReader::Impl {
return {};
}

Result<ArrowSchema> Schema() {
if (!context_) {
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
}
ArrowSchema arrow_schema;
auto export_result = ::arrow::ExportSchema(*context_->arrow_schema_, &arrow_schema);
if (!export_result.ok()) {
return InvalidSchema("Failed to export the arrow schema: {}",
export_result.message());
}
return arrow_schema;
}

private:
Status InitReadContext() {
context_ = std::make_unique<ReadContext>();
Expand Down Expand Up @@ -201,7 +218,7 @@ class AvroBatchReader::Impl {
// The end of the split to read; used to terminate reading.
std::optional<int64_t> split_end_;
// The schema to read.
std::shared_ptr<Schema> read_schema_;
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
SchemaProjection projection_;
// The avro reader to read the data into a datum.
Expand All @@ -210,13 +227,21 @@ class AvroBatchReader::Impl {
std::unique_ptr<ReadContext> context_;
};

Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }
Result<Reader::Data> AvroReader::Next() { return impl_->Next(); }

Status AvroBatchReader::Open(const ReaderOptions& options) {
Result<ArrowSchema> AvroReader::Schema() { return impl_->Schema(); }

Status AvroReader::Open(const ReaderOptions& options) {
impl_ = std::make_unique<Impl>();
return impl_->Open(options);
}

Status AvroBatchReader::Close() { return impl_->Close(); }
Status AvroReader::Close() { return impl_->Close(); }

void AvroReader::Register() {
static ReaderFactoryRegistry avro_reader_register(
FileFormatType::kAvro,
[]() -> Result<std::unique_ptr<Reader>> { return std::make_unique<AvroReader>(); });
}

} // namespace iceberg::avro
11 changes: 7 additions & 4 deletions src/iceberg/avro/avro_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@
namespace iceberg::avro {

/// \brief A reader that reads ArrowArray from Avro files.
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader {
public:
AvroBatchReader() = default;
AvroReader() = default;

~AvroBatchReader() override = default;
~AvroReader() override = default;

Status Open(const ReaderOptions& options) final;

Status Close() final;

Result<Data> Next() final;

DataLayout data_layout() const final { return DataLayout::kArrowArray; }
Result<ArrowSchema> Schema() final;

/// \brief Register this Avro reader implementation.
static void Register();

private:
class Impl;
Expand Down
14 changes: 0 additions & 14 deletions src/iceberg/avro/demo_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,4 @@ std::string DemoAvro::print() const {
return actual.str();
}

Result<Reader::Data> DemoAvroReader::Next() { return std::monostate(); }

Reader::DataLayout DemoAvroReader::data_layout() const {
return Reader::DataLayout::kStructLike;
}

Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; }

Status DemoAvroReader::Close() { return {}; }

ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result<std::unique_ptr<Reader>> {
return std::make_unique<DemoAvroReader>();
});

} // namespace iceberg::avro
10 changes: 0 additions & 10 deletions src/iceberg/avro/demo_avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,4 @@ class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro {
std::string print() const override;
};

class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
public:
DemoAvroReader() = default;
~DemoAvroReader() override = default;
Status Open(const ReaderOptions& options) override;
Status Close() override;
Result<Data> Next() override;
DataLayout data_layout() const override;
};

} // namespace iceberg::avro
9 changes: 0 additions & 9 deletions src/iceberg/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,4 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
return reader;
}

StructLikeReader::StructLikeReader(std::unique_ptr<Reader> reader)
: reader_(std::move(reader)) {}

Result<Reader::Data> StructLikeReader::Next() { return NotImplemented(""); }

BatchReader::BatchReader(std::unique_ptr<Reader> reader) : reader_(std::move(reader)) {}

Result<Reader::Data> BatchReader::Next() { return NotImplemented(""); }

} // namespace iceberg
71 changes: 6 additions & 65 deletions src/iceberg/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,66 +50,12 @@ class ICEBERG_EXPORT Reader {

/// \brief Read next data from file.
///
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or
/// `StructLike` depending on the data layout by the reader implementation.
using Data =
std::variant<std::monostate, ArrowArray, std::reference_wrapper<const StructLike>>;
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray`.
using Data = std::variant<std::monostate, ArrowArray>;
virtual Result<Data> Next() = 0;

enum class DataLayout { kArrowArray, kStructLike };

/// \brief Get the data layout returned by `Next()` of the reader.
virtual DataLayout data_layout() const = 0;
};

/// \brief Wrapper of `Reader` to always return `StructLike`.
///
/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted
/// to `StructLike`; otherwise, the data will be returned as is without any cost.
class ICEBERG_EXPORT StructLikeReader : public Reader {
public:
explicit StructLikeReader(std::unique_ptr<Reader> reader);

~StructLikeReader() override = default;

/// \brief Always read data into `StructLike` or monostate if no more data.
Result<Data> Next() final;

DataLayout data_layout() const final { return DataLayout::kStructLike; }

Status Open(const struct ReaderOptions& options) final {
return reader_->Open(options);
}

Status Close() final { return reader_->Close(); }

private:
std::unique_ptr<Reader> reader_;
};

/// \brief Wrapper of `Reader` to always return `ArrowArray`.
///
/// If the data layout of the wrapped reader is `StructLike`, the data will be converted
/// to `ArrowArray`; otherwise, the data will be returned as is without any cost.
class ICEBERG_EXPORT BatchReader : public Reader {
public:
explicit BatchReader(std::unique_ptr<Reader> reader);

~BatchReader() override = default;

/// \brief Always read data into `ArrowArray` or monostate if no more data.
Result<Data> Next() final;

DataLayout data_layout() const final { return DataLayout::kArrowArray; }

Status Open(const struct ReaderOptions& options) final {
return reader_->Open(options);
}

Status Close() final { return reader_->Close(); }

private:
std::unique_ptr<Reader> reader_;
/// \brief Get the schema of the data.
virtual Result<ArrowSchema> Schema() = 0;
};

/// \brief A split of the file to read.
Expand All @@ -130,12 +76,12 @@ struct ICEBERG_EXPORT ReaderOptions {
std::optional<Split> split;
/// \brief The batch size to read. Only applies to implementations that support
/// batching.
int64_t batch_size;
int64_t batch_size = 4096;
/// \brief FileIO instance to open the file. Reader implementations should downcast it
/// to the specific FileIO implementation. The `iceberg-bundle` library uses
/// `ArrowFileSystemFileIO` as the default implementation.
std::shared_ptr<class FileIO> io;
/// \brief The projection schema to read from the file.
/// \brief The projection schema to read from the file. This field is required.
std::shared_ptr<class Schema> projection;
/// \brief The filter to apply to the data. Reader implementations may ignore this if
/// the file format does not support filtering.
Expand All @@ -160,9 +106,4 @@ struct ICEBERG_EXPORT ReaderFactoryRegistry {
const ReaderOptions& options);
};

/// \brief Macro to register a reader factory for a specific file format.
#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory) \
static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \
::iceberg::FileFormatType::k##format_type, reader_factory);

} // namespace iceberg
4 changes: 2 additions & 2 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ICEBERG_EXPORT ManifestReader {
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;

private:
std::unique_ptr<StructLikeReader> reader_;
std::unique_ptr<Reader> reader_;
};

/// \brief Read manifest files from a manifest list file.
Expand All @@ -46,7 +46,7 @@ class ICEBERG_EXPORT ManifestListReader {
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;

private:
std::unique_ptr<StructLikeReader> reader_;
std::unique_ptr<Reader> reader_;
};

} // namespace iceberg
Loading
Loading