From ef3f7af73086b06f59f034dcc86e79c9ea7628d4 Mon Sep 17 00:00:00 2001 From: nullccxsy <32149055912@qq.com> Date: Mon, 1 Sep 2025 14:35:58 +0800 Subject: [PATCH 1/4] feat: implement AvroWriter Write method --- src/iceberg/avro/avro_writer.cc | 22 ++++++-- test/avro_test.cc | 95 ++++++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index f7caa69f1..e64358acb 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -26,14 +26,17 @@ #include #include #include +#include #include #include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -73,9 +76,22 @@ class AvroWriter::Impl { return {}; } - Status Write(ArrowArray /*data*/) { - // TODO(xiao.dong) convert data and write to avro - // total_bytes_+= written_bytes; + Status Write(ArrowArray data) { + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema)); + auto import_result = ::arrow::ImportArray(&data, &arrow_schema); + if (!import_result.ok()) { + std::cout << import_result.status() << std::endl; + return InvalidArgument("Failed to import ArrowArray: {}", + import_result.status().ToString()); + } + + auto arrow_array = import_result.ValueOrDie(); + ::avro::GenericDatum datum(*avro_schema_); + for (int64_t i = 0; i < arrow_array->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*arrow_array, i, &datum)); + writer_->write(datum); + } return {}; } diff --git a/test/avro_test.cc b/test/avro_test.cc index cba29da12..493df4260 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -28,11 +27,14 @@ #include #include #include +#include #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/avro/avro_writer.h" #include "iceberg/file_reader.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/type.h" #include "matchers.h" #include "temp_file_test_base.h" @@ -105,6 +107,30 @@ class AvroReaderTest : public TempFileTestBase { ASSERT_FALSE(data.value().has_value()); } + void WriteAndVerify(std::shared_ptr schema, const std::string& expected_string, + ArrowArray array) { + iceberg::WriterOptions options; + options.schema = schema; + options.path = temp_avro_file_; + options.io = file_io_; + + auto writer_result = + iceberg::WriterFactoryRegistry::Open(iceberg::FileFormatType::kAvro, options); + ASSERT_TRUE(writer_result.has_value()); + auto writer = std::move(writer_result).value(); + ASSERT_THAT(writer->Write(array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string)); + + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); + } + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; std::shared_ptr file_io_; std::string temp_avro_file_; @@ -161,4 +187,71 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(AvroReaderTest, AvroWriterBasicType) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "name", std::make_shared())}); + + ArrowSchema struct_schema; + ASSERT_THAT(ToArrowSchema(*schema, &struct_schema), IsOk()); + + ArrowArray array; + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr)); + NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array)); + + std::vector str_values{"Hello", "世界", "nanoarrow"}; + std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])"; + + for (const auto& element : str_values) { + NANOARROW_THROW_NOT_OK( + ArrowArrayAppendString(array.children[0], ArrowCharView(element.c_str()))); + NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array)); + } + NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr)); + + WriteAndVerify(schema, expected_string, array); + + ArrowSchemaRelease(&struct_schema); +} + +TEST_F(AvroReaderTest, AvroWriterNestedType) { + auto nested_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired( + 2, "info", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", std::make_shared()), + SchemaField::MakeRequired(4, "age", std::make_shared())}))}); + + ArrowSchema struct_schema; + ASSERT_THAT(ToArrowSchema(*nested_schema, &struct_schema), IsOk()); + + ArrowArray array; + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr)); + + std::vector int_array = {1, 2, 3}; + std::vector> info_array = { + {"Alice", 25}, {"Bob", 30}, {"Ivy", 35}}; + + std::string expected_string = + R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])"; + + NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array)); + + for (int i = 0; i < int_array.size(); i++) { + NANOARROW_THROW_NOT_OK(ArrowArrayAppendInt(array.children[0], int_array[i])); + + NANOARROW_THROW_NOT_OK(ArrowArrayAppendString( + array.children[1]->children[0], ArrowCharView(info_array[i].first.c_str()))); + NANOARROW_THROW_NOT_OK( + ArrowArrayAppendInt(array.children[1]->children[1], info_array[i].second)); + + NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(array.children[1])); + NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array)); + } + + NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr)); + WriteAndVerify(nested_schema, expected_string, array); + ArrowSchemaRelease(&struct_schema); +} + } // namespace iceberg::avro From 8eaad5c7cc508d79082eaf312c50dea127055278 Mon Sep 17 00:00:00 2001 From: nullccxsy <32149055912@qq.com> Date: Tue, 2 Sep 2025 12:11:03 +0800 Subject: [PATCH 2/4] fix comments --- src/iceberg/avro/avro_stream_internal.cc | 4 ++ src/iceberg/avro/avro_stream_internal.h | 2 + src/iceberg/avro/avro_writer.cc | 31 +++++---- test/avro_test.cc | 86 ++++++++---------------- 4 files changed, 51 insertions(+), 72 deletions(-) diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index 0ec603bd0..02b99ffcf 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -145,4 +145,8 @@ void AvroOutputStream::flush() { } } +std::shared_ptr<::arrow::io::OutputStream> AvroOutputStream::get_output_stream() const { + return output_stream_; +} + } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_stream_internal.h b/src/iceberg/avro/avro_stream_internal.h index bb75b4d8a..ee0163cc3 100644 --- a/src/iceberg/avro/avro_stream_internal.h +++ b/src/iceberg/avro/avro_stream_internal.h @@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream { /// store, if any. void flush() override; + std::shared_ptr<::arrow::io::OutputStream> get_output_stream() const; + private: std::shared_ptr<::arrow::io::OutputStream> output_stream_; const int64_t buffer_size_; diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index e64358acb..489fae927 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -70,28 +70,23 @@ class AvroWriter::Impl { constexpr int64_t kDefaultBufferSize = 1024 * 1024; ICEBERG_ASSIGN_OR_RAISE(auto output_stream, CreateOutputStream(options, kDefaultBufferSize)); - + arrow_output_stream_ = output_stream->get_output_stream(); writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( std::move(output_stream), *avro_schema_); + datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); return {}; } Status Write(ArrowArray data) { - ArrowSchema arrow_schema; - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema)); - auto import_result = ::arrow::ImportArray(&data, &arrow_schema); - if (!import_result.ok()) { - std::cout << import_result.status() << std::endl; - return InvalidArgument("Failed to import ArrowArray: {}", - import_result.status().ToString()); - } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, + ::arrow::ImportArray(&data, &arrow_schema_)); - auto arrow_array = import_result.ValueOrDie(); - ::avro::GenericDatum datum(*avro_schema_); - for (int64_t i = 0; i < arrow_array->length(); i++) { - ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*arrow_array, i, &datum)); - writer_->write(datum); + for (int64_t i = 0; i < result->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get())); + writer_->write(*datum_); } + return {}; } @@ -99,6 +94,8 @@ class AvroWriter::Impl { if (writer_ != nullptr) { writer_->close(); writer_.reset(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell()); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close()); } return {}; } @@ -115,6 +112,12 @@ class AvroWriter::Impl { std::shared_ptr<::avro::ValidSchema> avro_schema_; // The avro writer to write the data into a datum. std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; + // Arrow schema for data conversion (C API format) + ArrowSchema arrow_schema_; + // Reusable Avro datum for writing individual records + std::unique_ptr<::avro::GenericDatum> datum_; + // Arrow output stream for writing data to filesystem + std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; }; AvroWriter::~AvroWriter() = default; diff --git a/test/avro_test.cc b/test/avro_test.cc index 493df4260..515883632 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -27,7 +27,6 @@ #include #include #include -#include #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" @@ -107,27 +106,41 @@ class AvroReaderTest : public TempFileTestBase { ASSERT_FALSE(data.value().has_value()); } - void WriteAndVerify(std::shared_ptr schema, const std::string& expected_string, - ArrowArray array) { - iceberg::WriterOptions options; - options.schema = schema; - options.path = temp_avro_file_; - options.io = file_io_; + void WriteAndVerify(std::shared_ptr schema, + const std::string& expected_string) { + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); - auto writer_result = - iceberg::WriterFactoryRegistry::Open(iceberg::FileFormatType::kAvro, options); + auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema); + ASSERT_TRUE(arrow_schema_result.ok()); + auto arrow_schema = arrow_schema_result.ValueOrDie(); + + auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string); + ASSERT_TRUE(array_result.ok()); + auto array = array_result.ValueOrDie(); + + struct ArrowArray arrow_array; + auto export_result = ::arrow::ExportArray(*array, &arrow_array); + ASSERT_TRUE(export_result.ok()); + + auto writer_result = WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .schema = schema}); ASSERT_TRUE(writer_result.has_value()); - auto writer = std::move(writer_result).value(); - ASSERT_THAT(writer->Write(array), IsOk()); + auto writer = std::move(writer_result.value()); + ASSERT_THAT(writer->Write(arrow_array), IsOk()); ASSERT_THAT(writer->Close(), IsOk()); + auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_); + ASSERT_TRUE(file_info_result.ok()); + ASSERT_EQ(file_info_result->size(), writer->length().value()); + auto reader_result = ReaderFactoryRegistry::Open( FileFormatType::kAvro, {.path = temp_avro_file_, .io = file_io_, .projection = schema}); ASSERT_THAT(reader_result, IsOk()); auto reader = std::move(reader_result.value()); ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string)); - ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } @@ -191,30 +204,13 @@ TEST_F(AvroReaderTest, AvroWriterBasicType) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "name", std::make_shared())}); - ArrowSchema struct_schema; - ASSERT_THAT(ToArrowSchema(*schema, &struct_schema), IsOk()); - - ArrowArray array; - NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr)); - NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array)); - - std::vector str_values{"Hello", "世界", "nanoarrow"}; std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])"; - for (const auto& element : str_values) { - NANOARROW_THROW_NOT_OK( - ArrowArrayAppendString(array.children[0], ArrowCharView(element.c_str()))); - NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array)); - } - NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr)); - - WriteAndVerify(schema, expected_string, array); - - ArrowSchemaRelease(&struct_schema); + WriteAndVerify(schema, expected_string); } TEST_F(AvroReaderTest, AvroWriterNestedType) { - auto nested_schema = std::make_shared(std::vector{ + auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "id", std::make_shared()), SchemaField::MakeRequired( 2, "info", @@ -222,36 +218,10 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { SchemaField::MakeRequired(3, "name", std::make_shared()), SchemaField::MakeRequired(4, "age", std::make_shared())}))}); - ArrowSchema struct_schema; - ASSERT_THAT(ToArrowSchema(*nested_schema, &struct_schema), IsOk()); - - ArrowArray array; - NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr)); - - std::vector int_array = {1, 2, 3}; - std::vector> info_array = { - {"Alice", 25}, {"Bob", 30}, {"Ivy", 35}}; - std::string expected_string = R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])"; - NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array)); - - for (int i = 0; i < int_array.size(); i++) { - NANOARROW_THROW_NOT_OK(ArrowArrayAppendInt(array.children[0], int_array[i])); - - NANOARROW_THROW_NOT_OK(ArrowArrayAppendString( - array.children[1]->children[0], ArrowCharView(info_array[i].first.c_str()))); - NANOARROW_THROW_NOT_OK( - ArrowArrayAppendInt(array.children[1]->children[1], info_array[i].second)); - - NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(array.children[1])); - NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array)); - } - - NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr)); - WriteAndVerify(nested_schema, expected_string, array); - ArrowSchemaRelease(&struct_schema); + WriteAndVerify(schema, expected_string); } } // namespace iceberg::avro From b7e3d05d7069bdc28407fed46270e61801c292db Mon Sep 17 00:00:00 2001 From: nullccxsy <32149055912@qq.com> Date: Tue, 2 Sep 2025 13:10:30 +0800 Subject: [PATCH 3/4] fix: correct parameter order in AvroWriter initialization --- test/avro_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/avro_test.cc b/test/avro_test.cc index 515883632..a41d571e6 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -125,7 +125,7 @@ class AvroReaderTest : public TempFileTestBase { auto writer_result = WriterFactoryRegistry::Open( FileFormatType::kAvro, - {.path = temp_avro_file_, .io = file_io_, .schema = schema}); + {.path = temp_avro_file_, .schema = schema, .io = file_io_}); ASSERT_TRUE(writer_result.has_value()); auto writer = std::move(writer_result.value()); ASSERT_THAT(writer->Write(arrow_array), IsOk()); From ab2befb39179f8642c047d2cf55eb105ae0e440d Mon Sep 17 00:00:00 2001 From: nullccxsy <32149055912@qq.com> Date: Wed, 3 Sep 2025 18:45:28 +0800 Subject: [PATCH 4/4] fix comments --- src/iceberg/avro/avro_stream_internal.cc | 3 ++- src/iceberg/avro/avro_stream_internal.h | 2 +- src/iceberg/avro/avro_writer.cc | 17 +++++++---------- test/avro_test.cc | 8 +++++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index 02b99ffcf..3ff64bb45 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -145,7 +145,8 @@ void AvroOutputStream::flush() { } } -std::shared_ptr<::arrow::io::OutputStream> AvroOutputStream::get_output_stream() const { +const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream() + const { return output_stream_; } diff --git a/src/iceberg/avro/avro_stream_internal.h b/src/iceberg/avro/avro_stream_internal.h index ee0163cc3..18653cca6 100644 --- a/src/iceberg/avro/avro_stream_internal.h +++ b/src/iceberg/avro/avro_stream_internal.h @@ -87,7 +87,7 @@ class AvroOutputStream : public ::avro::OutputStream { /// store, if any. void flush() override; - std::shared_ptr<::arrow::io::OutputStream> get_output_stream() const; + const std::shared_ptr<::arrow::io::OutputStream>& arrow_output_stream() const; private: std::shared_ptr<::arrow::io::OutputStream> output_stream_; diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 489fae927..4d82fb9a3 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -53,9 +53,6 @@ Result> CreateOutputStream(const WriterOptions } // namespace -// A stateful context to keep track of the writing progress. -struct WriteContext {}; - class AvroWriter::Impl { public: Status Open(const WriterOptions& options) { @@ -70,7 +67,7 @@ class AvroWriter::Impl { constexpr int64_t kDefaultBufferSize = 1024 * 1024; ICEBERG_ASSIGN_OR_RAISE(auto output_stream, CreateOutputStream(options, kDefaultBufferSize)); - arrow_output_stream_ = output_stream->get_output_stream(); + arrow_output_stream_ = output_stream->arrow_output_stream(); writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( std::move(output_stream), *avro_schema_); datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_); @@ -105,19 +102,19 @@ class AvroWriter::Impl { int64_t length() { return total_bytes_; } private: - int64_t total_bytes_ = 0; // The schema to write. std::shared_ptr<::iceberg::Schema> write_schema_; // The avro schema to write. std::shared_ptr<::avro::ValidSchema> avro_schema_; + // Arrow output stream of the Avro file to write + std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; // The avro writer to write the data into a datum. std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; - // Arrow schema for data conversion (C API format) - ArrowSchema arrow_schema_; - // Reusable Avro datum for writing individual records + // Reusable Avro datum for writing individual records. std::unique_ptr<::avro::GenericDatum> datum_; - // Arrow output stream for writing data to filesystem - std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; + // Arrow schema to write data. + ArrowSchema arrow_schema_; + int64_t total_bytes_ = 0; }; AvroWriter::~AvroWriter() = default; diff --git a/test/avro_test.cc b/test/avro_test.cc index a41d571e6..4bc773cba 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -135,9 +135,11 @@ class AvroReaderTest : public TempFileTestBase { ASSERT_TRUE(file_info_result.ok()); ASSERT_EQ(file_info_result->size(), writer->length().value()); - auto reader_result = ReaderFactoryRegistry::Open( - FileFormatType::kAvro, - {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, + {.path = temp_avro_file_, + .length = file_info_result->size(), + .io = file_io_, + .projection = schema}); ASSERT_THAT(reader_result, IsOk()); auto reader = std::move(reader_result.value()); ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));