Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ if(ICEBERG_BUILD_BUNDLE)
arrow/demo_arrow.cc
arrow/arrow_fs_file_io.cc
avro/demo_avro.cc
avro/avro_data_util.cc
avro/avro_reader.cc
avro/avro_schema_util.cc
avro/avro_stream_internal.cc)

Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
/// \brief Delete a file at the given location.
Status DeleteFile(const std::string& file_location) override;

/// \brief Get the Arrow file system.
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko We have two libraries: libiceberg and libiceberg-bundle:

  • libiceberg: it only uses the FileIO and FileReader interfaces; downstream projects should provide their own implementations.
  • libiceberg-bundle: it uses ArrowFileSystemFileIO as the implementation and both Avro and Parquet reader implementations assume that arrow::FileSystem is available.


private:
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
};
Expand Down
32 changes: 32 additions & 0 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/avro/avro_data_util_internal.h"

namespace iceberg::avro {

/// \brief Append a single Avro datum to an Arrow array builder.
///
/// Placeholder: decoding Avro data into Arrow arrays is not written yet, so
/// every call reports NotImplemented. Parameter names are commented out to
/// document the intended signature without triggering unused warnings.
Status AppendDatumToBuilder(const ::avro::NodePtr& /*avro_node*/,
                            const ::avro::GenericDatum& /*avro_datum*/,
                            const SchemaProjection& /*projection*/,
                            const Schema& /*arrow_schema*/,
                            ::arrow::ArrayBuilder* /*array_builder*/) {
  return NotImplemented("AppendDatumToBuilder is not yet implemented");
}

} // namespace iceberg::avro
35 changes: 35 additions & 0 deletions src/iceberg/avro/avro_data_util_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <arrow/array/builder_base.h>
#include <avro/GenericDatum.hh>

#include "iceberg/schema_util.h"

namespace iceberg::avro {

/// \brief Append a single Avro datum to an Arrow array builder.
///
/// \param avro_node the Avro schema node describing `avro_datum`.
/// \param avro_datum the decoded Avro value to append.
/// \param projection the projection mapping the file schema onto the read
///        schema (produced by `Project`).
/// \param arrow_schema the projected read schema (an iceberg `Schema`, despite
///        the parameter name) the builder was created from.
/// \param array_builder the Arrow builder to append the converted value to;
///        must not be null.
/// \return OK on success, otherwise an error status.
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
                            const ::avro::GenericDatum& avro_datum,
                            const SchemaProjection& projection,
                            const Schema& arrow_schema,
                            ::arrow::ArrayBuilder* array_builder);

} // namespace iceberg::avro
222 changes: 222 additions & 0 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/avro/avro_reader.h"

#include <memory>

#include <arrow/array/builder_base.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/arrow_fs_file_io.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg::avro {

namespace {

// Opens the file referenced by `options` through the Arrow filesystem and
// adapts it to the Avro input-stream interface with the given buffer size.
// NOTE(review): assumes `options.io` is actually an ArrowFileSystemFileIO —
// the checked cast enforces this at runtime.
Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& options,
                                                           int64_t buffer_size) {
  ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
  // Propagate the known file size, if any, so Arrow can skip a stat call.
  if (options.length.has_value()) {
    file_info.set_size(*options.length);
  }

  auto file_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  auto open_result = file_io->fs()->OpenInputFile(file_info);
  if (!open_result.ok()) {
    return IOError("Failed to open file {} for {}", options.path,
                   open_result.status().message());
  }

  return std::make_unique<AvroInputStream>(open_result.MoveValueUnsafe(), buffer_size);
}

} // namespace

// A stateful context to keep track of the reading progress.
struct ReadContext {
  // The datum to reuse for reading the data.
  std::unique_ptr<::avro::GenericDatum> datum_;
  // The arrow schema to build the record batch.
  std::shared_ptr<::arrow::Schema> arrow_schema_;
  // The builder to build the record batch.
  std::shared_ptr<::arrow::ArrayBuilder> builder_;
};

