Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/iceberg/avro/avro_stream_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,9 @@ void AvroOutputStream::flush() {
}
}

const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream()
const {
return output_stream_;
}

} // namespace iceberg::avro
2 changes: 2 additions & 0 deletions src/iceberg/avro/avro_stream_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream {
/// store, if any.
void flush() override;

const std::shared_ptr<::arrow::io::OutputStream>& arrow_output_stream() const;

private:
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
const int64_t buffer_size_;
Expand Down
32 changes: 24 additions & 8 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

Expand All @@ -50,9 +53,6 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions

} // namespace

// A stateful context to keep track of the writing progress.
struct WriteContext {};

class AvroWriter::Impl {
public:
Status Open(const WriterOptions& options) {
Expand All @@ -67,22 +67,32 @@ class AvroWriter::Impl {
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));

arrow_output_stream_ = output_stream->arrow_output_stream();
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
std::move(output_stream), *avro_schema_);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
}

Status Write(ArrowArray /*data*/) {
// TODO(xiao.dong) convert data and write to avro
// total_bytes_+= written_bytes;
Status Write(ArrowArray data) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
::arrow::ImportArray(&data, &arrow_schema_));

for (int64_t i = 0; i < result->length(); i++) {
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
writer_->write(*datum_);
}

return {};
}

Status Close() {
if (writer_ != nullptr) {
writer_->close();
writer_.reset();
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
}
return {};
}
Expand All @@ -92,13 +102,19 @@ class AvroWriter::Impl {
int64_t length() { return total_bytes_; }

private:
int64_t total_bytes_ = 0;
// The schema to write.
std::shared_ptr<::iceberg::Schema> write_schema_;
// The avro schema to write.
std::shared_ptr<::avro::ValidSchema> avro_schema_;
// Arrow output stream of the Avro file to write
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
// The avro writer to write the data into a datum.
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
// Reusable Avro datum for writing individual records.
std::unique_ptr<::avro::GenericDatum> datum_;
// Arrow schema to write data.
ArrowSchema arrow_schema_;
int64_t total_bytes_ = 0;
};

AvroWriter::~AvroWriter() = default;
Expand Down
67 changes: 66 additions & 1 deletion test/avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
#include <arrow/c/helpers.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/file.h>
#include <arrow/json/from_string.h>
Expand All @@ -31,8 +30,10 @@

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_writer.h"
#include "iceberg/file_reader.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/type.h"
#include "matchers.h"
#include "temp_file_test_base.h"
Expand Down Expand Up @@ -105,6 +106,46 @@ class AvroReaderTest : public TempFileTestBase {
ASSERT_FALSE(data.value().has_value());
}

void WriteAndVerify(std::shared_ptr<Schema> schema,
const std::string& expected_string) {
ArrowSchema arrow_c_schema;
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());

auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
ASSERT_TRUE(arrow_schema_result.ok());
auto arrow_schema = arrow_schema_result.ValueOrDie();

auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string);
ASSERT_TRUE(array_result.ok());
auto array = array_result.ValueOrDie();

struct ArrowArray arrow_array;
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
ASSERT_TRUE(export_result.ok());

auto writer_result = WriterFactoryRegistry::Open(
FileFormatType::kAvro,
{.path = temp_avro_file_, .schema = schema, .io = file_io_});
ASSERT_TRUE(writer_result.has_value());
auto writer = std::move(writer_result.value());
ASSERT_THAT(writer->Write(arrow_array), IsOk());
ASSERT_THAT(writer->Close(), IsOk());

auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
ASSERT_TRUE(file_info_result.ok());
ASSERT_EQ(file_info_result->size(), writer->length().value());

auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
{.path = temp_avro_file_,
.length = file_info_result->size(),
.io = file_io_,
.projection = schema});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
std::string temp_avro_file_;
Expand Down Expand Up @@ -161,4 +202,28 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(AvroReaderTest, AvroWriterBasicType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});

std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])";

WriteAndVerify(schema, expected_string);
}

TEST_F(AvroReaderTest, AvroWriterNestedType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeRequired(
2, "info",
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
SchemaField::MakeRequired(4, "age", std::make_shared<IntType>())}))});

std::string expected_string =
R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])";

WriteAndVerify(schema, expected_string);
}

} // namespace iceberg::avro
Loading