Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iceberg/avro/avro_stream_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,8 @@ void AvroOutputStream::flush() {
}
}

std::shared_ptr<::arrow::io::OutputStream> AvroOutputStream::get_output_stream() const {
return output_stream_;
}

} // namespace iceberg::avro
2 changes: 2 additions & 0 deletions src/iceberg/avro/avro_stream_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream {
/// store, if any.
void flush() override;

std::shared_ptr<::arrow::io::OutputStream> get_output_stream() const;

private:
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
const int64_t buffer_size_;
Expand Down
27 changes: 23 additions & 4 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -67,22 +70,32 @@ class AvroWriter::Impl {
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));

arrow_output_stream_ = output_stream->get_output_stream();
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
std::move(output_stream), *avro_schema_);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
}

Status Write(ArrowArray /*data*/) {
// TODO(xiao.dong) convert data and write to avro
// total_bytes_+= written_bytes;
Status Write(ArrowArray data) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
::arrow::ImportArray(&data, &arrow_schema_));

for (int64_t i = 0; i < result->length(); i++) {
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
writer_->write(*datum_);
}

return {};
}

Status Close() {
if (writer_ != nullptr) {
writer_->close();
writer_.reset();
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
}
return {};
}
Expand All @@ -99,6 +112,12 @@ class AvroWriter::Impl {
std::shared_ptr<::avro::ValidSchema> avro_schema_;
// The avro writer to write the data into a datum.
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
// Arrow schema for data conversion (C API format)
ArrowSchema arrow_schema_;
// Reusable Avro datum for writing individual records
std::unique_ptr<::avro::GenericDatum> datum_;
// Arrow output stream for writing data to filesystem
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
};

AvroWriter::~AvroWriter() = default;
Expand Down
65 changes: 64 additions & 1 deletion test/avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
#include <arrow/c/helpers.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/file.h>
#include <arrow/json/from_string.h>
Expand All @@ -31,8 +30,10 @@

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_writer.h"
#include "iceberg/file_reader.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/type.h"
#include "matchers.h"
#include "temp_file_test_base.h"
Expand Down Expand Up @@ -105,6 +106,44 @@ class AvroReaderTest : public TempFileTestBase {
ASSERT_FALSE(data.value().has_value());
}

void WriteAndVerify(std::shared_ptr<Schema> schema,
const std::string& expected_string) {
ArrowSchema arrow_c_schema;
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());

auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
ASSERT_TRUE(arrow_schema_result.ok());
auto arrow_schema = arrow_schema_result.ValueOrDie();

auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string);
ASSERT_TRUE(array_result.ok());
auto array = array_result.ValueOrDie();

struct ArrowArray arrow_array;
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
ASSERT_TRUE(export_result.ok());

auto writer_result = WriterFactoryRegistry::Open(
FileFormatType::kAvro,
{.path = temp_avro_file_, .schema = schema, .io = file_io_});
ASSERT_TRUE(writer_result.has_value());
auto writer = std::move(writer_result.value());
ASSERT_THAT(writer->Write(arrow_array), IsOk());
ASSERT_THAT(writer->Close(), IsOk());

auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
ASSERT_TRUE(file_info_result.ok());
ASSERT_EQ(file_info_result->size(), writer->length().value());

auto reader_result = ReaderFactoryRegistry::Open(
FileFormatType::kAvro,
{.path = temp_avro_file_, .io = file_io_, .projection = schema});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
std::string temp_avro_file_;
Expand Down Expand Up @@ -161,4 +200,28 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(AvroReaderTest, AvroWriterBasicType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});

std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])";

WriteAndVerify(schema, expected_string);
}

TEST_F(AvroReaderTest, AvroWriterNestedType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeRequired(
2, "info",
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
SchemaField::MakeRequired(4, "age", std::make_shared<IntType>())}))});

std::string expected_string =
R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])";

WriteAndVerify(schema, expected_string);
}

} // namespace iceberg::avro
Loading