// TODO(gang.wu): there are a lot to do to make this reader work.
// 1. prune the reader schema based on the projection
// 2. read key-value metadata from the avro file
// 3. collect basic reader metrics
class AvroBatchReader::Impl {
 public:
  /// \brief Open the Avro file, validate field IDs, and set up the projection.
  Status Open(const ReaderOptions& options) {
    batch_size_ = options.batch_size;
    read_schema_ = options.projection;

    // Open the input stream and adapt to the avro interface.
    // TODO(gangwu): make this configurable
    constexpr int64_t kDefaultBufferSize = 1024 * 1024;
    ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
                            CreateInputStream(options, kDefaultBufferSize));

    // Create a base reader without setting reader schema to enable projection.
    auto base_reader =
        std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
    const ::avro::ValidSchema& file_schema = base_reader->dataSchema();

    // Validate field ids in the file schema: projection requires every field
    // to carry a field-id attribute.
    HasIdVisitor has_id_visitor;
    ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
    if (has_id_visitor.HasNoIds()) {
      // TODO(gangwu): support applying field-ids based on name mapping
      return NotImplemented("Avro file schema has no field IDs");
    }
    if (!has_id_visitor.AllHaveIds()) {
      return InvalidSchema("Not all fields in the Avro file schema have field IDs");
    }

    // Project the read schema on top of the file schema.
    // TODO(gangwu): support pruning source fields
    ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
                                                 /*prune_source=*/false));
    base_reader->init(file_schema);
    reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
        std::move(base_reader));

    // When reading a split, seek to its first sync point and remember where
    // it ends so Next() stops at the split boundary.
    if (options.split) {
      reader_->sync(options.split->offset);
      split_end_ = options.split->offset + options.split->length;
    }
    return {};
  }

  /// \brief Read the next batch of at most `batch_size_` rows.
  Result<Data> Next() {
    if (!context_) {
      ICEBERG_RETURN_UNEXPECTED(InitReadContext());
    }

    while (context_->builder_->length() < batch_size_) {
      if (split_end_ && reader_->pastSync(split_end_.value())) {
        break;  // reached the end of the assigned split
      }
      if (!reader_->read(*context_->datum_)) {
        break;  // reached the end of the file
      }
      ICEBERG_RETURN_UNEXPECTED(
          AppendDatumToBuilder(reader_->readerSchema().root(), *context_->datum_,
                               projection_, *read_schema_, context_->builder_.get()));
    }

    return ConvertBuilderToArrowArray();
  }

  /// \brief Close the underlying Avro reader and release the read context.
  Status Close() {
    if (reader_ != nullptr) {
      reader_->close();
      reader_.reset();
    }
    context_.reset();
    return {};
  }

 private:
  // Lazily create the reusable datum, the imported Arrow schema, and the
  // struct array builder used to accumulate rows.
  Status InitReadContext() {
    context_ = std::make_unique<ReadContext>();
    context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema());

    ArrowSchema arrow_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
    auto import_result = ::arrow::ImportSchema(&arrow_schema);
    if (!import_result.ok()) {
      return InvalidSchema("Failed to import the arrow schema: {}",
                           import_result.status().message());
    }
    context_->arrow_schema_ = import_result.MoveValueUnsafe();

    auto arrow_struct_type =
        std::make_shared<::arrow::StructType>(context_->arrow_schema_->fields());
    auto builder_result = ::arrow::MakeBuilder(arrow_struct_type);
    if (!builder_result.ok()) {
      return InvalidSchema("Failed to make the arrow builder: {}",
                           builder_result.status().message());
    }
    context_->builder_ = builder_result.MoveValueUnsafe();

    return {};
  }

  // Finish the builder and export its contents via the Arrow C data
  // interface. Returns a default-constructed Data when no rows accumulated.
  Result<Data> ConvertBuilderToArrowArray() {
    if (context_->builder_->length() == 0) {
      return {};
    }

    auto builder_result = context_->builder_->Finish();
    if (!builder_result.ok()) {
      return InvalidArrowData("Failed to finish the arrow array builder: {}",
                              builder_result.status().message());
    }

    auto array = builder_result.MoveValueUnsafe();
    ArrowArray arrow_array;
    auto export_result = ::arrow::ExportArray(*array, &arrow_array);
    if (!export_result.ok()) {
      return InvalidArrowData("Failed to export the arrow array: {}",
                              export_result.message());
    }
    return arrow_array;
  }

 private:
  // Max number of rows in the record batch to read.
  int64_t batch_size_{};
  // The end of the split to read and used to terminate the reading.
  std::optional<int64_t> split_end_;
  // The schema to read.
  std::shared_ptr<Schema> read_schema_;
  // The projection result to apply to the read schema.
  SchemaProjection projection_;
  // The avro reader to read the data into a datum.
  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
  // The context to keep track of the reading progress.
  std::unique_ptr<ReadContext> context_;
};

