// Patch summary (review annotations; the patch text below is unchanged).
//
// This is a unified diff touching four files:
//   * src/iceberg/avro/avro_stream_internal.{h,cc}
//       Adds the const accessor AvroOutputStream::arrow_output_stream(), which
//       returns the `output_stream_` member by const reference.
//   * src/iceberg/avro/avro_writer.cc
//       - Removes the empty `WriteContext` struct.
//       - Open(): keeps a handle to the Arrow output stream obtained through the
//         new accessor, allocates a reusable ::avro::GenericDatum built from the
//         Avro schema, and fills `arrow_schema_` via ToArrowSchema().
//       - Write(): imports the incoming ArrowArray with
//         ::arrow::ImportArray(&data, &arrow_schema_), then for every row index
//         calls ExtractDatumFromArray() into the reusable datum and hands the
//         datum to writer_->write().
//       - Close(): closes and resets the Avro writer, stores the result of
//         arrow_output_stream_->Tell() in `total_bytes_`, then closes the stream.
//       - Moves `total_bytes_` to the end of the member list and adds the
//         `arrow_output_stream_`, `datum_` and `arrow_schema_` members.
//   * test/avro_test.cc
//       Adds the WriteAndVerify() helper (build an Arrow array from JSON, write
//       it with the Avro writer, compare the file size with writer->length(),
//       read it back and compare with the input) plus the tests
//       AvroWriterBasicType and AvroWriterNestedType.
//
// NOTE(review): `arrow_schema_` is populated once in Open() but is passed to
//   ::arrow::ImportArray on every Write(). Arrow's C data interface import is
//   documented as releasing the ArrowSchema it is given -- if that holds here,
//   a second Write() would be handed an already-released schema. Confirm, and
//   consider importing the type once in Open(). The tests added here call
//   Write() only once per writer, so they would not exercise this.
// NOTE(review): nothing visible in this patch releases `arrow_schema_` when
//   Write() is never called -- verify ownership/cleanup.
// NOTE(review): writer_->write() / writer_->close() are called without any
//   error translation; presumably the Avro C++ writer reports failures by
//   throwing -- confirm whether these need to be converted to Status.
// NOTE(review): in Close(), a failing Tell() presumably returns early through
//   the macro, skipping arrow_output_stream_->Close(); `writer_` has already
//   been reset at that point, so a retry of Close() would do nothing.
// NOTE(review): this copy of the patch looks damaged by extraction: line
//   breaks are collapsed and several `<...>` spans appear to be missing (bare
//   `#include`, `Result>`, `std::make_shared(std::vector{`). Recover the patch
//   from its original source before applying it.
diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index 0ec603bd0..3ff64bb45 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -145,4 +145,9 @@ void AvroOutputStream::flush() { } } +const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream() + const { + return output_stream_; +} + } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_stream_internal.h b/src/iceberg/avro/avro_stream_internal.h index bb75b4d8a..18653cca6 100644 --- a/src/iceberg/avro/avro_stream_internal.h +++ b/src/iceberg/avro/avro_stream_internal.h @@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream { /// store, if any. void flush() override; + const std::shared_ptr<::arrow::io::OutputStream>& arrow_output_stream() const; + private: std::shared_ptr<::arrow::io::OutputStream> output_stream_; const int64_t buffer_size_; diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index f7caa69f1..4d82fb9a3 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -26,14 +26,17 @@ #include #include #include +#include #include #include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -50,9 +53,6 @@ Result> CreateOutputStream(const WriterOptions } // namespace -// A stateful context to keep track of the writing progress. 
-struct WriteContext {}; - class AvroWriter::Impl { public: Status Open(const WriterOptions& options) { @@ -67,15 +67,23 @@ class AvroWriter::Impl { constexpr int64_t kDefaultBufferSize = 1024 * 1024; ICEBERG_ASSIGN_OR_RAISE(auto output_stream, CreateOutputStream(options, kDefaultBufferSize)); - + arrow_output_stream_ = output_stream->arrow_output_stream(); writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( std::move(output_stream), *avro_schema_); + datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); return {}; } - Status Write(ArrowArray /*data*/) { - // TODO(xiao.dong) convert data and write to avro - // total_bytes_+= written_bytes; + Status Write(ArrowArray data) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, + ::arrow::ImportArray(&data, &arrow_schema_)); + + for (int64_t i = 0; i < result->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get())); + writer_->write(*datum_); + } + return {}; } @@ -83,6 +91,8 @@ class AvroWriter::Impl { if (writer_ != nullptr) { writer_->close(); writer_.reset(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell()); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close()); } return {}; } @@ -92,13 +102,19 @@ class AvroWriter::Impl { int64_t length() { return total_bytes_; } private: - int64_t total_bytes_ = 0; // The schema to write. std::shared_ptr<::iceberg::Schema> write_schema_; // The avro schema to write. std::shared_ptr<::avro::ValidSchema> avro_schema_; + // Arrow output stream of the Avro file to write + std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; // The avro writer to write the data into a datum. std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; + // Reusable Avro datum for writing individual records. + std::unique_ptr<::avro::GenericDatum> datum_; + // Arrow schema to write data. 
+ ArrowSchema arrow_schema_; + int64_t total_bytes_ = 0; }; AvroWriter::~AvroWriter() = default; diff --git a/test/avro_test.cc b/test/avro_test.cc index cba29da12..4bc773cba 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -31,8 +30,10 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/avro/avro_writer.h" #include "iceberg/file_reader.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/type.h" #include "matchers.h" #include "temp_file_test_base.h" @@ -105,6 +106,46 @@ class AvroReaderTest : public TempFileTestBase { ASSERT_FALSE(data.value().has_value()); } + void WriteAndVerify(std::shared_ptr schema, + const std::string& expected_string) { + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + + auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema); + ASSERT_TRUE(arrow_schema_result.ok()); + auto arrow_schema = arrow_schema_result.ValueOrDie(); + + auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string); + ASSERT_TRUE(array_result.ok()); + auto array = array_result.ValueOrDie(); + + struct ArrowArray arrow_array; + auto export_result = ::arrow::ExportArray(*array, &arrow_array); + ASSERT_TRUE(export_result.ok()); + + auto writer_result = WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .schema = schema, .io = file_io_}); + ASSERT_TRUE(writer_result.has_value()); + auto writer = std::move(writer_result.value()); + ASSERT_THAT(writer->Write(arrow_array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_); + ASSERT_TRUE(file_info_result.ok()); + ASSERT_EQ(file_info_result->size(), writer->length().value()); + + auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, + {.path = temp_avro_file_, 
+ .length = file_info_result->size(), + .io = file_io_, + .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string)); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); + } + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; std::shared_ptr file_io_; std::string temp_avro_file_; @@ -161,4 +202,28 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(AvroReaderTest, AvroWriterBasicType) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "name", std::make_shared())}); + + std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, AvroWriterNestedType) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired( + 2, "info", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", std::make_shared()), + SchemaField::MakeRequired(4, "age", std::make_shared())}))}); + + std::string expected_string = + R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])"; + + WriteAndVerify(schema, expected_string); +} + } // namespace iceberg::avro