/// \brief Read the next batch; Open() must have succeeded first.
Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }

/// \brief Open the Avro file described by `options`.
///
/// Re-opening replaces any previously opened state (the old Impl is
/// destroyed when the unique_ptr is reassigned).
Status AvroBatchReader::Open(const ReaderOptions& options) {
  impl_ = std::make_unique<Impl>();
  return impl_->Open(options);
}

/// \brief Close the reader and release resources.
///
/// Closing a reader that was never opened (or was already reset) is a
/// harmless no-op instead of a null-pointer dereference.
Status AvroBatchReader::Close() {
  if (impl_ == nullptr) {
    return {};
  }
  return impl_->Close();
}

} // namespace iceberg::avro
47 changes: 47 additions & 0 deletions src/iceberg/avro/avro_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "iceberg/file_reader.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::avro {

/// \brief A reader that reads ArrowArray from Avro files.
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
 public:
  AvroBatchReader() = default;

  ~AvroBatchReader() override = default;

  /// \brief Open the Avro file described by `options` and prepare for reading.
  Status Open(const ReaderOptions& options) final;

  /// \brief Close the reader and release underlying resources.
  Status Close() final;

  /// \brief Read the next batch of rows.
  Result<Data> Next() final;

  /// \brief This reader produces data as Arrow C-ABI arrays.
  DataLayout data_layout() const final { return DataLayout::kArrowArray; }

 private:
  // Pimpl keeps the Avro/Arrow dependencies out of this public header.
  class Impl;
  std::unique_ptr<Impl> impl_;
};

} // namespace iceberg::avro
11 changes: 7 additions & 4 deletions src/iceberg/avro/demo_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ Reader::DataLayout DemoAvroReader::data_layout() const {
return Reader::DataLayout::kStructLike;
}

ICEBERG_REGISTER_READER_FACTORY(
Avro, [](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
return std::make_unique<DemoAvroReader>();
});
// The demo reader keeps no state, so opening and closing always succeed.
Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; }

Status DemoAvroReader::Close() { return {}; }

// Register the demo reader as the factory for the Avro file format.
ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result<std::unique_ptr<Reader>> {
  return std::make_unique<DemoAvroReader>();
});

} // namespace iceberg::avro
2 changes: 2 additions & 0 deletions src/iceberg/avro/demo_avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
public:
DemoAvroReader() = default;
~DemoAvroReader() override = default;
/// \brief Open the reader; the demo implementation is a no-op.
Status Open(const ReaderOptions& options) override;
/// \brief Close the reader; the demo implementation is a no-op.
Status Close() override;
/// \brief Read the next piece of data.
Result<Data> Next() override;
/// \brief The demo reader exposes data with a struct-like layout.
DataLayout data_layout() const override;
};
Expand Down
Loading
Loading